Tutorial: COPY INTO con Spark SQL

Databricks recomienda usar el COPY INTO comando para la carga incremental y masiva de datos para orígenes de datos que contienen miles de archivos.

En este tutorial, usará el comando COPY INTO para cargar datos JSON desde un volumen de Catálogo de Unity en una tabla Delta del área de trabajo de Azure Databricks. Se utiliza el conjunto de datos de ejemplo de Wanderbricks como origen de datos. Para obtener casos de uso de ingesta más avanzados, consulte ¿Qué es Auto Loader?.

Requisitos

Paso 1: Configuración del entorno

El código de este tutorial usa un volumen de Catálogo de Unity para almacenar archivos de código fuente JSON. Reemplace <catalog> por un catálogo en el que tenga los permisos CREATE SCHEMA y CREATE VOLUME. Si no puede ejecutar el código, póngase en contacto con el administrador del área de trabajo.

Cree un notebook y vincúlelo a un recurso de cómputo. A continuación, ejecute el código siguiente para configurar un esquema y un volumen para este tutorial.

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

Paso 2: Escribe datos de ejemplo en el volumen como JSON

El COPY INTO comando carga datos de orígenes basados en archivos. Lea desde la tabla de ejemplo de Wanderbricksbookings y escriba un conjunto de registros como archivos JSON en el volumen, simulando la llegada de datos desde un sistema externo.

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

Escribir archivos en un volumen requiere Python. En un flujo de trabajo real, estos datos llegarían desde un sistema externo.

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Paso 3: Use COPY INTO para cargar datos JSON idempotentemente

Cree una tabla Delta de destino antes de usar COPY INTO. No es necesario proporcionar nada más que un nombre de tabla dentro de la CREATE TABLE instrucción. Dado que esta acción es idempotente, Databricks carga los datos solo una vez, incluso si ejecuta el código varias veces.

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

Paso 4: Vista previa del contenido de la tabla

Compruebe que la tabla contiene 20 filas del primer lote de datos de reservas de Wanderbricks y que el esquema se infirió correctamente de los archivos de origen JSON.

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

Paso 5: Cargar más datos y obtener una vista previa de los resultados

Puede simular datos adicionales que llegan desde un sistema externo escribiendo otro lote de registros y ejecutándose COPY INTO de nuevo. Ejecute el código siguiente para escribir un segundo lote de datos.

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

Escribir archivos en un volumen requiere Python. En un flujo de trabajo real, estos datos llegarían desde un sistema externo.

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

A continuación, vuelva a ejecutar el COPY INTO comando del paso 3 y obtenga una vista previa de la tabla para confirmar los nuevos registros. Solo se cargan los nuevos archivos.

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

Paso 6: Tutorial de limpieza

Cuando haya terminado con este tutorial, puede limpiar los recursos asociados si ya no desea mantenerlos. Elimine el esquema, las tablas y el volumen, y borre todos los datos.

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

Recursos adicionales