Поделиться через


Источник файла хранилища BLOB-объектов Azure с хранилищем очередей Azure (устаревшая версия)

Внимание

Поддержка этой документации прекращена, она может больше не обновляться. Продукты, службы или технологии, упомянутые в этом контенте, больше не поддерживаются. См. статью об автозагрузчике.

Соединитель ABS-AQS предоставляет оптимизированный источник файлов, который использует Хранилище очередей Azure (AQS) для поиска новых файлов, записанных в контейнер Хранилища BLOB-объектов Azure (ABS), без многократного вывода всех файлов. Это дает два преимущества:

  • Малая задержка: не нужно перечислять вложенные структуры каталогов в ABS, что снижает скорость работы и интенсивно использует ресурсы.
  • Более низкие затраты: отсутствуют дорогие запросы API LIST к ABS.

Примечание.

Источник ABS-AQS удаляет сообщения из очереди AQS по мере использования событий. Если требуется, чтобы другие конвейеры использовали сообщения из этой очереди, настройте отдельную очередь AQS для оптимизированного средства для чтения. Можно настроить несколько подписок на Сетку событий для публикации в разных очередях.

Использование источника файлов ABS-AQS

Чтобы использовать источник файлов ABS-AQS, необходимо выполнить следующие действия:

  • Настройте уведомления о событиях ABS, используя подписки на Сетку событий Azure, и направьте их в AQS. См. Реагирование на события хранилища BLOB-объектов.

  • Укажите параметры fileFormat и queueUrl, а также схему. Например:

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \
      .load()
    

Аутентификация с помощью Хранилища очередей Azure и хранилища Blob-объектов

Для аутентификации с помощью Хранилища очередей Azure и хранилища Blob-объектов используйте маркеры подписанного URL-адреса или ключи учетной записи хранения. Необходимо указать строку подключения для учетной записи хранения, в которой развертывается очередь с маркером SAS или ключи доступа к вашей учетной записи хранения. Дополнительные сведения см. в статье Настройка строк подключения службы хранилища Azure.

Кроме того, вам потребуется предоставить доступ к контейнерам хранилища BLOB-объектов Azure. Сведения о настройке доступа к контейнеру хранилища BLOB-объектов Azure см. в статье "Подключение к Azure Data Lake Storage 2-го поколения и хранилищу BLOB-объектов".

Примечание.

Мы настоятельно рекомендуем использовать секреты для предоставления строк подключения.

Настройка

Вариант Тип По умолчанию. Description
allowOverwrites Логический true Следует ли повторно обрабатывать BLOB-объект, который перезаписывается.
connectionString Строка Нет (обязательный параметр) Строк подключения для доступа к очереди.
fetchParallelism Целое 1 Число потоков, используемых для получения сообщений из службы очередей.
fileFormat Строка Нет (обязательный параметр) Формат файлов, такой как parquet, json, csv, text и т. д.
ignoreFileDeletion Логический false При наличии конфигураций жизненного цикла или удалении исходных файлов вручную необходимо установить для этого параметра значение true.
maxFileAge Целое 604800 Определяет, сколько времени (в секундах) уведомления о файлах хранятся в состоянии, чтобы предотвратить повторную обработку.
pathRewrites Строка JSON. "{}" При использовании точек подключения можно переписать префикс пути container@storageAccount/key с точкой подключения. Перезаписывать можно только префиксы. Например, для конфигурации {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"} путь wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json переписывается в
dbfs:/mnt/data-warehouse/2017/08/fileA.json.
queueFetchInterval Строка длительности, например, 2m в течение 2 минут. "5s" Время ожидания между выборками, если очередь пуста. Расходы Azure за запрос API к AQS. Поэтому, если данные поступают редко, это значение можно установить на большую продолжительность. Пока очередь не пуста, мы будем получать данные непрерывно. Если новые файлы создаются каждые 5 минут, может потребоваться установить высокий уровень queueFetchInterval, чтобы снизить затраты на AQS.
queueName Строка Нет (обязательный параметр) Имя очереди AQS.

Если вы видите много сообщений в журналах драйверов, которые выглядят подобно Fetched 0 new events and 3 old events., как если бы вы наблюдали гораздо более старые события, чем новые, следует уменьшить интервал триггера в потоке.

Если вы используете файлы из расположения в хранилище BLOB-объектов, где предполагается, что некоторые файлы можно удалить перед обработкой, можно настроить следующую конфигурацию, чтобы пропустить ошибку и продолжить обработку:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

Вопросы и ответы

Если для параметра ignoreFileDeletion установлено значение False (ложь) (по умолчанию), и объект был удален, будет ли он завершаться сбоем всего конвейера?

Да, если мы получаем событие, сообщающее, что файл был удален, он завершится ошибкой всего конвейера.

Как настроить maxFileAge?

Хранилище очередей Azure обеспечивает по крайней мере семантику доставки сообщений, поэтому необходимо оставаться в состоянии удаления дубликатов. Значение по умолчанию для параметра maxFileAge равно 7 дням, что соответствует максимальному значению TTL сообщения в очереди.