Bagikan melalui


Apa itu mode pemberitahuan file Auto Loader?

Dalam mode pemberitahuan file, Auto Loader secara otomatis menyiapkan layanan pemberitahuan dan layanan antrean yang berlangganan peristiwa file dari direktori input. Anda dapat menggunakan pemberitahuan file untuk menskalakan Auto Loader untuk menyerap jutaan file per jam. Jika dibandingkan dengan mode daftar direktori, mode pemberitahuan file lebih berkinerja dan dapat diskalakan untuk direktori input besar atau file dalam volume tinggi tetapi memerlukan izin cloud tambahan.

Anda dapat beralih antara pemberitahuan file dan daftar direktori kapan saja dan masih mempertahankan jaminan pemrosesan data sekali persis.

Catatan

Mode pemberitahuan file tidak didukung untuk akun penyimpanan premium Azure karena akun premium tidak mendukung penyimpanan antrean.

Peringatan

Mengubah jalur sumber untuk Auto Loader tidak didukung untuk mode pemberitahuan file. Jika mode notifikasi file digunakan dan jalur diubah, Anda mungkin gagal menyerap file yang sudah ada di direktori baru pada saat pembaruan direktori.

Sumber daya cloud yang digunakan dalam mode pemberitahuan file Auto Loader

Penting

Anda memerlukan izin yang ditingkatkan untuk mengonfigurasi infrastruktur cloud secara otomatis untuk mode pemberitahuan file. Hubungi administrator cloud atau admin ruang kerja Anda. Lihat:

Auto Loader dapat menyiapkan pemberitahuan file untuk Anda secara otomatis saat Anda mengatur opsi cloudFiles.useNotifications ke true dan memberikan izin yang diperlukan untuk membuat sumber daya cloud. Selain itu, Anda mungkin perlu memberikan opsi tambahan untuk memberikan otorisasi Auto Loader untuk membuat sumber daya ini.

Tabel berikut merangkum sumber daya mana yang dibuat oleh Auto Loader.

Penyimpanan cloud Layanan Langganan Layanan Antrean Awalan* Batas**
AWS S3 AWS SNS AWS SQS databricks-auto-ingest 100 per wadah S3
ADLS Gen2 Kisi Aktivitas Azure Azure Queue Storage databricks 500 TB per akun penyimpanan
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest 100 per wadah GCS
Azure Blob Storage Kisi Aktivitas Azure Azure Queue Storage databricks 500 TB per akun penyimpanan
  • Auto Loader memberi nama sumber daya dengan awalan ini.

** Berapa banyak alur pemberitahuan file bersamaan yang dapat diluncurkan

Jika Anda memerlukan menjalankan lebih dari jumlah terbatas alur pemberitahuan file untuk akun penyimpanan tertentu, Anda dapat:

  • Manfaatkan layanan seperti AWS Lambda, Azure Functions, atau Google Cloud Functions untuk memberi tahu pemberitahuan dari satu antrean yang mendengarkan seluruh kontainer atau wadah ke dalam antrean tertentu direktori.

Peristiwa pemberitahuan file

AWS S3 menyediakan ObjectCreated peristiwa saat file diunggah ke wadah S3 terlepas dari apakah file tersebut diunggah oleh unggahan put atau multi-bagian.

ADLS Gen2 menyediakan pemberitahuan peristiwa yang berbeda untuk file yang muncul di kontainer Gen2 Anda.

  • Auto Loader mendengarkan FlushWithClose peristiwa untuk memproses file.
  • Aliran Auto Loader mendukung RenameFile tindakan untuk menemukan file. RenameFile tindakan memerlukan permintaan API ke sistem penyimpanan untuk mendapatkan ukuran file yang diganti namanya.
  • Aliran Auto Loader yang dibuat dengan Databricks Runtime 9.0 dan dukungan aksi sesudahnya RenameDirectory untuk menemukan file. RenameDirectory tindakan memerlukan permintaan API ke sistem penyimpanan untuk mencantumkan konten direktori yang diganti namanya.

Google Cloud Storage menyediakan OBJECT_FINALIZE acara saat file diunggah, yang mencakup menimpa dan menyalin file. Pengunggahan yang gagal tidak menghasilkan peristiwa ini.

Catatan

Penyedia cloud tidak menjamin pengiriman 100% dari semua peristiwa file dalam kondisi yang sangat langka dan tidak memberikan SLA yang ketat tentang latensi peristiwa file. Databricks merekomendasikan agar Anda memicu pengisian ulang reguler dengan Auto Loader dengan menggunakan cloudFiles.backfillInterval opsi untuk menjamin bahwa semua file ditemukan dalam SLA tertentu jika kelengkapan data adalah persyaratan. Memicu backfill reguler tidak menyebabkan duplikat.

Izin yang diperlukan untuk mengonfigurasi pemberitahuan file untuk ADLS Gen2 dan Azure Blob Storage

Anda harus memiliki izin baca untuk direktori input. Lihat Azure Blob Storage.

Untuk menggunakan mode pemberitahuan file, Anda harus memberikan kredensial autentikasi untuk menyiapkan dan mengakses layanan pemberitahuan acara. Anda hanya memerlukan perwakilan layanan untuk autentikasi.

  • Perwakilan layanan - menggunakan peran bawaan Azure

    Buat aplikasi microsoft Entra ID (sebelumnya Azure Active Directory) dan perwakilan layanan dalam bentuk ID klien dan rahasia klien.

    Tetapkan aplikasi ini peran berikut ke akun penyimpanan tempat jalur input berada:

    • Contributor: Peran ini untuk menyiapkan sumber daya di akun penyimpanan Anda, seperti antrean dan langganan peristiwa.
    • Storage Queue Data Contributor: Peran ini untuk melakukan operasi antrean seperti mengambil dan menghapus pesan dari antrean. Peran ini diperlukan hanya ketika Anda menyediakan perwakilan layanan tanpa string koneksi.

    Tetapkan aplikasi ini peran berikut ke grup sumber daya terkait:

    Untuk informasi selengkapnya, lihat Menetapkan peran Azure menggunakan portal Microsoft Azure.

  • Perwakilan layanan - menggunakan peran kustom

    Jika Anda khawatir dengan izin berlebihan yang diperlukan untuk peran sebelumnya, Anda dapat membuat Peran Kustom dengan setidaknya izin berikut, yang tercantum di bawah ini dalam format JSON peran Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Kemudian, Anda dapat menetapkan peran kustom ini ke aplikasi Anda.

    Untuk informasi selengkapnya, lihat Menetapkan peran Azure menggunakan portal Microsoft Azure.

Izin pemuat otomatis

Pemecahan masalah kesalahan umum

Kesalahan:

java.lang.RuntimeException: Failed to create event grid subscription.

Jika Anda melihat pesan kesalahan ini saat menjalankan Auto Loader untuk pertama kalinya, Event Grid tidak terdaftar sebagai Penyedia Sumber Daya dalam langganan Azure Anda. Untuk mendaftarkan ini di portal Microsoft Azure:

  1. Pergi ke langganan Anda.
  2. Klik Penyedia Sumber di bawah bagian Pengaturan.
  3. Mendaftarkan penyedia Microsoft.EventGrid.

Kesalahan:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Jika Anda melihat pesan kesalahan ini saat menjalankan Auto Loader untuk pertama kalinya, pastikan Anda telah memberikan peran Kontributor kepada perwakilan layanan Anda untuk Event Grid serta akun penyimpanan Anda.

Izin yang diperlukan untuk mengonfigurasi pemberitahuan file untuk AWS S3

Anda harus memiliki izin baca untuk direktori input. Lihat Detail koneksi S3 untuk detail selengkapnya.

Untuk menggunakan mode pemberitahuan file, lampirkan dokumen kebijakan JSON berikut ke pengguna atau peran IAM Anda.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

di mana:

  • <bucket-name>: Nama wadah S3 tempat stream Anda akan membaca file, misalnya, auto-logs. Anda dapat menggunakan * sebagai kartubebas, misalnya, databricks-*-logs. Untuk mengetahui wadah S3 yang mendasari jalur DBFS Anda, Anda dapat mencantumkan semua titik pemasangan DBFS di notebook dengan menjalankan %fs mounts.
  • <region>: Wilayah AWS tempat wadah S3 berada, misalnya, us-west-2. Jika Anda tidak ingin menentukan wilayah, gunakan *.
  • <account-number>: Nomor akun AWS yang memiliki wadah S3, misalnya, 123456789012. Jika tidak ingin menentukan nomor akun, gunakan *.

String databricks-auto-ingest-* dalam spesifikasi SQS dan SNS ARN adalah awalan nama yang cloudFiles digunakan sumber saat membuat layanan SQS dan SNS. Karena Azure Databricks menyiapkan layanan pemberitahuan di awal stream, Anda dapat menggunakan kebijakan dengan izin yang dikurangi setelah menjalankan awal (misalnya, hentikan stream, lalu menghidupkan ulang).

Catatan

Kebijakan sebelumnya hanya berkaitan dengan izin yang diperlukan untuk menyiapkan layanan notifikasi file, yaitu layanan wadah pemberitahuan, SNS, dan SQS S3 dan menganggap Anda sudah memiliki akses baca ke wadah S3. Jika Anda perlu menambahkan izin baca-saja S3, tambahkan yang berikut ke Action daftar dalam DatabricksAutoLoaderSetup pernyataan di dokumen JSON:

  • s3:ListBucket
  • s3:GetObject

Izin berkurang setelah penyiapan awal

Izin penyiapan sumber daya yang dijelaskan di atas hanya diperlukan selama menjalankan stream awalan. Setelah eksekusi pertama, Anda dapat beralih ke kebijakan IAM berikut dengan izin yang dikurangi.

Penting

Dengan pengurangan izin, Anda tidak dapat memulai kueri streaming baru atau membuat ulang sumber daya jika terjadi kegagalan (misalnya, antrean SQS telah dihapus secara tidak sengaja); Anda juga tidak dapat menggunakan API manajemen sumber daya cloud untuk mencantumkan atau merobek sumber daya.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Izin yang diperlukan untuk mengonfigurasi pemberitahuan file untuk GCS

Anda harus memiliki list dan get izin pada wadah GCS Anda dan pada semua objek. Untuk mengetahui detailnya, lihat dokumentasi Google tentang izin IAM.

Untuk menggunakan mode pemberitahuan file, Anda perlu menambahkan izin untuk akun layanan GCS dan akun yang digunakan untuk mengakses sumber daya Google Cloud Pub/Sub.

Tambahkan Pub/Sub Publisher peran ke akun layanan GCS. Ini memungkinkan akun untuk memublikasikan pesan pemberitahuan peristiwa dari wadah GCS Anda ke Google Cloud Pub/Sub.

Untuk akun layanan yang digunakan untuk sumber daya Google Cloud Pub/Sub, Anda perlu menambahkan izin berikut:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Untuk melakukan ini, Anda dapat membuat peran kustom IAM dengan izin ini atau menetapkan peran GCP yang sudah ada sebelumnya untuk menutupi izin ini.

Menemukan Akun Layanan GCS

Di Google Cloud Console untuk proyek yang sesuai, navigasikan ke Cloud Storage > Settings. Bagian "Akun Layanan Penyimpanan Cloud" berisi email akun layanan GCS.

Akun Layanan GCS

Membuat Cloud Google Kustom Peran IAM untuk Mode Pemberitahuan File

Di konsol Google Cloud untuk proyek yang sesuai, navigasikan ke IAM & Admin > Roles. Kemudian, buat peran di bagian atas atau perbarui peran yang ada. Di layar untuk pembuatan atau pengeditan peran, klik Add Permissions. Menu muncul di mana Anda dapat menambahkan izin yang diinginkan ke peran.

Peran Kustom GCP IAM

Mengonfigurasi atau mengelola sumber daya pemberitahuan file secara manual

Pengguna istimewa dapat mengonfigurasi atau mengelola sumber daya pemberitahuan file secara manual.

  • Siapkan layanan pemberitahuan file secara manual melalui penyedia cloud dan tentukan pengidentifikasi antrean secara manual. Lihat Opsi pemberitahuan file untuk detail selengkapnya.
  • Gunakan API Scala untuk membuat atau mengelola pemberitahuan dan layanan antrean, seperti yang ditunjukkan dalam contoh berikut:

Catatan

Anda harus memiliki izin yang sesuai untuk mengonfigurasi atau memodifikasi infrastruktur cloud. Lihat dokumentasi izin untuk Azure, S3, atau GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Gunakan setUpNotificationServices(<resource-suffix>) untuk membuat antrean dan langganan dengan nama <prefix>-<resource-suffix> (awalan tergantung pada sistem penyimpanan yang dirangkum dalam sumber daya Cloud yang digunakan dalam mode pemberitahuan file Auto Loader. Jika ada sumber daya yang ada dengan nama yang sama, Azure Databricks menggunakan kembali sumber daya yang ada alih-alih membuat sumber daya baru. Fungsi ini mengembalikan pengidentifikasi antrean yang dapat Anda lewati ke cloudFiles sumber menggunakan pengidentifikasi dalam opsi pemberitahuan file. Hal ini memungkinkan cloudFiles pengguna sumber memiliki izin lebih sedikit daripada pengguna yang membuat sumber daya.

Berikan opsi "path" ke newManager hanya jika panggilan setUpNotificationServices; tidak diperlukan untuk listNotificationServices atau tearDownNotificationServices. Ini path yang sama yang Anda gunakan saat menjalankan kueri streaming.

Matriks berikut menunjukkan metode API mana yang didukung di mana Databricks Runtime untuk setiap jenis penyimpanan:

Penyimpanan cloud Setup API Daftar API Hancurkan API
AWS S3 Semua versi Semua versi Semua versi
ADLS Gen2 Semua versi Semua versi Semua versi
GCS Databricks Runtime 9.1 ke atas Databricks Runtime 9.1 ke atas Databricks Runtime 9.1 ke atas
Azure Blob Storage Semua versi Semua versi Semua versi
ADLS Gen1 Tidak didukung Tidak didukung Tidak didukung