Partilhar via


O que é o modo de notificação de arquivo Auto Loader?

No modo de notificação de ficheiros, o Carregador Automático configura automaticamente um serviço de notificação e um serviço de fila que subscreve eventos de ficheiros a partir do diretório de entrada. Você pode usar notificações de arquivo para dimensionar o Auto Loader para ingerir milhões de arquivos por hora. Quando comparado ao modo de listagem de diretórios, o modo de notificação de arquivos é mais eficiente e escalável para grandes diretórios de entrada ou um grande volume de arquivos, mas requer permissões de nuvem adicionais.

Você pode alternar entre notificações de arquivos e listas de diretórios a qualquer momento e ainda manter garantias de processamento de dados exatamente uma vez.

Nota

O modo de notificação de ficheiros não é suportado para contas de armazenamento premium do Azure porque as contas premium não suportam armazenamento em filas.

Aviso

A alteração do caminho de origem do Auto Loader não é suportada no modo de notificação de ficheiros. Se o modo de notificação de ficheiro for utilizado e o caminho for alterado, podem ocorrer falhas na ingestão de ficheiros já existentes no novo diretório aquando da atualização do diretório.

Recursos na nuvem usados no modo de notificação de arquivos do Auto Loader

Importante

Você precisa de permissões elevadas para configurar automaticamente a infraestrutura de nuvem para o modo de notificação de arquivos. Entre em contato com o administrador da nuvem ou o administrador do espaço de trabalho. Veja:

O Auto Loader pode configurar notificações de arquivo automaticamente quando você define a opção cloudFiles.useNotifications e true fornece as permissões necessárias para criar recursos de nuvem. Além disso, talvez seja necessário fornecer opções adicionais para conceder autorização ao Auto Loader para criar esses recursos.

A tabela a seguir resume quais recursos são criados pelo Auto Loader.

Armazenamento na Nuvem Serviço de Subscrição Serviço Fila Prefixo * Limite **
AWS S3 AWS SNS AWS SQS databricks-auto-ingest 100 por balde S3
ADLS Gen2 Grelha de Eventos do Azure Armazenamento de Filas do Azure databricks 500 por conta de armazenamento
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest 100 por balde GCS
Armazenamento de Blobs do Azure Grelha de Eventos do Azure Armazenamento de Filas do Azure databricks 500 por conta de armazenamento
  • Auto Loader nomeia os recursos com este prefixo.

** Quantos pipelines de notificação de arquivo simultâneos podem ser iniciados

Se você precisar executar mais do que o número limitado de pipelines de notificação de arquivo para uma determinada conta de armazenamento, poderá:

  • Aproveite um serviço como o AWS Lambda, o Azure Functions ou o Google Cloud Functions para distribuir notificações de uma única fila que escuta um contêiner ou bucket inteiro em filas específicas do diretório.

Eventos de notificação de arquivo

O AWS S3 fornece um ObjectCreated evento quando um arquivo é carregado em um bucket do S3, independentemente de ter sido carregado por um put ou multi-part upload.

O ADLS Gen2 fornece notificações de eventos diferentes para arquivos que aparecem em seu contêiner Gen2.

  • O Auto Loader escuta o FlushWithClose evento para processar um arquivo.
  • Os fluxos do Auto Loader suportam a RenameFile ação para descobrir arquivos. RenameFile exigem uma solicitação de API ao sistema de armazenamento para obter o tamanho do arquivo renomeado.
  • Os fluxos do Auto Loader criados com o Databricks Runtime 9.0 e após suportam a RenameDirectory ação de descoberta de arquivos. RenameDirectory ações exigem solicitações de API ao sistema de armazenamento para listar o conteúdo do diretório renomeado.

O Google Cloud Storage fornece um OBJECT_FINALIZE evento quando um arquivo é carregado, o que inclui substituições e cópias de arquivos. Carregamentos com falha não geram esse evento.

Nota

Os provedores de nuvem não garantem 100% de entrega de todos os eventos de arquivo em condições muito raras e não fornecem SLAs rigorosos sobre a latência dos eventos de arquivo. O Databricks recomenda que você acione backfills regulares com o Auto Loader usando a opção para garantir que todos os arquivos sejam descobertos dentro de um determinado SLA se a cloudFiles.backfillInterval integridade dos dados for um requisito. Acionar preenchimentos regulares não causa duplicatas.

Permissões necessárias para configurar a notificação de arquivo para ADLS Gen2 e Armazenamento de Blob do Azure

Você deve ter permissões de leitura para o diretório de entrada. Consulte Armazenamento de Blobs do Azure.

Para usar o modo de notificação de arquivo, você deve fornecer credenciais de autenticação para configurar e acessar os serviços de notificação de eventos. Você só precisa de uma entidade de serviço para autenticação.

  • Entidade de serviço - usando funções internas do Azure

    Crie um aplicativo Microsoft Entra ID (anteriormente Azure Ative Directory) e uma entidade de serviço na forma de ID do cliente e segredo do cliente.

    Atribua a este aplicativo as seguintes funções à conta de armazenamento na qual o caminho de entrada reside:

    • Colaborador: essa função é para configurar recursos em sua conta de armazenamento, como filas e assinaturas de eventos.
    • Colaborador de Dados da Fila de Armazenamento: essa função é para executar operações de fila, como recuperar e excluir mensagens das filas. Essa função é necessária somente quando você fornece uma entidade de serviço sem uma cadeia de conexão.

    Atribua a este aplicativo a seguinte função ao grupo de recursos relacionado:

    Para obter mais informações, consulte Atribuir funções do Azure utilizando o portal do Azure.

  • Entidade de serviço - usando função personalizada

    Se você estiver preocupado com as permissões excessivas necessárias para as funções anteriores, poderá criar uma Função Personalizada com pelo menos as seguintes permissões, listadas abaixo no formato JSON da função do Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Em seguida, você pode atribuir essa função personalizada ao seu aplicativo.

    Para obter mais informações, consulte Atribuir funções do Azure utilizando o portal do Azure.

Permissões do carregador automático

Resolver erros comuns

Erro:

java.lang.RuntimeException: Failed to create event grid subscription.

Se vir esta mensagem de erro quando executar o Carregador Automático pela primeira vez, a Grelha de Eventos não está registada como um Fornecedor de Recursos na sua subscrição do Azure. Para fazer o registo no portal do Azure:

  1. Aceda à subscrição.
  2. Clique em Provedores de Recursos na seção Configurações .
  3. Registe o fornecedor Microsoft.EventGrid.

Erro:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Se vir esta mensagem de erro quando executar o Carregador Automático pela primeira vez, certifique-se de que atribuiu a função Contribuidor ao principal do serviço da Grelha de Eventos, bem como à conta de armazenamento.

Permissões necessárias para configurar a notificação de arquivo para o AWS S3

Você deve ter permissões de leitura para o diretório de entrada. Consulte Detalhes da conexão do S3 para obter mais detalhes.

Para usar o modo de notificação de arquivo, anexe o seguinte documento de política JSON ao seu usuário ou função do IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

onde:

  • <bucket-name>: O nome do bucket do S3 onde seu stream lerá arquivos, por exemplo, auto-logs. Você pode usar * como curinga, por exemplo, databricks-*-logs. Para descobrir o bucket subjacente do S3 para o caminho do DBFS, você pode listar todos os pontos de montagem do DBFS em um bloco de anotações executando %fs mounts.
  • <region>: A região da AWS onde o bucket do S3 reside, por exemplo, us-west-2. Se não quiser especificar a região, use *.
  • <account-number>: o número da conta da AWS que possui o bucket do S3, por exemplo, 123456789012. Se não quiser especificar o número da conta, use *.

A cadeia de caracteres databricks-auto-ingest-* na especificação SQS e SNS ARN é o prefixo de nome que a cloudFiles fonte usa ao criar serviços SQS e SNS. Como o Azure Databricks configura os serviços de notificação na execução inicial do fluxo, você pode usar uma política com permissões reduzidas após a execução inicial (por exemplo, parar o fluxo e reiniciá-lo).

Nota

A política anterior diz respeito apenas às permissões necessárias para configurar serviços de notificação de arquivos, ou seja, notificação de bucket do S3, serviços SNS e SQS e pressupõe que você já tenha acesso de leitura ao bucket do S3. Se você precisar adicionar permissões somente leitura do S3, adicione o seguinte à Action lista na DatabricksAutoLoaderSetup instrução no documento JSON:

  • s3:ListBucket
  • s3:GetObject

Permissões reduzidas após a configuração inicial

As permissões de configuração de recursos descritas acima são necessárias apenas durante a execução inicial do fluxo. Após a primeira execução, você pode alternar para a seguinte política do IAM com permissões reduzidas.

Importante

Com as permissões reduzidas, você não pode iniciar novas consultas de streaming ou recriar recursos em caso de falhas (por exemplo, a fila SQS foi excluída acidentalmente); você também não pode usar a API de gerenciamento de recursos na nuvem para listar ou derrubar recursos.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Permissões necessárias para configurar a notificação de arquivo para GCS

Você deve ter list permissões no get bucket GCS e em todos os objetos. Para obter detalhes, consulte a documentação do Google sobre permissões do IAM.

Para usar o modo de notificação de arquivo, você precisa adicionar permissões para a conta de serviço GCS e a conta usada para acessar os recursos do Google Cloud Pub/Sub.

Adicione a Pub/Sub Publisher função à conta de serviço GCS. Isso permite que a conta publique mensagens de notificação de eventos de seus buckets GCS no Google Cloud Pub/Sub.

Quanto à conta de serviço usada para os recursos do Google Cloud Pub/Sub, você precisa adicionar as seguintes permissões:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Para fazer isso, você pode criar uma função personalizada do IAM com essas permissões ou atribuir funções GCP pré-existentes para cobrir essas permissões.

Localizando a conta de serviço GCS

No Google Cloud Console do projeto correspondente, navegue até Cloud Storage > Settings. A seção "Conta de serviço de armazenamento em nuvem" contém o e-mail da conta de serviço GCS.

Conta de serviço GCS

Criação de uma função personalizada do Google Cloud IAM para o modo de notificação de arquivo

No console do Google Cloud para o projeto correspondente, navegue até IAM & Admin > Roles. Em seguida, crie uma função na parte superior ou atualize uma função existente. Na tela de criação ou edição de funções, clique em Add Permissions. Aparece um menu no qual você pode adicionar as permissões desejadas à função.

Funções personalizadas do GCP IAM

Configurar ou gerenciar manualmente recursos de notificação de arquivos

Os usuários privilegiados podem configurar ou gerenciar manualmente os recursos de notificação de arquivos.

  • Configure os serviços de notificação de arquivos manualmente por meio do provedor de nuvem e especifique manualmente o identificador de fila. Consulte Opções de notificação de arquivo para obter mais detalhes.
  • Use APIs do Scala para criar ou gerenciar as notificações e os serviços de enfileiramento, conforme mostrado no exemplo a seguir:

Nota

Você deve ter permissões apropriadas para configurar ou modificar a infraestrutura de nuvem. Consulte a documentação de permissões do Azure, S3 ou GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Use setUpNotificationServices(<resource-suffix>) para criar uma fila e uma assinatura com o nome <prefix>-<resource-suffix> (o prefixo depende do sistema de armazenamento resumido em Recursos de nuvem usados no modo de notificação de arquivo do Auto Loader. Se houver um recurso existente com o mesmo nome, o Azure Databricks reutilizará o recurso existente em vez de criar um novo. Essa função retorna um identificador de fila que você pode passar para a cloudFiles origem usando o identificador nas opções de notificação de arquivo. Isso permite que o cloudFiles usuário de origem tenha menos permissões do que o usuário que cria os recursos.

Forneça a "path" opção de newManager apenas se ligar setUpNotificationServices, não é necessário para listNotificationServices ou tearDownNotificationServices. Isso é o mesmo path que você usa ao executar uma consulta de streaming.

A matriz a seguir indica quais métodos de API são suportados em quais Databricks Runtime para cada tipo de armazenamento:

Armazenamento na Nuvem API de configuração Listar API Derrubar API
AWS S3 Todas as versões Todas as versões Todas as versões
ADLS Gen2 Todas as versões Todas as versões Todas as versões
GCS Databricks Runtime 9.1 e superior Databricks Runtime 9.1 e superior Databricks Runtime 9.1 e superior
Armazenamento de Blobs do Azure Todas as versões Todas as versões Todas as versões
ADLS Gen1 Não suportado Não suportado Não suportado