Поделиться через


Что такое режим уведомлений о файлах автозагрузчика?

В режиме уведомлений о файлах автозагрузчик автоматически настраивает службу уведомлений и службу очередей, которые подписываются на события файлов из входного каталога. Вы можете использовать уведомления файлов для масштабирования автозагрузчика для приема миллионов файлов в час. По сравнению с режимом перечисления каталогов режим уведомлений о файлах является более производительным и масштабируемым для больших каталогов ввода или большого объема файлов, но требует дополнительных облачных разрешений.

Вы можете переключаться между уведомлениями о файлах и списком каталогов в любое время и по-прежнему поддерживать гарантии однократной обработки данных.

Примечание.

Режим уведомлений файлов не поддерживается для учетных записей хранения Azure уровня "Премиум", так как учетные записи уровня "Премиум" не поддерживают хранилище очередей.

Предупреждение

Изменение исходного пути для автозагрузчика не поддерживается для режима уведомлений файлов. Если режим уведомлений о файлах используется и при этом путь изменяется, вам, вероятно, не удастся принять файлы, которые в момент обновления каталога уже находятся в новом каталоге.

Режим уведомлений файлов поддерживается только в вычислительных ресурсах с одним пользователем.

Облачные ресурсы, используемые в режиме уведомления файла автозагрузчика

Внимание

Вам нужны повышенные разрешения для автоматической настройки облачной инфраструктуры для режима уведомлений о файлах. Обратитесь к администратору облака или администратору рабочей области. Видеть:

Автозагрузчик может автоматически настраивать уведомления о файлах при настройке параметра cloudFiles.useNotifications true и предоставлении необходимых разрешений для создания облачных ресурсов. Кроме того, может потребоваться предоставить дополнительные параметры для предоставления авторизации автозагрузчика для создания этих ресурсов.

В следующей таблице перечислены ресурсы, создаваемые с помощью Автозагрузчика.

Облачное хранилище Служба подписки Использование хранилища очередей из Python Префикс* Ограничение**
AWS S3 AWS SNS AWS SQS databricks-auto-ingest 100 на каждый контейнер S3
ADLS 2-го поколения Сетку событий Azure Хранилище очередей Azure databricks 500 на каждую учетную запись хранения
GCS Публикация и подписка Google Публикация и подписка Google databricks-auto-ingest 100 на каждый контейнер GCS
Хранилище BLOB-объектов Azure Сетку событий Azure Хранилище очередей Azure databricks 500 на каждую учетную запись хранения
  • Автозагрузчик именует ресурсы с использованием этого префикса.

**Количество параллельных конвейеров уведомлений о файлах, которые можно запустить.

Если для конкретной учетной записи хранения требуется запускать больше, чем ограниченное количество конвейеров уведомлений о файлах, можно выполнить следующие действия.

  • Используйте службу, например AWS Lambda, Функции Azure или Google Cloud Functions, чтобы раздувать уведомления из одной очереди, которая прослушивает весь контейнер или контейнер в определенные очереди каталога.

События уведомлений о файлах

AWS S3 предоставляет событие ObjectCreated, когда файл передается в контейнер S3, независимо от того, был ли он отправлен методом PUT или передачи из нескольких частей.

ADLS 2-го поколения предоставляет различные уведомления о событиях для файлов, отображаемых в контейнере 2-го поколения.

  • Автозагрузчик прослушивает на наличие события FlushWithClose для обработки файла.
  • Потоки автозагрузчика поддерживают RenameFile действие обнаружения файлов. Действия RenameFile требуют выполнения запроса API к системе хранения, чтобы получить размер переименованного файла.
  • Потоки Автозагрузчика, созданные с помощью Databricks Runtime 9.0 и более поздних версий, поддерживают действие RenameDirectory для обнаружения файлов. Действия RenameDirectory требуют запросов API к системе хранения, чтобы отобразить содержимое переименованного каталога.

Google Cloud Storage предоставляет событие OBJECT_FINALIZE при передаче файла, включая перезапись и копирование файлов. Неудачные отправки не создают это событие.

Примечание.

Поставщики облачных служб не гарантируют 100 % доставки всех событий файлов в очень редких случаях и не предоставляют строгих соглашений об уровне обслуживания в отношении задержки событий файла. В Databricks рекомендуется активировать регулярные обратные заполнения с помощью Автозагрузчика, используя параметр cloudFiles.backfillInterval, чтобы гарантировать, что все файлы будут обнаружены в рамках данного соглашения об уровне обслуживания, если полнота данных является обязательным условием. Активация регулярных выполнения задним числом не приведет к появлению дубликатов.

Необходимые разрешения для настройки уведомлений о файлах для ADLS 2-го поколения и Хранилище BLOB-объектов Azure

Необходимо иметь разрешения на чтение для входного каталога. См. раздел Хранилище BLOB-объектов Azure.

Чтобы использовать режим уведомления о файлах, необходимо предоставить учетные данные для проверки подлинности, чтобы настроить службы уведомлений о событиях и получить к ним доступ. Для проверки подлинности требуется только субъект-служба.

  • Субъект-служба — использование встроенных ролей Azure

    Создайте приложение и субъект-службу Microsoft Entra ID (ранее Azure Active Directory) в виде идентификатора клиента и секрета клиента.

    Присвойте этому приложению указанные ниже роли для учетной записи хранения, в которой находится входной путь.

    • Участник: эта роль предназначена для настройки ресурсов в учетной записи хранения, таких как очереди и подписки на события.
    • Участник данных очереди хранилища: эта роль предназначена для выполнения операций с очередями, таких как извлечение и удаление сообщений из очередей. Эта роль требуется только при предоставлении субъекта-службы без строка подключения.

    Присвойте этому приложению указанную ниже роль в связанной группе ресурсов.

    • Участник EventGrid EventSubscription: эта роль предназначена для выполнения операций подписки на Сетку событий, таких как создание или вывод списка подписок на события.

    Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.

  • Субъект-служба — использование настраиваемой роли

    Если вы обеспокоены избыточными разрешениями, необходимыми для предыдущих ролей, можно создать пользовательскую роль по крайней мере со следующими разрешениями, перечисленными ниже в формате JSON роли Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Затем вы можете назначить эту настраиваемую роль приложению.

    Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.

Разрешения для автозагрузчика

Устранение распространенных ошибок

Ошибка.

java.lang.RuntimeException: Failed to create event grid subscription.

Если это сообщение об ошибке отображается при первом запуске Автозагрузчика, то Сетка событий не зарегистрирована в качестве поставщика ресурсов в подписке Azure. Чтобы зарегистрировать ее на портале Azure:

  1. Перейдите к своей подписке.
  2. В разделе параметров щелкните Поставщики ресурсов.
  3. Зарегистрируйте поставщик Microsoft.EventGrid.

Ошибка.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Если это сообщение об ошибке отображается при первом запуске Автозагрузчика, убедитесь, что вы назначили роль участника вашему субъекту-службе для Сетки событий, а также учетной записи хранения.

Необходимые разрешения для настройки уведомлений о файлах для AWS S3

Необходимо иметь разрешения на чтение для входного каталога. Дополнительные сведения см. в сведениях о подключении S3.

Чтобы использовать режим уведомления о файлах, подключите следующий JSON-документ политики к пользователю или роли IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

где:

  • <bucket-name>: имя контейнера S3, в котором поток будет считывать файлы, например auto-logs. Можно использовать * в качестве подстановочного знака, например databricks-*-logs. Чтобы найти базовый контейнер S3 для пути DBFS, можно вывести список всех точек подключения DBFS в записной книжке, выполнив %fs mounts.
  • <region>: регион AWS, в котором находится контейнер S3, например us-west-2. Если вы не хотите указывать регион, используйте *.
  • <account-number>: номер учетной записи AWS, которой принадлежит контейнер S3, например 123456789012. Если вы не хотите указывать номер учетной записи, используйте *.

Строка databricks-auto-ingest-* в спецификации SQS и SNS ARN — это префикс имени, который источник cloudFiles использует при создании служб SQS и SNS. Так как Azure Databricks настраивает службы уведомлений при первоначальном запуске потока, после первоначального запуска (например, поток был остановлен, а затем перезапущен) вы можете использовать политику с ограниченными разрешениями.

Примечание.

Описанная выше политика связана только с разрешениями, необходимыми для настройки служб уведомлений о файлах, а именно служб уведомлений контейнера S3, SNS и SQS, и предполагается, что у вас уже есть доступ на чтение к контейнеру S3. Если необходимо добавить разрешения S3 только для чтения, добавьте следующий код в список Action в операторе DatabricksAutoLoaderSetup в документе JSON:

  • s3:ListBucket
  • s3:GetObject

Ограниченные разрешения после начальной настройки

Описанные выше разрешения на настройку ресурсов необходимы только во время начального запуска потока. После первого запуска можно переключиться на следующую политику IAM с ограниченными разрешениями.

Внимание

С ограниченными разрешениями вы не сможете запускать новые потоковые запросы или воссоздавать ресурсы в случае сбоев (например, если очередь SQS была случайно удалена). Вы также не сможете использовать API управления облачными ресурсами для перечисления или удаления ресурсов.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Необходимые разрешения для настройки уведомлений о файлах для GCS

Необходимо иметь разрешения list и get для контейнера GCS и всех объектов. Дополнительные сведения см. в документации Google по разрешениям IAM.

Чтобы использовать режим уведомления о файлах, необходимо добавить разрешения для учетной записи службы GCS и учетной записи, используемой для доступа к ресурсам службы публикации/подписки Google Cloud.

Добавьте роль Pub/Sub Publisher в учетную запись службы GCS. Это позволит учетной записи публиковать уведомительные сообщения о событиях из контейнеров GCS в службе публикации и подписки Google Cloud.

Как и для учетной записи службы, используемой для ресурсов службы публикации и подписки Google Cloud, вам нужно добавить следующие разрешения.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Для этого можно создать настраиваемую роль IAM с этими разрешениями или назначить уже существующие роли GCP, чтобы обеспечить эти разрешения.

Поиск учетной записи службы GCS

В консоли Google Cloud для соответствующего проекта перейдите к Cloud Storage > Settings. Раздел "Учетная запись службы облачного хранения" содержит адрес электронной почты учетной записи службы GCS.

Учетная запись службы GCS

Создание настраиваемой роли IAM Google Cloud для режима уведомлений о файлах

В консоли Google Cloud для соответствующего проекта перейдите к IAM & Admin > Roles. Затем либо создайте роль в верхней части страницы, либо обновите существующую роль. На экране для создания или редактирования ролей щелкните Add Permissions. Появится меню, с помощью которого вы можете добавить требуемые разрешения для роли.

Настраиваемые роли IAM GCP

Настройка ресурсов уведомлений файлов или управление ими вручную

Привилегированные пользователи могут вручную настраивать ресурсы уведомлений о файлах или управлять ими.

  • Настройте службы уведомлений файлов вручную через поставщика облачных служб и вручную укажите идентификатор очереди. Дополнительные сведения см. в разделе Параметры уведомлений о файлах.
  • Используйте API Scala для создания уведомлений и служб очереди или управления ими, как показано в следующем примере:

Примечание.

Необходимо иметь соответствующие разрешения для настройки или изменения облачной инфраструктуры. См. документацию по разрешениям для Azure, S3 или GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Используется setUpNotificationServices(<resource-suffix>) для создания очереди и подписки с именем <prefix>-<resource-suffix> (префикс зависит от системы хранения, суммируемой в облачных ресурсах, используемых в режиме уведомления файла автозагрузчика. Если ресурс с таким же именем уже существует, Azure Databricks будет повторно использовать уже существующий ресурс вместо того, чтобы создавать новый. Эта функция возвращает идентификатор очереди, который можно передать источнику cloudFiles с помощью идентификатора в параметрах уведомлений о файлах. Это позволяет исходному пользователю cloudFiles иметь меньше разрешений, чем пользователю, который создает ресурсы.

Укажите для параметра "path" значение newManager только при вызове setUpNotificationServices; это не требуется для listNotificationServices или tearDownNotificationServices. Это тот же путь path, который вы используете при выполнении запроса потоковой передачи.

В следующей матрице указывается, какие методы API поддерживаются в среде выполнения Databricks для каждого типа хранилища:

Облачное хранилище API установки API списка Уничтожение API
AWS S3 Все версии Все версии Все версии
ADLS 2-го поколения Все версии Все версии Все версии
GCS Databricks Runtime 9.1 и выше Databricks Runtime 9.1 и выше Databricks Runtime 9.1 и выше
Хранилище BLOB-объектов Azure Все версии Все версии Все версии
ADLS 1-го поколения Не поддерживается Не поддерживается Не поддерживается