Что такое режим уведомлений о файлах автозагрузчика?
В режиме уведомлений о файлах автозагрузчик автоматически настраивает службу уведомлений и службу очередей, которые подписываются на события файлов из входного каталога. Вы можете использовать уведомления файлов для масштабирования автозагрузчика для приема миллионов файлов в час. По сравнению с режимом перечисления каталогов режим уведомлений о файлах является более производительным и масштабируемым для больших каталогов ввода или большого объема файлов, но требует дополнительных облачных разрешений.
Вы можете переключаться между уведомлениями о файлах и списком каталогов в любое время и по-прежнему поддерживать гарантии однократной обработки данных.
Предупреждение
Изменение исходного пути для автозагрузчика не поддерживается для режима уведомлений файлов. Если режим уведомлений о файлах используется и при этом путь изменяется, вам, вероятно, не удастся принять файлы, которые в момент обновления каталога уже находятся в новом каталоге.
Облачные ресурсы, используемые в режиме уведомления файла автозагрузчика
Внимание
Вам нужны повышенные разрешения для автоматической настройки облачной инфраструктуры для режима уведомлений о файлах. Обратитесь к администратору облака или администратору рабочей области. Видеть:
Автозагрузчик может автоматически настраивать уведомления о файлах при настройке параметра cloudFiles.useNotifications
true
и предоставлении необходимых разрешений для создания облачных ресурсов. Кроме того, может потребоваться предоставить дополнительные параметры для предоставления авторизации автозагрузчика для создания этих ресурсов.
В следующей таблице перечислены ресурсы, создаваемые с помощью Автозагрузчика.
Облачное хранилище | Служба подписки | Использование хранилища очередей из Python | Префикс* | Ограничение** |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | databricks-auto-ingest | 100 на каждый контейнер S3 |
ADLS 2-го поколения | Сетку событий Azure | Хранилище очередей Azure | databricks | 500 на каждую учетную запись хранения |
GCS | Публикация и подписка Google | Публикация и подписка Google | databricks-auto-ingest | 100 на каждый контейнер GCS |
Хранилище BLOB-объектов Azure | Сетку событий Azure | Хранилище очередей Azure | databricks | 500 на каждую учетную запись хранения |
- Автозагрузчик именует ресурсы с использованием этого префикса.
**Количество параллельных конвейеров уведомлений о файлах, которые можно запустить.
Если для конкретной учетной записи хранения требуется запускать больше, чем ограниченное количество конвейеров уведомлений о файлах, можно выполнить следующие действия.
- Используйте службу, например AWS Lambda, Функции Azure или Google Cloud Functions, чтобы раздувать уведомления из одной очереди, которая прослушивает весь контейнер или контейнер в определенные очереди каталога.
События уведомлений о файлах
AWS S3 предоставляет событие ObjectCreated
, когда файл передается в контейнер S3, независимо от того, был ли он отправлен методом PUT или передачи из нескольких частей.
ADLS 2-го поколения предоставляет различные уведомления о событиях для файлов, отображаемых в контейнере 2-го поколения.
- Автозагрузчик прослушивает на наличие события
FlushWithClose
для обработки файла. - Потоки автозагрузчика поддерживают
RenameFile
действие обнаружения файлов. ДействияRenameFile
требуют выполнения запроса API к системе хранения, чтобы получить размер переименованного файла. - Потоки Автозагрузчика, созданные с помощью Databricks Runtime 9.0 и более поздних версий, поддерживают действие
RenameDirectory
для обнаружения файлов. ДействияRenameDirectory
требуют запросов API к системе хранения, чтобы отобразить содержимое переименованного каталога.
Google Cloud Storage предоставляет событие OBJECT_FINALIZE
при передаче файла, включая перезапись и копирование файлов. Неудачные отправки не создают это событие.
Примечание.
Поставщики облачных служб не гарантируют 100 % доставки всех событий файлов в очень редких случаях и не предоставляют строгих соглашений об уровне обслуживания в отношении задержки событий файла. В Databricks рекомендуется активировать регулярные обратные заполнения с помощью Автозагрузчика, используя параметр cloudFiles.backfillInterval
, чтобы гарантировать, что все файлы будут обнаружены в рамках данного соглашения об уровне обслуживания, если полнота данных является обязательным условием. Активация регулярных выполнения задним числом не приведет к появлению дубликатов.
Необходимые разрешения для настройки уведомлений о файлах для ADLS 2-го поколения и Хранилище BLOB-объектов Azure
Необходимо иметь разрешения на чтение для входного каталога. См. раздел Хранилище BLOB-объектов Azure.
Чтобы использовать режим уведомления о файлах, необходимо предоставить учетные данные для проверки подлинности, чтобы настроить службы уведомлений о событиях и получить к ним доступ. Для проверки подлинности требуется только субъект-служба.
Субъект-служба — использование встроенных ролей Azure
Создайте приложение и субъект-службу Microsoft Entra ID (ранее Azure Active Directory) в виде идентификатора клиента и секрета клиента.
Присвойте этому приложению указанные ниже роли для учетной записи хранения, в которой находится входной путь.
- Участник: эта роль предназначена для настройки ресурсов в учетной записи хранения, таких как очереди и подписки на события.
- Участник данных очереди хранилища: эта роль предназначена для выполнения операций с очередями, таких как извлечение и удаление сообщений из очередей. Эта роль требуется только при предоставлении субъекта-службы без строка подключения.
Присвойте этому приложению указанную ниже роль в связанной группе ресурсов.
- Участник EventGrid EventSubscription: эта роль предназначена для выполнения операций подписки на Сетку событий, таких как создание или вывод списка подписок на события.
Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.
Субъект-служба — использование настраиваемой роли
Если вы обеспокоены избыточными разрешениями, необходимыми для предыдущих ролей, можно создать пользовательскую роль по крайней мере со следующими разрешениями, перечисленными ниже в формате JSON роли Azure:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
Затем вы можете назначить эту настраиваемую роль приложению.
Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.
Устранение распространенных ошибок
Ошибка.
java.lang.RuntimeException: Failed to create event grid subscription.
Если это сообщение об ошибке отображается при первом запуске Автозагрузчика, то Сетка событий не зарегистрирована в качестве поставщика ресурсов в подписке Azure. Чтобы зарегистрировать ее на портале Azure:
- Перейдите к своей подписке.
- В разделе параметров щелкните Поставщики ресурсов.
- Зарегистрируйте поставщик
Microsoft.EventGrid
.
Ошибка.
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Если это сообщение об ошибке отображается при первом запуске Автозагрузчика, убедитесь, что вы назначили роль участника вашему субъекту-службе для Сетки событий, а также учетной записи хранения.
Необходимые разрешения для настройки уведомлений о файлах для AWS S3
Необходимо иметь разрешения на чтение для входного каталога. Дополнительные сведения см. в сведениях о подключении S3.
Чтобы использовать режим уведомления о файлах, подключите следующий JSON-документ политики к пользователю или роли IAM.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
где:
<bucket-name>
: имя контейнера S3, в котором поток будет считывать файлы, напримерauto-logs
. Можно использовать*
в качестве подстановочного знака, напримерdatabricks-*-logs
. Чтобы найти базовый контейнер S3 для пути DBFS, можно вывести список всех точек подключения DBFS в записной книжке, выполнив%fs mounts
.<region>
: регион AWS, в котором находится контейнер S3, напримерus-west-2
. Если вы не хотите указывать регион, используйте*
.<account-number>
: номер учетной записи AWS, которой принадлежит контейнер S3, например123456789012
. Если вы не хотите указывать номер учетной записи, используйте*
.
Строка databricks-auto-ingest-*
в спецификации SQS и SNS ARN — это префикс имени, который источник cloudFiles
использует при создании служб SQS и SNS. Так как Azure Databricks настраивает службы уведомлений при первоначальном запуске потока, после первоначального запуска (например, поток был остановлен, а затем перезапущен) вы можете использовать политику с ограниченными разрешениями.
Примечание.
Описанная выше политика связана только с разрешениями, необходимыми для настройки служб уведомлений о файлах, а именно служб уведомлений контейнера S3, SNS и SQS, и предполагается, что у вас уже есть доступ на чтение к контейнеру S3. Если необходимо добавить разрешения S3 только для чтения, добавьте следующий код в список Action
в операторе DatabricksAutoLoaderSetup
в документе JSON:
s3:ListBucket
s3:GetObject
Ограниченные разрешения после начальной настройки
Описанные выше разрешения на настройку ресурсов необходимы только во время начального запуска потока. После первого запуска можно переключиться на следующую политику IAM с ограниченными разрешениями.
Внимание
С ограниченными разрешениями вы не сможете запускать новые потоковые запросы или воссоздавать ресурсы в случае сбоев (например, если очередь SQS была случайно удалена). Вы также не сможете использовать API управления облачными ресурсами для перечисления или удаления ресурсов.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Необходимые разрешения для настройки уведомлений о файлах для GCS
Необходимо иметь разрешения list
и get
для контейнера GCS и всех объектов. Дополнительные сведения см. в документации Google по разрешениям IAM.
Чтобы использовать режим уведомления о файлах, необходимо добавить разрешения для учетной записи службы GCS и учетной записи, используемой для доступа к ресурсам службы публикации/подписки Google Cloud.
Добавьте роль Pub/Sub Publisher
в учетную запись службы GCS. Это позволит учетной записи публиковать уведомительные сообщения о событиях из контейнеров GCS в службе публикации и подписки Google Cloud.
Как и для учетной записи службы, используемой для ресурсов службы публикации и подписки Google Cloud, вам нужно добавить следующие разрешения.
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
Для этого можно создать настраиваемую роль IAM с этими разрешениями или назначить уже существующие роли GCP, чтобы обеспечить эти разрешения.
Поиск учетной записи службы GCS
В консоли Google Cloud для соответствующего проекта перейдите к Cloud Storage > Settings
.
Раздел "Учетная запись службы облачного хранения" содержит адрес электронной почты учетной записи службы GCS.
Создание настраиваемой роли IAM Google Cloud для режима уведомлений о файлах
В консоли Google Cloud для соответствующего проекта перейдите к IAM & Admin > Roles
. Затем либо создайте роль в верхней части страницы, либо обновите существующую роль. На экране для создания или редактирования ролей щелкните Add Permissions
. Появится меню, с помощью которого вы можете добавить требуемые разрешения для роли.
Настройка ресурсов уведомлений файлов или управление ими вручную
Привилегированные пользователи могут вручную настраивать ресурсы уведомлений о файлах или управлять ими.
- Настройте службы уведомлений файлов вручную через поставщика облачных служб и вручную укажите идентификатор очереди. Дополнительные сведения см. в разделе Параметры уведомлений о файлах.
- Используйте API Scala для создания уведомлений и служб очереди или управления ими, как показано в следующем примере:
Примечание.
Необходимо иметь соответствующие разрешения для настройки или изменения облачной инфраструктуры. См. документацию по разрешениям для Azure, S3 или GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Используется setUpNotificationServices(<resource-suffix>)
для создания очереди и подписки с именем <prefix>-<resource-suffix>
(префикс зависит от системы хранения, суммируемой в облачных ресурсах, используемых в режиме уведомления файла автозагрузчика. Если ресурс с таким же именем уже существует, Azure Databricks будет повторно использовать уже существующий ресурс вместо того, чтобы создавать новый. Эта функция возвращает идентификатор очереди, который можно передать источнику cloudFiles
с помощью идентификатора в параметрах уведомлений о файлах. Это позволяет исходному пользователю cloudFiles
иметь меньше разрешений, чем пользователю, который создает ресурсы.
Укажите для параметра "path"
значение newManager
только при вызове setUpNotificationServices
; это не требуется для listNotificationServices
или tearDownNotificationServices
. Это тот же путь path
, который вы используете при выполнении запроса потоковой передачи.
В следующей матрице указывается, какие методы API поддерживаются в среде выполнения Databricks для каждого типа хранилища:
Облачное хранилище | API установки | API списка | Уничтожение API |
---|---|---|---|
AWS S3 | Все версии | Все версии | Все версии |
ADLS 2-го поколения | Все версии | Все версии | Все версии |
GCS | Databricks Runtime 9.1 и выше | Databricks Runtime 9.1 и выше | Databricks Runtime 9.1 и выше |
Хранилище BLOB-объектов Azure | Все версии | Все версии | Все версии |
ADLS 1-го поколения | Не поддерживается | Не поддерживается | Не поддерживается |