Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Puede cargar datos desde cualquier origen de datos compatible con Apache Spark en Azure Databricks mediante canalizaciones. Puede definir conjuntos de datos (tablas y vistas) en las canalizaciones declarativas de Spark de Lakeflow en cualquier consulta que devuelva un DataFrame de Spark, incluidos los DataFrames de streaming y el uso de Pandas para DataFrames de Spark. Para las tareas de ingesta de datos, Databricks recomienda usar tablas de streaming para la mayoría de los casos de uso. Las tablas de streaming son adecuadas para ingerir datos del almacenamiento de objetos en la nube mediante el autocargador o desde los buses de mensajes como Kafka.
Nota:
- No todos los orígenes de datos tienen compatibilidad con SQL para la ingesta. Puede mezclar orígenes de SQL y Python en canalizaciones para usar Python donde sea necesario y SQL para otras operaciones en la misma canalización.
- Para más información sobre cómo trabajar con bibliotecas no empaquetadas en Canalizaciones declarativas de Spark de Lakeflow de forma predeterminada, consulte Administración de dependencias de Python para canalizaciones.
- Para obtener información general sobre la ingesta en Azure Databricks, consulte Conectores estándar en Lakeflow Connect.
En los ejemplos siguientes se muestran algunos patrones comunes.
Cargar desde una tabla existente
Cargue datos de cualquier tabla existente en Azure Databricks. Puede transformar los datos mediante una consulta o cargar la tabla para su posterior procesamiento en la canalización.
En el ejemplo siguiente se leen los datos de una tabla existente:
Pitón
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Carga de archivos desde el almacenamiento de objetos en la nube
Databricks recomienda usar Auto Loader en canalizaciones para la mayoría de las tareas de ingesta de datos desde el almacenamiento de objetos en la nube o desde archivos de un volumen de Catálogo de Unity. El cargador automático y las canalizaciones están diseñados para cargar datos cada vez mayores de forma incremental e idempotente a medida que llegan al almacenamiento en la nube.
Consulte ¿Qué es el autocargador? y Carga de datos desde de almacenamiento de objetos.
En el ejemplo siguiente se leen los datos del almacenamiento en la nube mediante el cargador automático:
Pitón
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
En los ejemplos siguientes se usa Auto Loader para crear conjuntos de datos a partir de archivos CSV en un volumen de catálogo de Unity:
Pitón
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Nota:
- Si utiliza el autocargador con notificaciones de archivos y ejecuta una actualización completa para su canalización o tabla de streaming, deberá limpiar manualmente sus recursos. Puede utilizar el CloudFilesResourceManager en un bloc de notas para realizar la limpieza.
- Para cargar archivos con el autocargador en una canalización habilitada para el catálogo de Unity, debe usar ubicaciones externas. Para más información sobre el uso del catálogo de Unity con canalizaciones, consulte Uso del catálogo de Unity con canalizaciones.
Carga de datos desde un bus de mensajes
Puede configurar canalizaciones para ingerir datos de buses de mensajes. Databricks recomienda usar tablas de streaming con ejecución continua y escalado automático mejorado para proporcionar la ingesta más eficaz para la carga de baja latencia desde los buses de mensajes. Consulte Optimización del uso del clúster de canalizaciones declarativas de Spark de Lakeflow con escalado automático.
Por ejemplo, el código siguiente configura una tabla de streaming para ingerir datos de Kafka mediante la función read_kafka :
Pitón
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Para ingerir desde otras fuentes de buses de mensajes, consulte:
- Kinesis: read_kinesis
- Tema pub/sub: read_pubsub
- Pulsar: read_pulsar
Carga de datos desde Azure Event Hubs
Azure Event Hubs es un servicio de streaming de datos que proporciona una interfaz compatible con Apache Kafka. Puede usar el conector de Kafka de Structured Streaming, incluido en el entorno de ejecución de canalizaciones declarativas de Spark de Lakeflow, para cargar mensajes desde Azure Event Hubs. Para más información sobre cómo cargar y procesar mensajes desde Azure Event Hubs, consulte Uso de Azure Event Hubs como origen de datos de canalización.
Carga de datos desde sistemas externos
Las canalizaciones declarativas de Spark de Lakeflow admiten la carga de datos desde cualquier origen de datos compatible con Azure Databricks. Consulte Conexión a orígenes de datos y servicios externos. También puede cargar datos externos mediante Lakehouse Federation para orígenes de datos admitidos. Debido a que Lakehouse Federation requiere Databricks Runtime 13.3 LTS u otra versión posterior, para utilizar Lakehouse Federation, su canalización debe estar configurada para poder usar el canal de versión preliminar.
Algunas fuentes de datos no tienen un soporte equivalente en SQL. Si no puede usar Lakehouse Federation con uno de estos orígenes de datos, puede utilizar Python para ingerir datos del origen. Puede agregar archivos de origen de Python y SQL a la misma canalización. El siguiente ejemplo declara una vista materializada para acceder al estado actual de los datos en una tabla PostgreSQL remota:
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Carga de conjuntos de datos pequeños o estáticos desde el almacenamiento de objetos en la nube
Puede cargar conjuntos de datos pequeños o estáticos utilizando la sintaxis de carga de Apache Spark. Lakeflow Spark Declarative Pipelines admite todos los formatos de archivo compatibles con Apache Spark en Azure Databricks. Para obtener una lista completa, consulte Opciones de formato de datos.
En los ejemplos siguientes se muestra cómo cargar JSON para crear una tabla:
Pitón
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Nota:
La función SQL read_files es común a todos los entornos de SQL en Azure Databricks. Es el patrón recomendado para el acceso directo a archivos mediante SQL en canalizaciones. Para obtener más información, consulte Opciones.
Carga de datos desde un origen de datos personalizado de Python
Los orígenes de datos personalizados de Python permiten cargar datos en formatos personalizados. Puede escribir código para leer y escribir en un origen de datos externo específico o aprovechar el código de Python existente en los sistemas existentes para leer datos de sus propios sistemas internos. Para más información sobre el desarrollo de orígenes de datos de Python, consulte Orígenes de datos personalizados de PySpark.
Para usar un origen de datos personalizado de Python para cargar datos en una canalización, regístrelos con un nombre de formato, como my_custom_datasource, y luego lean de él:
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Configurar una tabla de streaming para omitir los cambios en una tabla de streaming de origen
Nota:
- La marca
skipChangeCommitssolo funciona conspark.readStreammediante el uso de la funciónoption(). No es posible usar esta marca en una funcióndp.read_stream(). - No se puede usar la
skipChangeCommitsmarca cuando la tabla de streaming de origen se define como destino de una función create_auto_cdc_flow().
De forma predeterminada, las tablas de streaming requieren orígenes de solo anexión. Cuando una tabla de streaming use otra tabla de streaming como origen y la tabla de streaming de origen requiera actualizaciones o eliminaciones (por ejemplo: el procesamiento del "derecho al olvido" del RGPD), la marca skipChangeCommits se establecerá al leer la tabla de streaming de origen para omitir esos cambios. Para obtener más información sobre esta marca, consulte Omitir actualizaciones y eliminaciones.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Acceda de forma segura a las credenciales de almacenamiento mediante los secretos de una canalización
Puede utilizar los secretos de Azure Databricks para almacenar credenciales como claves de acceso o contraseñas. Para configurar el secreto de su canalización, use una propiedad Spark en la configuración del clúster de la configuración de la canalización. Consulte Configurar el cálculo clásico para canalizaciones.
En el ejemplo siguiente se usa un secreto para almacenar una clave de acceso necesaria para leer los datos de entrada de una cuenta de almacenamiento de Azure Data Lake Storage (ADLS) mediante el autocargador. Puede usar este mismo método para configurar cualquier secreto requerido por la canalización, por ejemplo, claves de AWS para acceder a S3 o la contraseña a un metastore de Apache Hive.
Para más información sobre cómo trabajar con Azure Data Lake Storage, consulte Conexión a Azure Data Lake Storage y Blob Storage.
Nota:
Debe agregar el prefijo spark.hadoop. a la clave de configuración spark_conf que establece el valor del secreto.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Replace
-
<storage-account-name>con el nombre de la cuenta de almacenamiento de ADLS. -
<scope-name>con el nombre del ámbito de secreto de Azure Databricks. -
<secret-name>con el nombre de la clave que contiene la clave de acceso de la cuenta de almacenamiento de Azure.
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Replace
-
<container-name>con el nombre del contenedor de la cuenta de almacenamiento de Azure que almacena los datos de entrada. -
<storage-account-name>con el nombre de la cuenta de almacenamiento de ADLS. -
<path-to-input-dataset>con la ruta de acceso al conjunto de datos de entrada.