Che cos'è la modalità di notifica file del caricatore automatico?

In modalità di notifica file, il caricatore automatico configura automaticamente un servizio di notifica e un servizio di accodamento che sottoscrive gli eventi di file dalla directory di input. È possibile usare le notifiche dei file per ridimensionare il caricatore automatico per inserire milioni di file un'ora. Rispetto alla modalità elenco directory, la modalità di notifica dei file è più efficiente e scalabile per directory di input di grandi dimensioni o un volume elevato di file, ma richiede autorizzazioni cloud aggiuntive.

È possibile passare tra le notifiche dei file e l'elenco di directory in qualsiasi momento e mantenere comunque garanzie di elaborazione dei dati esattamente una volta.

Avviso

La modifica del percorso di origine per il caricatore automatico non è supportata per la modalità di notifica file. Se viene usata la modalità di notifica file e il percorso viene modificato, potrebbe non essere possibile inserire file già presenti nella nuova directory al momento dell'aggiornamento della directory.

Risorse cloud usate nella modalità di notifica file del caricatore automatico

Importante

Sono necessarie autorizzazioni elevate per configurare automaticamente l'infrastruttura cloud per la modalità di notifica file. Contattare l'amministratore del cloud o l'amministratore dell'area di lavoro. Vedere:

Il caricatore automatico può configurare automaticamente le notifiche dei file quando si imposta l'opzione cloudFiles.useNotifications su true e si forniscono le autorizzazioni necessarie per creare risorse cloud. Potrebbe anche essere necessario fornire opzioni aggiuntive per concedere l'autorizzazione del caricatore automatico per creare queste risorse.

La tabella seguente riepiloga le risorse create dal caricatore automatico.

Archiviazione cloud Servizio di sottoscrizione Servizio di accodamento Prefisso* Limite**
AWS S3 AWS SNS AWS SQS inserimento automatico di databricks 100 per bucket S3
ADLS Gen2 Griglia di eventi di Azure Archiviazione code di Azure databricks 500 per account di archiviazione
GCS Google Pub/Sub Google Pub/Sub inserimento automatico di databricks 100 per bucket GCS
Archiviazione BLOB di Azure Griglia di eventi di Azure Archiviazione code di Azure databricks 500 per account di archiviazione
  • Il caricatore automatico assegna un nome alle risorse con questo prefisso.

** Quante pipeline di notifica file simultanee possono essere avviate

Se è necessario eseguire più di un numero limitato di pipeline di notifica file per un determinato account di archiviazione, è possibile:

  • Sfruttare un servizio come AWS Lambda, Funzioni di Azure o Google Cloud Functions per visualizzare le notifiche da una singola coda in ascolto di un intero contenitore o bucket in code specifiche della directory.

Eventi di notifica dei file

AWS S3 fornisce un ObjectCreated evento quando un file viene caricato in un bucket S3 indipendentemente dal fatto che sia stato caricato da un caricamento in put o in più parti.

ADLS Gen2 fornisce notifiche di eventi diverse per i file visualizzati nel contenitore Gen2.

  • Il caricatore automatico è in ascolto dell'evento per l'elaborazione FlushWithClose di un file.
  • I flussi del caricatore automatico supportano l'azione per l'individuazione RenameFile dei file. RenameFile le azioni richiedono una richiesta API al sistema di archiviazione per ottenere le dimensioni del file rinominato.
  • Flussi del caricatore automatico creati con Databricks Runtime 9.0 e dopo aver supportato l'azione per l'individuazione RenameDirectory dei file. RenameDirectory le azioni richiedono richieste API al sistema di archiviazione per elencare il contenuto della directory rinominata.

Google Cloud Archiviazione fornisce un OBJECT_FINALIZE evento quando viene caricato un file, che include sovrascrizioni e copie di file. I caricamenti non riusciti non generano questo evento.

Nota

I provider di servizi cloud non garantiscono il 100% di recapito di tutti gli eventi di file in condizioni molto rare e non forniscono contratti di servizio rigorosi sulla latenza degli eventi di file. Databricks consiglia di attivare normali backfill con il caricatore automatico usando l'opzione cloudFiles.backfillInterval per garantire che tutti i file vengano individuati all'interno di un determinato contratto di servizio se il completamento dei dati è un requisito. L'attivazione di backfill regolari non causa duplicati.

Autorizzazioni necessarie per la configurazione della notifica file per ADLS Gen2 e Archiviazione BLOB di Azure

È necessario disporre delle autorizzazioni di lettura per la directory di input. Vedere Archiviazione BLOB di Azure.

Per usare la modalità di notifica dei file, è necessario fornire le credenziali di autenticazione per la configurazione e l'accesso ai servizi di notifica degli eventi. Per l'autenticazione è necessaria solo un'entità servizio.

  • Entità servizio: uso dei ruoli predefiniti di Azure

    Creare un'app e un'entità servizio Microsoft Entra ID (in precedenza Azure Active Directory) sotto forma di ID client e segreto client.

    Assegnare questa app ai ruoli seguenti all'account di archiviazione in cui risiede il percorso di input:

    • Collaboratore: questo ruolo è per la configurazione delle risorse nell'account di archiviazione, ad esempio code e sottoscrizioni di eventi.
    • Archiviazione Collaboratore ai dati della coda: questo ruolo è destinato all'esecuzione di operazioni di accodamento, ad esempio il recupero e l'eliminazione di messaggi dalle code. Questo ruolo è obbligatorio solo quando si fornisce un'entità servizio senza un stringa di connessione.

    Assegnare questa app al gruppo di risorse correlato:

    • Collaboratore EventGrid EventSubscription: questo ruolo è per l'esecuzione di operazioni di sottoscrizione di Griglia di eventi, ad esempio la creazione o l'elenco di sottoscrizioni di eventi.

    Per ulteriori informazioni, vedi Assegnare ruoli di Azure usando il portale di Azure.

  • Entità servizio : uso di un ruolo personalizzato

    Se si riguardano le autorizzazioni eccessive necessarie per i ruoli precedenti, è possibile creare un ruolo personalizzato con almeno le autorizzazioni seguenti, elencate di seguito in formato JSON del ruolo di Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    È quindi possibile assegnare questo ruolo personalizzato all'app.

    Per ulteriori informazioni, vedi Assegnare ruoli di Azure usando il portale di Azure.

Autorizzazioni del caricatore automatico

Risoluzione dei problemi comuni

Errore:

java.lang.RuntimeException: Failed to create event grid subscription.

Se viene visualizzato questo messaggio di errore quando si esegue l'Autoloader per la prima volta, Griglia di eventi non viene registrata come provider di risorse nella sottoscrizione di Azure. Per registrare ciò nel portale di Azure:

  1. Vai alla tua sottoscrizione.
  2. Fare clic su Provider di risorse nella sezione Impostazioni.
  3. Registra il provider Microsoft.EventGrid.

Errore:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Se viene visualizzato questo messaggio di errore quando si esegue l'Autoloader per la prima volta, assicurarsi di avere assegnato il ruolo Collaboratore all'entità servizio per Griglia di eventi e all'account di archiviazione.

Autorizzazioni necessarie per la configurazione della notifica file per AWS S3

È necessario disporre delle autorizzazioni di lettura per la directory di input. Per altri dettagli, vedere Dettagli sulla connessione S3.

Per usare la modalità di notifica dei file, allegare il documento dei criteri JSON seguente all'utente o al ruolo IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

dove:

  • <bucket-name>: il nome del bucket S3 in cui il flusso leggerà i file, auto-logsad esempio . È possibile usare * come carattere jolly, ad esempio databricks-*-logs. Per individuare il bucket S3 sottostante per il percorso DBFS, è possibile elencare tutti i punti di montaggio DBFS in un notebook eseguendo %fs mounts.
  • <region>: l'area AWS in cui risiede il bucket S3, ad esempio us-west-2. Se non si vuole specificare l'area, usare *.
  • <account-number>: numero di account AWS proprietario del bucket S3, 123456789012ad esempio . Se non si vuole specificare il numero di account, usare *.

La stringa databricks-auto-ingest-* nella specifica SQS e ARN SNS è il prefisso del nome usato dall'origine durante la cloudFiles creazione di servizi SQS e SNS. Poiché Azure Databricks configura i servizi di notifica nell'esecuzione iniziale del flusso, è possibile usare un criterio con autorizzazioni ridotte dopo l'esecuzione iniziale, ad esempio arrestare il flusso e quindi riavviarlo.

Nota

Il criterio precedente riguarda solo le autorizzazioni necessarie per configurare i servizi di notifica file, ovvero la notifica bucket S3, SNS e SQS e presuppone che l'accesso in lettura al bucket S3 sia già disponibile. Se è necessario aggiungere autorizzazioni di sola lettura S3, aggiungere quanto segue all'elenco nell'istruzione ActionDatabricksAutoLoaderSetup nel documento JSON:

  • s3:ListBucket
  • s3:GetObject

Autorizzazioni ridotte dopo l'installazione iniziale

Le autorizzazioni di installazione delle risorse descritte in precedenza sono necessarie solo durante l'esecuzione iniziale del flusso. Dopo la prima esecuzione, è possibile passare ai criteri IAM seguenti con autorizzazioni ridotte.

Importante

Con le autorizzazioni ridotte, non è possibile avviare nuove query di streaming o ricreare risorse in caso di errori( ad esempio, la coda SQS è stata eliminata accidentalmente); non è anche possibile usare l'API di gestione delle risorse cloud per elencare o eliminare le risorse.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Autorizzazioni necessarie per la configurazione della notifica file per GCS

È necessario disporre list di autorizzazioni e get per il bucket GCS e per tutti gli oggetti. Per informazioni dettagliate, vedere la documentazione di Google sulle autorizzazioni IAM.

Per usare la modalità di notifica file, è necessario aggiungere autorizzazioni per l'account del servizio GCS e l'account usato per accedere alle risorse Google Cloud Pub/Sub.

Aggiungere il Pub/Sub Publisher ruolo all'account del servizio GCS. Ciò consente all'account di pubblicare messaggi di notifica degli eventi dai bucket GCS a Google Cloud Pub/Sub.

Per quanto riguarda l'account del servizio usato per le risorse Google Cloud Pub/Sub, è necessario aggiungere le autorizzazioni seguenti:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

A tale scopo, è possibile creare un ruolo personalizzato IAM con queste autorizzazioni o assegnare ruoli GCP preesistenti per coprire queste autorizzazioni.

Ricerca dell'account del servizio GCS

In Google Cloud Console per il progetto corrispondente passare a Cloud Storage > Settings. La sezione "Cloud Archiviazione Service Account" contiene l'indirizzo di posta elettronica dell'account del servizio GCS.

Account del servizio GCS

Creazione di un ruolo IAM di Google Cloud personalizzato per la modalità di notifica file

Nella console di Google Cloud per il progetto corrispondente passare a IAM & Admin > Roles. Creare quindi un ruolo nella parte superiore o aggiornare un ruolo esistente. Nella schermata relativa alla creazione o alla modifica del ruolo fare clic su Add Permissions. Viene visualizzato un menu in cui è possibile aggiungere le autorizzazioni desiderate al ruolo.

Ruoli personalizzati di GCP IAM

Configurare o gestire manualmente le risorse di notifica file

Gli utenti con privilegi possono configurare o gestire manualmente le risorse di notifica dei file.

  • Configurare manualmente i servizi di notifica file tramite il provider di servizi cloud e specificare manualmente l'identificatore della coda. Per altri dettagli, vedere Opzioni di notifica file.
  • Usare le API Scala per creare o gestire le notifiche e i servizi di accodamento, come illustrato nell'esempio seguente:

Nota

È necessario disporre delle autorizzazioni appropriate per configurare o modificare l'infrastruttura cloud. Vedere la documentazione relativa alle autorizzazioni per Azure, S3 o GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Usare setUpNotificationServices(<resource-suffix>) per creare una coda e una sottoscrizione con il nome <prefix>-<resource-suffix> (il prefisso dipende dal sistema di archiviazione riepilogato nelle risorse cloud usate nella modalità di notifica del file del caricatore automatico. Se è presente una risorsa esistente con lo stesso nome, Azure Databricks riutilizza la risorsa esistente anziché crearne una nuova. Questa funzione restituisce un identificatore della coda che è possibile passare all'origine cloudFiles usando l'identificatore nelle opzioni di notifica file. Ciò consente all'utente cloudFiles di origine di avere meno autorizzazioni rispetto all'utente che crea le risorse.

Specificare l'opzione "path" solo newManager se si chiama setUpNotificationServices; non è necessario per listNotificationServices o tearDownNotificationServices. Questa operazione è la stessa path usata durante l'esecuzione di una query di streaming.

La matrice seguente indica i metodi API supportati in cui Databricks Runtime per ogni tipo di archiviazione:

Archiviazione cloud API di installazione API Elenco API di disinstallazione
AWS S3 Tutte le versioni Tutte le versioni Tutte le versioni
ADLS Gen2 Tutte le versioni Tutte le versioni Tutte le versioni
GCS Databricks Runtime 9.1 e versioni successive Databricks Runtime 9.1 e versioni successive Databricks Runtime 9.1 e versioni successive
Archiviazione BLOB di Azure Tutte le versioni Tutte le versioni Tutte le versioni
ADLS Gen1 Non supportato Non supportato Non supportato