Compartir a través de


Carga de datos con Delta Live Tables

Puede cargar datos desde cualquier origen de datos compatible con Apache Spark en Azure Databricks mediante Delta Live Tables. Puede definir conjuntos de datos (tablas y vistas) en Delta Live Tables en cualquier consulta que devuelva un DataFrame de Spark, incluidos DataFrames de streaming y Pandas para DataFrames de Spark. Para las tareas de ingesta de datos, Databricks recomienda usar tablas de streaming para la mayoría de los casos de uso. Las tablas de streaming son adecuadas para ingerir datos del almacenamiento de objetos en la nube mediante el autocargador o desde los buses de mensajes como Kafka. En los ejemplos siguientes se muestran algunos patrones comunes.

Importante

No todas las fuentes de datos son compatibles con SQL. Puede mezclar cuadernos de SQL y Python en una canalización de Delta Live Tables para usar SQL en todas las operaciones más allá de la ingesta.

Para más información sobre cómo trabajar con bibliotecas no empaquetadas en Delta Live Tables de forma predeterminada, consulte Administración de dependencias de Python para canalizaciones de Delta Live Tables.

Carga de archivos desde el almacenamiento de objetos en la nube

Databricks recomienda utilizar Auto Loader con Delta Live Tables para la mayoría de las tareas de ingesta de datos desde el almacenamiento de objetos en la nube. Auto Loader y Delta Live Tables están diseñados para cargar de forma incremental e idempotente datos en constante crecimiento a medida que llegan al almacenamiento en la nube. En los ejemplos siguientes se usa el cargador automático para crear conjuntos de datos a partir de archivos CSV y JSON:

Nota:

Para cargar archivos con el cargador automático en una canalización habilitada para el catálogo de Unity, debe usar ubicaciones externas. Para obtener más información sobre el uso del catálogo de Unity con Delta Live Tables, consulte Uso del catálogo de Unity con las canalizaciones de Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Consulte ¿Qué es Auto Loader? y Sintaxis SQL del cargador automático.

Advertencia

Si utiliza Auto Loader con notificaciones de archivos y ejecuta una actualización completa para su canalización o tabla de streaming, deberá limpiar manualmente sus recursos. Puede utilizar el CloudFilesResourceManager en un bloc de notas para realizar la limpieza.

Carga de datos desde un bus de mensajes

Puede configurar canalizaciones de Delta Live Tables para ingerir datos de buses de mensajes con tablas de streaming. Databricks recomienda combinar las tablas de streaming con la ejecución continua y el autoescalado mejorado para proporcionar la ingesta más eficiente para la carga de baja latencia desde los buses de mensajes. Consulte Optimización del uso del clúster de canalizaciones de Delta Live Tables con escalado automático mejorado.

Por ejemplo, el siguiente código configura una tabla de streaming para ingerir datos de Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Puede escribir operaciones posteriores en SQL puro para realizar transformaciones de flujo en estos datos, como en el siguiente ejemplo:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Para ver un ejemplo de cómo trabajar con Event Hubs, consulte Uso de Azure Event Hubs como fuente de datos de Delta Live Tables.

Consulte Configuración de orígenes de datos de streaming.

Carga de datos desde sistemas externos

Delta Live Tables admite la carga de datos desde cualquier fuente de datos admitida por Azure Databricks. Consulte Conexión a orígenes de datos. También puede cargar datos externos utilizando Lakehouse Federation para fuentes de datos admitidos. Debido a que Lakehouse Federation requiere Databricks Runtime 13.3 LTS o superior, para utilizar Lakehouse Federation, su canalización debe estar configurada para utilizar el canal de versión preliminar.

Algunas fuentes de datos no tienen un soporte equivalente en SQL. Si no puede usar La federación de Lakehouse con uno de estos orígenes de datos, puede usar un cuaderno de Python para ingerir datos del origen. Puede agregar código fuente de Python y SQL a la misma canalización de Delta Live Tables. El siguiente ejemplo declara una vista materializada para acceder al estado actual de los datos en una tabla PostgreSQL remota:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Carga de conjuntos de datos pequeños o estáticos desde el almacenamiento de objetos en la nube

Puede cargar conjuntos de datos pequeños o estáticos utilizando la sintaxis de carga de Apache Spark. Delta Live Tables admite todos los formatos de archivo compatibles con Apache Spark en Azure Databricks. Para obtener una lista completa, consulte Opciones de formato de datos.

Los siguientes ejemplos muestran la carga de JSON para crear tablas Delta Live Tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Nota:

La SELECT * FROM format.`path`; construcción SQL es común a todos los entornos SQL en Azure Databricks. Es el patrón recomendado para el acceso directo a ficheros utilizando SQL con Delta Live Tables.

Acceda de forma segura a las credenciales de almacenamiento con secretos en una canalización

Puede utilizar los secretos de Azure Databricks para almacenar credenciales como claves de acceso o contraseñas. Para configurar el secreto en la canalización, use una propiedad Spark en la configuración del clúster de la configuración de la canalización. Consulte Configuración del proceso para una canalización de Delta Live Tables.

En el ejemplo siguiente se usa un secreto para almacenar una clave de acceso necesaria para leer los datos de entrada de una cuenta de almacenamiento de Azure Data Lake Storage Gen2 (ADLS Gen2) mediante el cargador automático. Puede usar este mismo método para configurar cualquier secreto requerido por la canalización, por ejemplo, claves de AWS para acceder a S3 o la contraseña a un metastore de Apache Hive.

Para obtener más información sobre cómo trabajar con Azure Data Lake Storage Gen2, consulte Conectarse a Azure Data Lake Storage Gen2 y Blob Storage.

Nota:

Debe agregar el prefijo spark.hadoop. a la clave de configuración spark_conf que establece el valor del secreto.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> por el nombre de la cuenta de almacenamiento de ADLS Gen2.
  • <scope-name> con el nombre del ámbito de secreto de Azure Databricks.
  • <secret-name> con el nombre de la clave que contiene la clave de acceso de la cuenta de almacenamiento de Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> con el nombre del contenedor de la cuenta de almacenamiento de Azure que almacena los datos de entrada.
  • <storage-account-name> por el nombre de la cuenta de almacenamiento de ADLS Gen2.
  • <path-to-input-dataset> con la ruta de acceso al conjunto de datos de entrada.

Carga de datos desde Azure Event Hubs

Azure Event Hubs es un servicio de streaming de datos que proporciona una interfaz compatible con Apache Kafka. Puede usar el conector de Kafka de Structured Streaming, incluido en el entorno de ejecución de Delta Live Tables, para cargar mensajes desde Azure Event Hubs. Para más información sobre cómo cargar y procesar mensajes desde Azure Event Hubs, consulte Uso de Azure Event Hubs como origen de datos delta Live Tables.