Comenzar a usar COPY INTO para cargar datos.
El comando SQL COPY INTO
permite cargar datos desde una ubicación de archivos en una tabla Delta. Se trata de una operación que se puede volver a intentar e idempotente; se omiten los archivos de la ubicación de origen que ya se han cargado.
COPY INTO
ofrece las siguientes capacidades:
- Filtros de archivos o directorios configurables fácilmente desde el almacenamiento en la nube, incluidos los volúmenes S3, ADLS Gen2, ABFS, GCS y Unity Catalog.
- Compatibilidad con varios formatos de archivo de origen: CSV, JSON, XML, Avro, ORC, Parquet, texto y archivos binarios
- Procesamiento de archivos exactamente una vez (idempotente) de manera predeterminada
- Inferencia, asignación, combinación y evolución del esquema de tabla de destino
Nota:
Para obtener una experiencia de ingesta de archivos más escalable y sólida, Databricks recomienda que los usuarios de SQL aprovechen las tablas de streaming. Consulte Carga de datos mediante tablas de secuencia en Databricks SQL.
Advertencia
COPY INTO
respeta la configuración del área de trabajo para los vectores de eliminación. Si está habilitado, los vectores de eliminación se habilitan en la tabla de destino cuando COPY INTO
se ejecuta en un almacenamiento de SQL o en un proceso que ejecuta Databricks Runtime 14.0 o superior. Una vez habilitado, los vectores de eliminación bloquean las consultas en una tabla de Databricks Runtime 11.3 LTS y versiones posteriores. Consulte ¿Qué son los vectores de eliminación? y vectores de eliminación de habilitación automática.
Requisitos
Un administrador de cuenta debe seguir los pasos descritos en Configurar acceso a datos para la ingesta para configurar el acceso a los datos en el almacenamiento de objetos en la nube antes de que los usuarios puedan cargar datos mediante COPY INTO
.
Ejemplo: carga de datos en una tabla de Delta Lake sin esquema
Nota:
Esta característica está disponible en Databricks Runtime 11.3 LTS y versiones posteriores.
Puede crear tablas Delta vacías de marcador de posición para que el esquema se infiera más adelante durante un comando COPY INTO
estableciendo mergeSchema
en true
en COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
La instrucción SQL anterior es idempotente y se puede programar para que se ejecute e ingiera datos una vez exactamente en una tabla Delta.
Nota:
La tabla Delta vacía no se puede usar fuera de COPY INTO
. INSERT INTO
y MERGE INTO
no se admiten para escribir datos en tablas Delta sin esquema. Una vez insertados los datos en la tabla con COPY INTO
, la tabla se puede consultar.
Consulte Creación de tablas de destino para COPY INTO.
Ejemplo: establecimiento del esquema y carga de datos en una tabla de Delta Lake
En el ejemplo siguiente se muestra cómo crear una tabla Delta y luego usar el comando SQL COPY INTO
para cargar datos de ejemplo de conjuntos de datos de Databricks en la tabla. Puede ejecutar el código de ejemplo de Python, R, Scala y SQL desde un cuaderno asociado a un clúster de Azure Databricks. También puede ejecutar el código SQL desde una consulta asociada a un almacén de SQL en Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Para limpiar, ejecute el código siguiente, que elimina la tabla:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Limpieza de archivos de metadatos
Puede ejecutar VACUUM para limpiar archivos de metadatos sin referencia creados por COPY INTO
en Databricks Runtime 15.2 y versiones posteriores.
Referencia
- Databricks Runtime 7.x y versiones superiores: COPY INTO
Recursos adicionales
Carga de datos mediante COPY INTO con volúmenes o ubicaciones externas de Unity Catalog
Para ver patrones de uso comunes, incluidos ejemplos de varias operaciones de
COPY INTO
en la misma tabla Delta, consulte Patrones comunes de carga de datos mediante COPY INTO.