Что такое режим уведомлений о файлах автозагрузчика?

В режиме уведомлений о файлах автозагрузчик автоматически настраивает службу уведомлений и службу очередей, которые подписываются на события файлов из входного каталога. Вы можете использовать уведомления файлов для масштабирования автозагрузчика для приема миллионов файлов в час. По сравнению с режимом перечисления каталогов режим уведомлений о файлах является более производительным и масштабируемым для больших каталогов ввода или большого объема файлов, но требует дополнительных облачных разрешений.

Вы можете переключаться между уведомлениями о файлах и списком каталогов в любое время и по-прежнему поддерживать гарантии однократной обработки данных.

Предупреждение

Изменение исходного пути для автозагрузчика не поддерживается для режима уведомлений файлов. Если режим уведомлений о файлах используется и при этом путь изменяется, вам, вероятно, не удастся принять файлы, которые в момент обновления каталога уже находятся в новом каталоге.

Облачные ресурсы, используемые в режиме уведомления файла автозагрузчика

Внимание

Вам нужны повышенные разрешения для автоматической настройки облачной инфраструктуры для режима уведомлений о файлах. Обратитесь к администратору облака или администратору рабочей области. Видеть:

Автозагрузчик может автоматически настраивать уведомления о файлах при настройке параметра cloudFiles.useNotificationstrue и предоставлении необходимых разрешений для создания облачных ресурсов. Кроме того, может потребоваться предоставить дополнительные параметры для предоставления авторизации автозагрузчика для создания этих ресурсов.

В следующей таблице перечислены ресурсы, создаваемые с помощью Автозагрузчика.

Облачное хранилище Служба подписки Использование хранилища очередей из Python Префикс* Ограничение**
AWS S3 AWS SNS AWS SQS databricks-auto-ingest 100 на каждый контейнер S3
ADLS 2-го поколения Сетку событий Azure Хранилище очередей Azure databricks 500 на каждую учетную запись хранения
GCS Публикация и подписка Google Публикация и подписка Google databricks-auto-ingest 100 на каждый контейнер GCS
Хранилище BLOB-объектов Azure Сетку событий Azure Хранилище очередей Azure databricks 500 на каждую учетную запись хранения
  • Автозагрузчик именует ресурсы с использованием этого префикса.

**Количество параллельных конвейеров уведомлений о файлах, которые можно запустить.

Если для конкретной учетной записи хранения требуется запускать больше, чем ограниченное количество конвейеров уведомлений о файлах, можно выполнить следующие действия.

  • Используйте службу, например AWS Lambda, Функции Azure или Google Cloud Functions, чтобы раздувать уведомления из одной очереди, которая прослушивает весь контейнер или контейнер в определенные очереди каталога.

События уведомлений о файлах

AWS S3 предоставляет событие ObjectCreated, когда файл передается в контейнер S3, независимо от того, был ли он отправлен методом PUT или передачи из нескольких частей.

ADLS 2-го поколения предоставляет различные уведомления о событиях для файлов, отображаемых в контейнере 2-го поколения.

  • Автозагрузчик прослушивает на наличие события FlushWithClose для обработки файла.
  • Потоки автозагрузчика поддерживают RenameFile действие обнаружения файлов. Действия RenameFile требуют выполнения запроса API к системе хранения, чтобы получить размер переименованного файла.
  • Потоки Автозагрузчика, созданные с помощью Databricks Runtime 9.0 и более поздних версий, поддерживают действие RenameDirectory для обнаружения файлов. Действия RenameDirectory требуют запросов API к системе хранения, чтобы отобразить содержимое переименованного каталога.

Google Cloud Storage предоставляет событие OBJECT_FINALIZE при передаче файла, включая перезапись и копирование файлов. Неудачные отправки не создают это событие.

Примечание.

Поставщики облачных служб не гарантируют 100 % доставки всех событий файлов в очень редких случаях и не предоставляют строгих соглашений об уровне обслуживания в отношении задержки событий файла. В Databricks рекомендуется активировать регулярные обратные заполнения с помощью Автозагрузчика, используя параметр cloudFiles.backfillInterval, чтобы гарантировать, что все файлы будут обнаружены в рамках данного соглашения об уровне обслуживания, если полнота данных является обязательным условием. Активация регулярных выполнения задним числом не приведет к появлению дубликатов.

Необходимые разрешения для настройки уведомлений о файлах для ADLS 2-го поколения и Хранилище BLOB-объектов Azure

Необходимо иметь разрешения на чтение для входного каталога. См. раздел Хранилище BLOB-объектов Azure.

Чтобы использовать режим уведомления о файлах, необходимо предоставить учетные данные для проверки подлинности, чтобы настроить службы уведомлений о событиях и получить к ним доступ. Для проверки подлинности требуется только субъект-служба.

  • Субъект-служба — использование встроенных ролей Azure

    Создайте приложение и субъект-службу Microsoft Entra ID (ранее Azure Active Directory) в виде идентификатора клиента и секрета клиента.

    Присвойте этому приложению указанные ниже роли для учетной записи хранения, в которой находится входной путь.

    • Участник: эта роль предназначена для настройки ресурсов в учетной записи хранения, таких как очереди и подписки на события.
    • Участник данных очереди хранилища: эта роль предназначена для выполнения операций с очередями, таких как извлечение и удаление сообщений из очередей. Эта роль требуется только при предоставлении субъекта-службы без строка подключения.

    Присвойте этому приложению указанную ниже роль в связанной группе ресурсов.

    • Участник EventGrid EventSubscription: эта роль предназначена для выполнения операций подписки на Сетку событий, таких как создание или вывод списка подписок на события.

    Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.

  • Субъект-служба — использование настраиваемой роли

    Если вы обеспокоены избыточными разрешениями, необходимыми для предыдущих ролей, можно создать пользовательскую роль по крайней мере со следующими разрешениями, перечисленными ниже в формате JSON роли Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Затем вы можете назначить эту настраиваемую роль приложению.

    Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.

Разрешения для автозагрузчика

Устранение распространенных ошибок

Ошибка.

java.lang.RuntimeException: Failed to create event grid subscription.

Если это сообщение об ошибке отображается при первом запуске Автозагрузчика, то Сетка событий не зарегистрирована в качестве поставщика ресурсов в подписке Azure. Чтобы зарегистрировать ее на портале Azure:

  1. Перейдите к своей подписке.
  2. В разделе параметров щелкните Поставщики ресурсов.
  3. Зарегистрируйте поставщик Microsoft.EventGrid.

Ошибка.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Если это сообщение об ошибке отображается при первом запуске Автозагрузчика, убедитесь, что вы назначили роль участника вашему субъекту-службе для Сетки событий, а также учетной записи хранения.

Необходимые разрешения для настройки уведомлений о файлах для AWS S3

Необходимо иметь разрешения на чтение для входного каталога. Дополнительные сведения см. в сведениях о подключении S3.

Чтобы использовать режим уведомления о файлах, подключите следующий JSON-документ политики к пользователю или роли IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

где:

  • <bucket-name>: имя контейнера S3, в котором поток будет считывать файлы, например auto-logs. Можно использовать * в качестве подстановочного знака, например databricks-*-logs. Чтобы найти базовый контейнер S3 для пути DBFS, можно вывести список всех точек подключения DBFS в записной книжке, выполнив %fs mounts.
  • <region>: регион AWS, в котором находится контейнер S3, например us-west-2. Если вы не хотите указывать регион, используйте *.
  • <account-number>: номер учетной записи AWS, которой принадлежит контейнер S3, например 123456789012. Если вы не хотите указывать номер учетной записи, используйте *.

Строка databricks-auto-ingest-* в спецификации SQS и SNS ARN — это префикс имени, который источник cloudFiles использует при создании служб SQS и SNS. Так как Azure Databricks настраивает службы уведомлений при первоначальном запуске потока, после первоначального запуска (например, поток был остановлен, а затем перезапущен) вы можете использовать политику с ограниченными разрешениями.

Примечание.

Описанная выше политика связана только с разрешениями, необходимыми для настройки служб уведомлений о файлах, а именно служб уведомлений контейнера S3, SNS и SQS, и предполагается, что у вас уже есть доступ на чтение к контейнеру S3. Если необходимо добавить разрешения S3 только для чтения, добавьте следующий код в список Action в операторе DatabricksAutoLoaderSetup в документе JSON:

  • s3:ListBucket
  • s3:GetObject

Ограниченные разрешения после начальной настройки

Описанные выше разрешения на настройку ресурсов необходимы только во время начального запуска потока. После первого запуска можно переключиться на следующую политику IAM с ограниченными разрешениями.

Внимание

С ограниченными разрешениями вы не сможете запускать новые потоковые запросы или воссоздавать ресурсы в случае сбоев (например, если очередь SQS была случайно удалена). Вы также не сможете использовать API управления облачными ресурсами для перечисления или удаления ресурсов.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Необходимые разрешения для настройки уведомлений о файлах для GCS

Необходимо иметь разрешения list и get для контейнера GCS и всех объектов. Дополнительные сведения см. в документации Google по разрешениям IAM.

Чтобы использовать режим уведомления о файлах, необходимо добавить разрешения для учетной записи службы GCS и учетной записи, используемой для доступа к ресурсам службы публикации/подписки Google Cloud.

Добавьте роль Pub/Sub Publisher в учетную запись службы GCS. Это позволит учетной записи публиковать уведомительные сообщения о событиях из контейнеров GCS в службе публикации и подписки Google Cloud.

Как и для учетной записи службы, используемой для ресурсов службы публикации и подписки Google Cloud, вам нужно добавить следующие разрешения.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Для этого можно создать настраиваемую роль IAM с этими разрешениями или назначить уже существующие роли GCP, чтобы обеспечить эти разрешения.

Поиск учетной записи службы GCS

В консоли Google Cloud для соответствующего проекта перейдите к Cloud Storage > Settings. Раздел "Учетная запись службы облачного хранения" содержит адрес электронной почты учетной записи службы GCS.

Учетная запись службы GCS

Создание настраиваемой роли IAM Google Cloud для режима уведомлений о файлах

В консоли Google Cloud для соответствующего проекта перейдите к IAM & Admin > Roles. Затем либо создайте роль в верхней части страницы, либо обновите существующую роль. На экране для создания или редактирования ролей щелкните Add Permissions. Появится меню, с помощью которого вы можете добавить требуемые разрешения для роли.

Настраиваемые роли IAM GCP

Настройка ресурсов уведомлений файлов или управление ими вручную

Привилегированные пользователи могут вручную настраивать ресурсы уведомлений о файлах или управлять ими.

  • Настройте службы уведомлений файлов вручную через поставщика облачных служб и вручную укажите идентификатор очереди. Дополнительные сведения см. в разделе Параметры уведомлений о файлах.
  • Используйте API Scala для создания уведомлений и служб очереди или управления ими, как показано в следующем примере:

Примечание.

Необходимо иметь соответствующие разрешения для настройки или изменения облачной инфраструктуры. См. документацию по разрешениям для Azure, S3 или GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Используется setUpNotificationServices(<resource-suffix>) для создания очереди и подписки с именем <prefix>-<resource-suffix> (префикс зависит от системы хранения, суммируемой в облачных ресурсах, используемых в режиме уведомления файла автозагрузчика. Если ресурс с таким же именем уже существует, Azure Databricks будет повторно использовать уже существующий ресурс вместо того, чтобы создавать новый. Эта функция возвращает идентификатор очереди, который можно передать источнику cloudFiles с помощью идентификатора в параметрах уведомлений о файлах. Это позволяет исходному пользователю cloudFiles иметь меньше разрешений, чем пользователю, который создает ресурсы.

Укажите для параметра "path" значение newManager только при вызове setUpNotificationServices; это не требуется для listNotificationServices или tearDownNotificationServices. Это тот же путь path, который вы используете при выполнении запроса потоковой передачи.

В следующей матрице указывается, какие методы API поддерживаются в среде выполнения Databricks для каждого типа хранилища:

Облачное хранилище API установки API списка Уничтожение API
AWS S3 Все версии Все версии Все версии
ADLS 2-го поколения Все версии Все версии Все версии
GCS Databricks Runtime 9.1 и выше Databricks Runtime 9.1 и выше Databricks Runtime 9.1 и выше
Хранилище BLOB-объектов Azure Все версии Все версии Все версии
ADLS 1-го поколения Не поддерживается Не поддерживается Не поддерживается