Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Databricks recomienda usar el COPY INTO comando para la carga incremental y masiva de datos para orígenes de datos que contienen miles de archivos.
En este tutorial, usará el comando COPY INTO para cargar datos JSON desde un volumen de Catálogo de Unity en una tabla Delta del área de trabajo de Azure Databricks. Se utiliza el conjunto de datos de ejemplo de Wanderbricks como origen de datos. Para obtener casos de uso de ingesta más avanzados, consulte ¿Qué es Auto Loader?.
Requisitos
- Acceso a un recurso informático. Consulte Computación.
- Un área de trabajo habilitada para catálogos de Unity con permisos para crear esquemas y volúmenes en un catálogo. Consulte Conexión al almacenamiento de objetos en la nube mediante el catálogo de Unity.
Paso 1: Configuración del entorno
El código de este tutorial usa un volumen de Catálogo de Unity para almacenar archivos de código fuente JSON. Reemplace <catalog> por un catálogo en el que tenga los permisos CREATE SCHEMA y CREATE VOLUME. Si no puede ejecutar el código, póngase en contacto con el administrador del área de trabajo.
Cree un notebook y vincúlelo a un recurso de cómputo. A continuación, ejecute el código siguiente para configurar un esquema y un volumen para este tutorial.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Paso 2: Escribe datos de ejemplo en el volumen como JSON
El COPY INTO comando carga datos de orígenes basados en archivos. Lea desde la tabla de ejemplo de Wanderbricksbookings y escriba un conjunto de registros como archivos JSON en el volumen, simulando la llegada de datos desde un sistema externo.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
Escribir archivos en un volumen requiere Python. En un flujo de trabajo real, estos datos llegarían desde un sistema externo.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Paso 3: Use COPY INTO para cargar datos JSON idempotentemente
Cree una tabla Delta de destino antes de usar COPY INTO. No es necesario proporcionar nada más que un nombre de tabla dentro de la CREATE TABLE instrucción. Dado que esta acción es idempotente, Databricks carga los datos solo una vez, incluso si ejecuta el código varias veces.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
Paso 4: Vista previa del contenido de la tabla
Compruebe que la tabla contiene 20 filas del primer lote de datos de reservas de Wanderbricks y que el esquema se infirió correctamente de los archivos de origen JSON.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Paso 5: Cargar más datos y obtener una vista previa de los resultados
Puede simular datos adicionales que llegan desde un sistema externo escribiendo otro lote de registros y ejecutándose COPY INTO de nuevo. Ejecute el código siguiente para escribir un segundo lote de datos.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
Escribir archivos en un volumen requiere Python. En un flujo de trabajo real, estos datos llegarían desde un sistema externo.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
A continuación, vuelva a ejecutar el COPY INTO comando del paso 3 y obtenga una vista previa de la tabla para confirmar los nuevos registros. Solo se cargan los nuevos archivos.
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Paso 6: Tutorial de limpieza
Cuando haya terminado con este tutorial, puede limpiar los recursos asociados si ya no desea mantenerlos. Elimine el esquema, las tablas y el volumen, y borre todos los datos.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;