自動ローダー ファイル通知モードとは
ファイル通知モードでは、自動ローダーは、入力ディレクトリからファイル イベントをサブスクライブする通知サービスとキュー サービスを自動的に設定します。 ファイル通知を使用して、自動ローダーをスケーリングし、1 時間に数百万のファイルを取り込むことができます。 ディレクトリ一覧モードと比較すると、ファイル通知モードは、大規模な入力ディレクトリや大量のファイルに対して高いパフォーマンスと拡張性をもたらしますが、追加のクラウド アクセス許可が必要になります。
ファイル通知とディレクトリ一覧をいつでも切り替えて、データが厳密に 1 回だけ処理されるという保証を引き続き維持できます。
Note
Azure Premium Storage アカウントではファイル通知モードはサポートされていません。Premium アカウントではキュー ストレージがサポートされていないためです。
警告
自動ローダーのソース パスの変更は、ファイル通知モードではサポートされていません。 ファイル通知モードが使用されていて、パスが変更済みの場合にディレクトリを更新する際、新しいディレクトリに既に存在するファイルは、その取り込みに失敗する可能性があります。
ファイル通知モードは、単一ユーザー コンピューティングでのみサポートされます。
自動ローダー ファイル通知モードで使用されるクラウド リソース
重要
ファイル通知モード用にクラウド インフラストラクチャを自動的に構成するには、高い権限のアクセス許可が必要です。 クラウド管理者またはワークスペース管理者に問い合わせてください。以下を参照してください。
オプション cloudFiles.useNotifications
を true
に設定し、クラウドリソースの作成に必要なアクセス許可を提供するときに、自動ローダーで自動的にファイル通知を設定できます。 さらに、これらのリソースを作成する認可を自動ローダーに与えるために、追加オプションを指定する必要が生じることもあります。
次の表に、自動ローダーによって作成されるリソースを示します。
クラウド ストレージ | サブスクリプション サービス | Queue サービス | プレフィックス * | 上限 ** |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | databricks-auto-ingest | S3 バケットあたり 100 |
ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | ストレージ アカウントあたり 500 |
GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-ingest | GCS バケットあたり 100 |
Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | ストレージ アカウントあたり 500 |
- 自動ローダーにより、このプレフィックスを使用してリソースに名前を付けます。
** 起動できる同時実行ファイル通知パイプラインの数
特定のストレージ アカウントに対して、限られた数を超えるファイル通知パイプラインを実行する必要がある場合は、次のように行えます。
- AWS Lambda、Azure Functions、Google Cloud Functions などのサービスを利用して、コンテナーまたはバケット全体をリッスンする単一のキューからの通知を、ディレクトリ固有のキューにファンアウトします。
ファイル通知イベント
AWS S3 では、put アップロードによってアップロードされたか、マルチパート アップロードによってアップロードされたかにかかわらず、S3 バケットにファイルがアップロードされたときに ObjectCreated
イベントが提供されます。
ADLS Gen2 では、Gen2 コンテナーに表示されるファイルに対して異なるイベント通知が提供されます。
- 自動ローダーは、ファイルを処理するために
FlushWithClose
イベントをリッスンします。 - 自動ローダー ストリームでは、ファイルを検出するための
RenameFile
アクションがサポートされています。RenameFile
アクションでは、名前が変更されたファイルのサイズを取得するために、ストレージ システムへの API 要求が必要です。 - Databricks Runtime 9.0 以降で作成された自動ローダー ストリームは、ファイルを検出するための
RenameDirectory
アクションをサポートします。RenameDirectory
アクションでは、名前が変更されたディレクトリのコンテンツをリストするために、ストレージ システムへの API 要求が必要です。
Google Cloud Storage では、ファイルのアップロード時に、上書きやファイル コピーなどの OBJECT_FINALIZE
イベントが提供されます。 失敗したアップロードでは、このイベントは生成されません。
注意
クラウド プロバイダーは、非常にまれな条件下でのすべてのファイル イベントの 100% の配信を保証しておらず、ファイル イベントの待機時間に関する厳密な SLA を提供していません。 Databricks では、データの完全性が要件になる場合、cloudFiles.backfillInterval
オプションを使用することで自動ローダーによる定期的なバックフィルをトリガーし、所定の SLA の範囲内ですべてのファイルが検出されるように保証することを推奨しています。 定期的なバックフィルをトリガーしても、重複は発生しません。
ADLS Gen2 および Azure Blob Storage のファイル通知を構成するために必要なアクセス許可
入力ディレクトリに対する読み取りアクセス許可が必要です。 「Azure Blob ストレージ」を参照してください。
ファイル通知モードを使用するには、イベント通知サービスを設定してアクセスするための認証資格情報を指定する必要があります。 認証に必要なのはサービス プリンシパルのみです。
サービス プリンシパル - Azure 組み込みロールを使用
クライアント ID とクライアント シークレットの形式で、Microsoft Entra ID (旧称 Azure Active Directory) アプリとサービス プリンシパルを作成します。
このアプリに、入力パスが存在するストレージ アカウントへの次のロールを割り当てます。
- 共同作成者: これは、キューやイベント サブスクリプションなど、ストレージ アカウント内のリソースを設定するためのロールです。
- ストレージ キュー データ共同作成者: これは、キューからのメッセージの取得や削除などのキュー操作を実行するためのロールです。 このロールは、接続文字列を指定せずにサービス プリンシパルを指定する場合にのみ必要です。
このアプリに、関連リソース グループへの次のロールを割り当てます。
- EventGrid EventSubscription 共同作成者: このロールは、イベント サブスクリプションの作成や一覧表示などのイベント グリッド サブスクリプション操作を実行するためのロールです。
詳細については、Azure portal を使用して Azure ロールを割り当てる方法に関するページを参照してください。
サービス プリンシパル - カスタム ロールを使用
上記のロールに必要なアクセス許可の過剰な付与が懸念される場合は、Azure ロールの JSON 形式で、以下に一覧表示されている、少なくとも次のアクセス許可を持つカスタム役割を作成できます。
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
その後、このカスタム役割をアプリに割り当てることができます。
詳細については、Azure portal を使用して Azure ロールを割り当てる方法に関するページを参照してください。
一般的なエラーのトラブルシューティング
エラー:
java.lang.RuntimeException: Failed to create event grid subscription.
自動ローダーを初めて実行するときにこのエラー メッセージが表示される場合、Event Grid は Azure サブスクリプションのリソース プロバイダーとして登録されていません。 これを登録するには、Azure portal で次のように実行します。
- サブスクリプションに移動します。
- [設定] セクションで、[リソース プロバイダー] をクリックします。
- プロバイダー
Microsoft.EventGrid
を登録します。
エラー:
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
自動ローダーを初めて実行するときにこのエラー メッセージが表示される場合は、Event Grid とストレージ アカウントのサービス プリンシパルに共同作成者ロールを指定してください。
AWS S3 のファイル通知を構成するために必要なアクセス許可
入力ディレクトリに対する読み取りアクセス許可が必要です。 詳細については、S3 接続の詳細を参照してください。
ファイル通知モードを使用するには、次の JSON ポリシー ドキュメントを IAM ユーザーまたはロールにアタッチします。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
各値の説明:
<bucket-name>
: ストリームでファイルが読み取られる S3 バケットの名前 (例:auto-logs
)。 ワイルドカードとして*
を使用できます (例:databricks-*-logs
)。 DBFS パスの基になる S3 バケットを見つけるには、%fs mounts
を実行して、ノートブック内のすべての DBFS マウント ポイントを一覧表示します。<region>
: S3 バケットが存在する AWS リージョン (例:us-west-2
)。 リージョンを指定しない場合は、*
を使用します。<account-number>
: S3 バケットを所有する AWS アカウント番号 (例:123456789012
)。 アカウント番号を指定しない場合は、*
を使用します。
SQS および SNS ARN 指定の文字列 databricks-auto-ingest-*
は、SQS および SNS サービスを作成するときに cloudFiles
ソースで使用される名前プレフィックスです。 Azure Databricks は、ストリームの初回実行時に通知サービスを設定するため、最初の実行後にアクセス許可が削減されたポリシーを使用できます (たとえば、ストリームを停止してから再起動するなど)。
注意
前のポリシーは、ファイル通知サービス (つまり、S3 バケット通知、SNS、および SQS の各サービス) を設定するために必要なアクセス許可にのみ関係し、S3 バケットへの読み取りアクセス権が既にあることを想定しています。 S3 読み取り専用アクセス許可を追加する必要がある場合は、JSON ドキュメントの DatabricksAutoLoaderSetup
ステートメントの Action
リストに以下を追加してください。
s3:ListBucket
s3:GetObject
初期セットアップ後のアクセス許可の削減
上記のリソース設定のアクセス許可は、ストリームの最初の実行時にのみ必要になります。 最初の実行後は、アクセス許可が削減された次の IAM ポリシーに切り替えることができます。
重要
アクセス許可を減らした場合、新しいストリーミング クエリを開始したり、失敗した場合にリソースを再作成したりすることはできません (たとえば、SQS キューが誤って削除された場合など)。また、クラウド リソース管理 API を使用して、リソースをリストしたり破棄したりすることもできません。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
GCS のファイル通知を構成するために必要なアクセス許可
GCS バケットとすべてのオブジェクトに対する list
および get
の権限が必要です。 詳細については、IAM 権限に関する Google ドキュメントを参照してください。
ファイル通知モードを使用するには、GCS サービス アカウントと、Google Cloud Pub/Sub リソースへのアクセスに使用されるアカウントに権限を追加する必要があります。
GCS サービス アカウントに Pub/Sub Publisher
ロールを追加します。 これにより、このアカウントで GCS バケットから Google Cloud Pub/Sub にイベント通知メッセージを発行できます。
Google Cloud Pub/Sub リソースに使用されるサービス アカウントについては、次の権限を追加する必要があります。
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
これを行うには、これらの権限を持つ IAM カスタム ロールを作成するか、これらの権限をカバーする既存の GCP ロールを割り当てます。
GCS サービス アカウントを見つける
対応するプロジェクトの Google Cloud Console で、Cloud Storage > Settings
に移動します。
[Cloud Storage サービス アカウント] セクションには、GCS サービス アカウントのメールアドレスが含まれます。
ファイル通知モード用のカスタム Google Cloud IAM ロールを作成する
対応するプロジェクトの Google Cloud Console で、IAM & Admin > Roles
に移動します。 次に、上部でロールを作成するか、既存のロールを更新します。 ロールの作成または編集の画面で、Add Permissions
をクリックします。 メニューが表示され、ここで必要な権限をロールに追加できます。
ファイル通知リソースを手動で構成または管理する
権限のあるユーザーは、ファイル通知リソースを手動で構成または管理することができます。
- クラウド プロバイダーを使用してファイル通知サービスを手動で設定し、キュー識別子を手動で指定します。 詳細については「ファイル通知オプション」を参照してください。
- 次の例に示すように、Scala API を使用して、通知サービスとキュー サービスを作成または管理します。
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
setUpNotificationServices(<resource-suffix>)
を使用して、名前 <prefix>-<resource-suffix>
を持つキューとサブスクリプションを作成します (プレフィックスは「自動ローダー ファイル通知モードで使用されるクラウド リソース」で要約されているストレージ システムにより異なります)。 同じ名前の既存のリソースがある場合、Azure Databricks は、新たに作成する代わりに、既存のリソースを再利用します。 この関数では、「ファイル通知オプション」の識別子を使用して cloudFiles
ソースに渡すことのできるキュー識別子が返されます。 これにより、cloudFiles
ソース ユーザーに、リソースを作成するユーザーよりも少ないアクセス許可を付与できます。
"path"
オプションは、setUpNotificationServices
を呼び出す場合にのみ newManager
に指定します。listNotificationServices
または tearDownNotificationServices
では必要ありません。 これは、ストリーミング クエリを実行するときに使用するのと同じ path
です。
次の表は、ストレージの種類ごとに Databricks Runtime でサポートされている API メソッドを示しています。
クラウド ストレージ | API のセットアップ | API の一覧 | API の破棄 |
---|---|---|---|
AWS S3 | すべてのバージョン | すべてのバージョン | すべてのバージョン |
ADLS Gen2 | すべてのバージョン | すべてのバージョン | すべてのバージョン |
GCS | Databricks Runtime 9.1 以降 | Databricks Runtime 9.1 以降 | Databricks Runtime 9.1 以降 |
Azure Blob Storage | すべてのバージョン | すべてのバージョン | すべてのバージョン |
ADLS Gen1 | サポートされていません | サポートされていない | サポートされていません |