Aracılığıyla paylaş


Otomatik Yükleyici dosya bildirim modu nedir?

Dosya bildirim modunda, Otomatik Yükleyici otomatik olarak giriş dizininden dosya olaylarına abone olan bir bildirim hizmeti ve kuyruk hizmeti ayarlar. Otomatik Yükleyici'yi saatte milyonlarca dosya almak üzere ölçeklendirmek için dosya bildirimlerini kullanabilirsiniz. Dizin listeleme moduyla karşılaştırıldığında, dosya bildirim modu büyük giriş dizinleri veya yüksek hacimli dosyalar için daha performanslı ve ölçeklenebilir ancak ek bulut izinleri gerektirir.

İstediğiniz zaman dosya bildirimleriyle dizin listesi arasında geçiş yapabilir ve yine de tam olarak bir kez veri işleme garantisi sağlayabilirsiniz.

Not

Premium hesaplar kuyruk depolamayı desteklemediğinden Azure premium depolama hesapları için dosya bildirim modu desteklenmez.

Uyarı

Otomatik Yükleyici'nin kaynak yolunun değiştirilmesi dosya bildirim modunda desteklenmez. Dosya bildirim modu kullanılırsa ve yol değiştirilirse, dizin güncelleştirme sırasında yeni dizinde zaten var olan dosyaları alamayabilirsiniz.

Otomatik Yükleyici dosya bildirim modunda kullanılan bulut kaynakları

Önemli

Dosya bildirim modu için bulut altyapısını otomatik olarak yapılandırmak için yükseltilmiş izinlere ihtiyacınız vardır. Bulut yöneticinize veya çalışma alanı yöneticinize başvurun. Görmek:

Seçeneği olarak ayarladığınızda cloudFiles.useNotificationstrue ve bulut kaynakları oluşturmak için gerekli izinleri sağladığınızda Otomatik Yükleyici sizin için otomatik olarak dosya bildirimleri ayarlayabilir. Ayrıca, bu kaynakları oluşturmak için Otomatik Yükleyici yetkilendirmesi vermek için ek seçenekler sağlamanız gerekebilir.

Aşağıdaki tabloda Otomatik Yükleyici tarafından hangi kaynakların oluşturulduğu özetlenir.

Bulut Depolama Abonelik Hizmeti Kuyruk Hizmeti Önek* Sınırlamak**
AWS S3 AWS SNS AWS SQS databricks-auto-ingest S3 demet başına 100
ADLS 2. Nesil Azure Event Grid Azure Kuyruk Depolama databricks Depolama hesabı başına 500
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest GCS demet başına 100
Azure Blob Storage Azure Event Grid Azure Kuyruk Depolama databricks Depolama hesabı başına 500
  • Otomatik Yükleyici kaynakları bu önekle adlandırir.

** Kaç eşzamanlı dosya bildirim işlem hattı başlatılabilir?

Belirli bir depolama hesabı için sınırlı sayıda dosya bildirimi işlem hattı çalıştırmanız gerekiyorsa şunları yapabilirsiniz:

  • Kapsayıcının veya demetin tamamını dizine özgü kuyruklara dinleyen tek bir kuyruktan gelen bildirimleri yaymak için AWS Lambda, Azure İşlevleri veya Google Cloud Functions gibi bir hizmetten yararlanın.

Dosya bildirim olayları

AWS S3, bir dosyanın bir S3 demetine yüklenip yüklenmediğine bakılmaksızın bir dosya yüklendiğinde bir olay sağlar ObjectCreated .

ADLS 2. Nesil, 2. Nesil kapsayıcınızda görünen dosyalar için farklı olay bildirimleri sağlar.

  • Otomatik Yükleyici, dosyayı işlemek için FlushWithClose olayı dinler.
  • Otomatik Yükleyici akışları, dosyaları bulma eylemini destekler RenameFile . RenameFile eylemler, yeniden adlandırılan dosyanın boyutunu almak için depolama sistemine bir API isteği gerektirir.
  • Databricks Runtime 9.0 ile oluşturulan ve dosyaları bulma eylemini RenameDirectory destekleyen Otomatik Yükleyici akışları. RenameDirectory eylemleri, yeniden adlandırılan dizinin içeriğini listelemek için depolama sistemine API isteklerini gerektirir.

Google Cloud Storage, üzerine yazma işlemleri ve dosya kopyaları içeren bir dosya karşıya yüklendiğinde bir OBJECT_FINALIZE olay sağlar. Başarısız karşıya yüklemeler bu olayı oluşturmaz.

Not

Bulut sağlayıcıları, çok nadir koşullarda tüm dosya olaylarının %100 teslimini garanti etmemektedir ve dosya olaylarının gecikme süresi konusunda katı SLA'lar sağlamaz. Databricks, veri tamlığı bir gereksinim olduğunda tüm dosyaların belirli bir SLA içinde bulunmasını garanti etme seçeneğini kullanarak cloudFiles.backfillInterval Otomatik Yükleyici ile düzenli doldurmaları tetiklemenizi önerir. Normal geri doldurmaların tetiklenmesi yinelemelere neden olmaz.

ADLS 2. Nesil ve Azure Blob Depolama için dosya bildirimini yapılandırmak için gerekli izinler

Giriş dizini için okuma izinlerine sahip olmanız gerekir. bkz. Azure Blob Depolama.

Dosya bildirim modunu kullanmak için olay bildirim hizmetlerini ayarlamak ve bunlara erişmek için kimlik doğrulama kimlik bilgilerini sağlamanız gerekir. Yalnızca kimlik doğrulaması için bir hizmet sorumlusuna ihtiyacınız vardır.

  • Hizmet sorumlusu - Azure yerleşik rollerini kullanma

    İstemci kimliği ve istemci gizli dizisi biçiminde bir Microsoft Entra Kimliği (eski adıYla Azure Active Directory) uygulaması ve hizmet sorumlusu oluşturun.

    Bu uygulamaya, giriş yolunun bulunduğu depolama hesabına aşağıdaki rolleri atayın:

    • Katkıda Bulunan: Bu rol, depolama hesabınızda kuyruklar ve olay abonelikleri gibi kaynakları ayarlamak içindir.
    • Depolama Kuyruğu Verileri Katkıda Bulunanı: Bu rol, kuyruklardan ileti alma ve silme gibi kuyruk işlemlerini gerçekleştirmek içindir. Bu rol yalnızca bağlantı dizesi olmayan bir hizmet sorumlusu sağladığınızda gereklidir.

    Bu uygulamayı ilgili kaynak grubuna aşağıdaki rolü atayın:

    Daha fazla bilgi edinmek için bkz. Azure portal kullanarak Azure rolleri atama.

  • Hizmet sorumlusu - özel rol kullanma

    Önceki roller için gereken aşırı izinlerle ilgileniyorsanız, aşağıda Azure rolü JSON biçiminde listelenen en azından aşağıdaki izinlere sahip bir Özel Rol oluşturabilirsiniz:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Ardından, bu özel rolü uygulamanıza atayabilirsiniz.

    Daha fazla bilgi edinmek için bkz. Azure portal kullanarak Azure rolleri atama.

Otomatik yükleyici izinleri

Sık karşılaşılan hataları giderme

Hata:

java.lang.RuntimeException: Failed to create event grid subscription.

Otomatik Yükleyici'yi ilk kez çalıştırdığınızda bu hata iletisini görürseniz Event Grid, Azure aboneliğinizde Kaynak Sağlayıcısı olarak kaydedilmez. Bunu Azure portalına kaydetmek için:

  1. Aboneliğinize gidin.
  2. Ayarlar bölümünün altında Kaynak Sağlayıcıları'na tıklayın.
  3. Kaynak sağlayıcıyı kaydetme Microsoft.EventGrid.

Hata:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Otomatik Yükleyici'yi ilk kez çalıştırdığınızda bu hata iletisini görürseniz, Event Grid için hizmet sorumlunuza ve depolama hesabınıza Katkıda Bulunan rolü verdiğinizden emin olun.

AWS S3 için dosya bildirimini yapılandırmak için gerekli izinler

Giriş dizini için okuma izinlerine sahip olmanız gerekir. Diğer ayrıntılar için bkz. S3 bağlantı ayrıntıları.

Dosya bildirim modunu kullanmak için aşağıdaki JSON ilkesi belgesini IAM kullanıcınıza veya rolünüze ekleyin.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

konumu:

  • <bucket-name>: Akışınızın dosyaları okuyacağı S3 demet adı, örneğin. auto-logs Joker karakter olarak kullanabilirsiniz * , örneğin, databricks-*-logs. DBFS yolunuz için temel alınan S3 demetini bulmak için, komutunu çalıştırarak %fs mountsbir not defterindeki tüm DBFS bağlama noktalarını listeleyebilirsiniz.
  • <region>: S3 demetinin bulunduğu AWS bölgesi, örneğin. us-west-2 Bölgeyi belirtmek istemiyorsanız kullanın *.
  • <account-number>: S3 demetinin sahibi olan AWS hesap numarası, örneğin. 123456789012 Hesap numarasını belirtmek istemiyorsanız kullanın *.

SQS ve SNS ARN belirtimindeki dize databricks-auto-ingest-* , kaynağın SQS ve SNS hizmetleri oluştururken kullandığı ad ön ekidir cloudFiles . Azure Databricks, akışın ilk çalıştırmasında bildirim hizmetlerini ayarladığından, ilk çalıştırmadan sonra azaltılmış izinlere sahip bir ilke kullanabilirsiniz (örneğin, akışı durdurun ve sonra yeniden başlatın).

Not

Önceki ilke yalnızca S3 demeti bildirimi, SNS ve SQS hizmetleri gibi dosya bildirim hizmetlerini ayarlamak için gereken izinlerle ilgilidir ve S3 demetine zaten okuma erişiminiz olduğunu varsayar. S3 salt okunur izinlerini eklemeniz gerekiyorsa, JSON belgesindeki deyimdeki DatabricksAutoLoaderSetup listeye aşağıdakileri Action ekleyin:

  • s3:ListBucket
  • s3:GetObject

İlk kurulumdan sonra azaltılan izinler

Yukarıda açıklanan kaynak kurulum izinleri yalnızca akışın ilk çalıştırması sırasında gereklidir. İlk çalıştırmadan sonra, azaltılmış izinlerle aşağıdaki IAM ilkesine geçebilirsiniz.

Önemli

Azaltılmış izinlerle, hata durumunda yeni akış sorguları başlatamaz veya kaynakları yeniden oluşturamazsınız (örneğin, SQS kuyruğu yanlışlıkla silinmiştir); Ayrıca kaynakları listelemek veya yok etmek için bulut kaynak yönetimi API'sini kullanamazsınız.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

GCS için dosya bildirimini yapılandırmak için gerekli izinler

GCS demetinizde ve get tüm nesnelerde ve izinlerine sahip list olmanız gerekir. Ayrıntılar için IAM izinleriyle ilgili Google belgelerine bakın.

Dosya bildirim modunu kullanmak için GCS hizmet hesabı ve Google Cloud Pub/Sub kaynaklarına erişmek için kullanılan hesap için izinler eklemeniz gerekir.

Pub/Sub Publisher Rolü GCS hizmet hesabına ekleyin. Bu, hesabın GCS demetlerinizdeki olay bildirimi iletilerini Google Cloud Pub/Sub'a yayımlamasına olanak tanır.

Google Cloud Pub/Sub kaynakları için kullanılan hizmet hesabına gelince, aşağıdaki izinleri eklemeniz gerekir:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Bunu yapmak için, bu izinlerle bir IAM özel rolü oluşturabilir veya bu izinleri kapsayacak önceden var olan GCP rollerini atayabilirsiniz.

GCS Hizmet Hesabını Bulma

İlgili projenin Google Bulut Konsolu'nda adresine Cloud Storage > Settingsgidin. "Bulut Depolama Hizmeti Hesabı" bölümü GCS hizmet hesabının e-postasını içerir.

GCS Hizmet Hesabı

Dosya Bildirim Modu için Özel Google Cloud IAM Rolü Oluşturma

İlgili projenin Google Cloud konsolunda adresine IAM & Admin > Rolesgidin. Ardından, en üstte bir rol oluşturun veya mevcut bir rolü güncelleştirin. Rol oluşturma veya düzenleme ekranında öğesine tıklayın Add Permissions. role istediğiniz izinleri ekleyebileceğiniz bir menü görüntülenir.

GCP IAM Özel Rolleri

Dosya bildirim kaynaklarını el ile yapılandırma veya yönetme

Ayrıcalıklı kullanıcılar dosya bildirim kaynaklarını el ile yapılandırabilir veya yönetebilir.

  • Dosya bildirim hizmetlerini bulut sağlayıcısı aracılığıyla el ile ayarlayın ve kuyruk tanımlayıcısını el ile belirtin. Diğer ayrıntılar için bkz . Dosya bildirim seçenekleri .
  • Aşağıdaki örnekte gösterildiği gibi bildirimleri ve kuyruğa alma hizmetlerini oluşturmak veya yönetmek için Scala API'lerini kullanın:

Not

Bulut altyapısını yapılandırmak veya değiştirmek için uygun izinlere sahip olmanız gerekir. Bkz. Azure, S3 veya GCS için izin belgeleri.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Adlı <prefix>-<resource-suffix> bir kuyruk ve abonelik oluşturmak için kullanın setUpNotificationServices(<resource-suffix>) (ön ek, Otomatik Yükleyici dosya bildirim modunda kullanılan Bulut kaynakları bölümünde özetlenen depolama sistemine bağlıdır. Aynı ada sahip bir kaynak varsa, Azure Databricks yeni bir kaynak oluşturmak yerine mevcut kaynağı yeniden kullanabilir. Bu işlev, Dosya bildirim seçeneklerindeki tanımlayıcıyı cloudFiles kullanarak kaynağa geçirebileceğiniz bir kuyruk tanımlayıcısı döndürür. Bu, kaynak kullanıcının kaynakları oluşturan kullanıcıdan daha az izine sahip olmasını sağlar cloudFiles .

"path"newManager seçeneğini belirtin; yalnızca çağrılırsa setUpNotificationServices; veya tearDownNotificationServicesiçin listNotificationServices gerekli değildir. Bu, akış sorgusu çalıştırırken kullandığınızla aynıdır path .

Aşağıdaki matris, her depolama türü için Databricks Runtime'ın hangi API yöntemlerinin desteklendiği gösterir:

Bulut Depolama Kurulum API'si Liste API'si API'yi yıkma
AWS S3 Tüm sürümler Tüm sürümler Tüm sürümler
ADLS 2. Nesil Tüm sürümler Tüm sürümler Tüm sürümler
GCS Databricks Runtime 9.1 ve üzeri Databricks Runtime 9.1 ve üzeri Databricks Runtime 9.1 ve üzeri
Azure Blob Storage Tüm sürümler Tüm sürümler Tüm sürümler
ADLS 1. Nesil Desteklenmeyen Desteklenmeyen Desteklenmeyen