¿Qué es el modo de notificación de archivos de Auto Loader?
En el modo de notificación de archivos, el cargador automático configura automáticamente un servicio de notificación y un servicio de cola que se suscriben a eventos de archivo desde el directorio de entrada. Puede usar notificaciones de archivos para escalar Auto Loader para que ingiera millones de archivos por hora. En comparación con el modo de listas de directorios, el modo de notificación de archivos es más eficaz y escalable para directorios de entrada grandes o para un gran volumen de archivos, pero requiere permisos de nube adicionales.
Puede cambiar entre las notificaciones de archivo y la lista de directorios en cualquier momento y seguir manteniendo las garantías de que los datos se procesan exactamente una vez.
Nota:
El modo de notificación de archivos no es compatible con las cuentas de Azure Premium Storage porque las cuentas prémium no admiten Queue Storage.
Advertencia
El cambio de la ruta de origen para el cargador automático no es compatible con el modo de notificación de archivos. Si se usa el modo de notificación de archivos y se cambia la ruta de acceso, es posible que no pueda ingerir archivos que ya están presentes en el nuevo directorio en el momento de la actualización del directorio.
El modo de notificación de archivos solo se admite en procesos de usuario único.
Recursos en la nube usados en el modo de notificación de archivos de Auto Loader
Importante
Necesita permisos elevados para configurar automáticamente la infraestructura en la nube para el modo de notificación de archivos. Póngase en contacto con el administrador de la nube o el administrador del área de trabajo. Consulte:
El cargador automático puede configurar notificaciones de archivos automáticamente al establecer la opción cloudFiles.useNotifications
en true
y proporcionar los permisos para crear recursos en la nube. Además, es posible que tenga que proporcionar opciones adicionales para conceder autorización a Auto Loader para crear estos recursos.
En la tabla siguiente se resumen los recursos creados por el cargador automático.
Almacenamiento en la nube | Servicio de suscripción | Queue service | Prefijo * | Límite ** |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | databricks-auto-ingest | 100 por cubo S3 |
ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | 500 por cuenta de almacenamiento |
GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-ingest | 100 por cubo GCS |
Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | 500 por cuenta de almacenamiento |
- El cargador automático nombra los recursos con este prefijo.
** Cuántas canalizaciones de notificación de archivos simultáneas se pueden iniciar
Si necesita ejecutar más del número limitado de canalizaciones de notificación de archivos para una cuenta de almacenamiento determinada, puede:
- Aproveche un servicio como AWS Lambda, Azure Functions o Google Cloud Functions para enviar notificaciones de una sola cola que escucha todo un contenedor o cubo en colas específicas del directorio.
Eventos de notificación de archivos
AWS S3 proporciona un evento ObjectCreated
cuando se carga un archivo en un cubo de S3, independientemente de si se cargó mediante una carga put o de varias partes.
ADLS Gen2 proporciona notificaciones de eventos diferentes para los archivos que aparecen en el contenedor de Gen2.
- Auto Loader escucha el evento
FlushWithClose
para procesar un archivo. - Los flujos del cargador automático admiten la
RenameFile
acción para detectar archivos. Las accionesRenameFile
requieren una solicitud de API al sistema de almacenamiento para obtener el tamaño del archivo renombrado. - Las secuencia de Auto Loader creadas con Databricks Runtime 9.0 y versiones posteriores admiten la acción
RenameDirectory
para detectar archivos. Las accionesRenameDirectory
requieren solicitudes de API al sistema de almacenamiento para enumerar los contenidos del directorio renombrado.
Google Cloud Storage proporciona un evento OBJECT_FINALIZE
cuando se carga un archivo, lo que incluye sobreescrituras y copias de archivos. Las cargas con errores no generan este evento.
Nota:
Los proveedores de nube no garantizan la entrega al 100 % de todos los eventos de archivo en condiciones muy poco frecuentes y no proporcionan un Acuerdo de Nivel de Servicio estricto sobre la latencia de los eventos de archivo. Databricks recomienda desencadenar reposiciones periódicas con el cargador automático mediante la opción cloudFiles.backfillInterval
para garantizar que todos los archivos se detectan dentro de un Acuerdo de Nivel de Servicio determinado si la integridad de los datos es un requisito. Desencadenar reposiciones periódicas no provoca duplicados.
Permisos necesarios para configurar la notificación de archivos para ADLS Gen2 y Azure Blob Storage
Debe tener permisos de lectura para el directorio de entrada. Consulte Azure Blob Storage.
Para usar el modo de notificación de archivos, debe proporcionar credenciales de autenticación para configurar y acceder a los servicios de notificación de eventos. Solo necesita una entidad de servicio para la autenticación.
Entidad de servicio: Uso de roles integrados de Azure
Cree una aplicación de Microsoft Entra ID (anteriormente Azure Active Directory) y una entidad de servicio en forma de Id. de cliente y secreto de cliente.
Asigne a esta aplicación los siguientes roles para la cuenta de almacenamiento en la que reside la ruta de acceso de entrada:
- Colaborador: este rol es para configurar recursos en la cuenta de almacenamiento, como colas y suscripciones de eventos.
- Colaborador de datos de la cola de Storage: este rol sirve para realizar operaciones de colas, como recuperar y eliminar mensajes de las colas. Este rol solo es necesario cuando se proporciona una entidad de servicio sin una cadena de conexión.
Asigne a esta aplicación el siguiente rol para el grupo de recursos relacionado:
- Colaborador de suscripción de eventos de Event Grid: este rol sirve para realizar operaciones de suscripción de Event Grid, como crear o enumerar suscripciones de eventos.
Para más información, consulte Asignación de roles de Azure mediante Azure Portal.
Entidad de servicio: Uso de un rol personalizado
Si le preocupan los permisos excesivos necesarios para los roles anteriores, puede crear un rol personalizado con al menos los permisos siguientes, que se indican a continuación en el formato JSON de rol de Azure:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
A continuación, puede asignar este rol personalizado a la aplicación.
Para más información, consulte Asignación de roles de Azure mediante Azure Portal.
Solución de problemas de errores comunes
Error:
java.lang.RuntimeException: Failed to create event grid subscription.
Si ve este mensaje de error al ejecutar Auto Loader por primera vez, la instancia de Event Grid no está registrada como proveedor de recursos en su suscripción de Azure. Para registrarla en Azure Portal:
- Vaya a su suscripción.
- Haga clic en Proveedores de recursos en la sección Configuración.
- Registre el proveedor
Microsoft.EventGrid
.
Error:
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Si ve este mensaje de error al ejecutar Auto Loader por primera vez, asegúrese de que ha asignado el rol Colaborador a la entidad de servicio para Event Grid, así como para la cuenta de almacenamiento.
Permisos necesarios para configurar la notificación de archivos para AWS S3
Debe tener permisos de lectura para el directorio de entrada. Consulte los detalles de conexión S3 para obtener más información.
Para usar el modo de notificación de archivos, adjunte el siguiente documento de directiva JSON al usuario o rol IAM.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
donde:
<bucket-name>
: nombre del cubo S3 donde la secuencia leerá archivos, por ejemplo,auto-logs
. Puede usar*
como carácter comodín, por ejemplo,databricks-*-logs
. Para averiguar el cubo S3 subyacente de la ruta de acceso de DBFS, puede enumerar todos los puntos de montaje de DBFS en un cuaderno mediante la ejecución de%fs mounts
.<region>
: región de AWS donde reside el cubo S3, por ejemplo,us-west-2
. Si no desea especificar la región, use*
.<account-number>
: número de cuenta de AWS que posee el cubo S3, por ejemplo,123456789012
. Si no desea especificar el número de cuenta, use*
.
La cadena databricks-auto-ingest-*
de la especificación de SQS y SNS ARN es el prefijo de nombre que utiliza el origen cloudFiles
al crear los servicios SQS y SNS. Dado que Azure Databricks configura los servicios de notificación en la ejecución inicial de la secuencia, puede utilizar una directiva con permisos reducidos después de la ejecución inicial (por ejemplo, detener la secuencia y reiniciarla).
Nota:
La directiva anterior solo se refiere a los permisos necesarios para configurar los servicios de notificación de archivos, es decir, los servicios de notificación de cubos S3, SNS y SQS, y asume que ya tiene acceso de lectura al cubo de S3. Si necesita agregar permisos de solo lectura S3, agregue lo siguiente a la lista Action
en la instrucción DatabricksAutoLoaderSetup
del documento JSON:
s3:ListBucket
s3:GetObject
Permisos reducidos después de la configuración inicial
Los permisos de configuración de recursos descritos anteriormente solo son necesarios durante la ejecución inicial de la secuencia. Después de la primera ejecución, puede cambiar a la siguiente directiva IAM con permisos reducidos.
Importante
Con los permisos reducidos, no puede iniciar nuevas consultas de streaming ni volver a crear recursos en caso de errores (por ejemplo, la cola de SQS se ha eliminado accidentalmente); tampoco puede usar la API de administración de recursos en la nube para enumerar o eliminar recursos.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Permisos necesarios para configurar la notificación de archivos para GCS
Debe tener los permisos list
y get
en el cubo de GCS y en todos los objetos. Para obtener más información, consulte la documentación de Google sobre los permisos IAM.
Para usar el modo de notificación de archivos, debe agregar permisos para la cuenta de servicio GCS y para la cuenta que se usa para acceder a los recursos de Google Cloud Pub/Sub.
Agregue el rol Pub/Sub Publisher
a la cuenta de servicio de GCS. Esto permite que la cuenta publique mensajes de notificación de eventos desde los cubos de GCS en Google Cloud Pub/Sub.
En cuanto a la cuenta de servicio utilizada para los recursos de Google Cloud Pub/Sub, debe agregar los siguientes permisos:
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
Para ello, puede crear un rol personalizado IAM con estos permisos o asignar roles de GCP preexistentes para cubrir estos permisos.
Búsqueda de la cuenta de servicio de GCS
En la consola en la nube de Google para el proyecto correspondiente, vaya a Cloud Storage > Settings
.
La sección «Cuenta de servicio de almacenamiento en nube» contiene el correo electrónico de la cuenta de servicio de GCS.
Creación de un rol IAM personalizado de Google Cloud para el modo de notificación de archivos
En la consola de Google Cloud del proyecto correspondiente, vaya a IAM & Admin > Roles
. A continuación, cree un rol en la parte superior o actualice un rol existente. En la pantalla de creación o edición de roles, haga clic en Add Permissions
. A continuación, aparece un menú en el que puede agregar los permisos deseados al rol.
Configuración o administración manual de recursos de notificación de archivos
Los usuarios con privilegios pueden configurar o administrar manualmente los recursos de notificación de archivos.
- Configure los servicios de notificación de archivos manualmente a través del proveedor de nube y especifique manualmente el identificador de cola. Consulte Opciones de notificación de archivos para obtener más detalles.
- Use las API de Scala para crear o administrar los servicios de notificaciones y colas, como se muestra en el ejemplo siguiente:
Nota:
Debe tener los permisos adecuados para configurar o modificar la infraestructura en la nube. Consulte la documentación de permisos para Azure, S3 o GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Use setUpNotificationServices(<resource-suffix>)
para crear una cola y una suscripción con el nombre <prefix>-<resource-suffix>
(el prefijo depende del sistema de almacenamiento resumido en Recursos de nube usados en el modo de notificación de archivos de Auto Loader. Si hay un recurso existente con el mismo nombre, Azure Databricks reutiliza el recurso ya existente en lugar de crear uno nuevo. Esta función devuelve un identificador de cola que se puede pasar al origen cloudFiles
mediante el identificador en Opciones de notificación de archivos. Esto permite que el usuario de origen cloudFiles
tenga menos permisos que el usuario que crea los recursos.
Proporcione la opción "path"
a newManager
solo si llama a setUpNotificationServices
; no es necesario para listNotificationServices
o tearDownNotificationServices
. Es el mismo valor path
que se usa al ejecutar una consulta de streaming.
La siguiente matriz indica qué métodos de API se admiten en qué Databricks Runtime para cada tipo de almacenamiento:
Almacenamiento en la nube | Configuración de API | API de lista | API de anulación |
---|---|---|---|
AWS S3 | Todas las versiones | Todas las versiones | Todas las versiones |
ADLS Gen2 | Todas las versiones | Todas las versiones | Todas las versiones |
GCS | Databricks Runtime 9.1 y versiones superiores | Databricks Runtime 9.1 y versiones superiores | Databricks Runtime 9.1 y versiones superiores |
Azure Blob Storage | Todas las versiones | Todas las versiones | Todas las versiones |
ADLS Gen1 | No compatible | No compatible | No compatible |