Co to jest tryb powiadamiania o pliku automatycznego ładowania?
W trybie powiadamiania plików automatyczne moduł ładujący automatycznie konfiguruje usługę powiadomień i usługę kolejki, która subskrybuje zdarzenia plików z katalogu wejściowego. Powiadomienia dotyczące plików umożliwiają skalowanie automatycznego modułu ładującego w celu pozyskiwania milionów plików na godzinę. W porównaniu z trybem listy katalogów tryb powiadomień plików jest bardziej wydajny i skalowalny dla dużych katalogów wejściowych lub dużej liczby plików, ale wymaga dodatkowych uprawnień do chmury.
Możesz przełączać się między powiadomieniami o plikach i listami katalogów w dowolnym momencie i nadal utrzymywać dokładnie jednokrotne gwarancje przetwarzania danych.
Uwaga
Tryb powiadamiania o plikach nie jest obsługiwany dla kont usługi Azure Premium Storage, ponieważ konta w warstwie Premium nie obsługują magazynu kolejek.
Ostrzeżenie
Zmiana ścieżki źródłowej dla automatycznego modułu ładującego nie jest obsługiwana w trybie powiadomień plików. Jeśli jest używany tryb powiadamiania o plikach i ścieżka zostanie zmieniona, pozyskanie plików, które są już obecne w nowym katalogu w momencie aktualizacji katalogu, może się nie powieść.
Tryb powiadomień plików jest obsługiwany tylko w obliczeniach pojedynczego użytkownika.
Zasoby w chmurze używane w trybie powiadamiania o plikach automatycznego ładowania
Ważne
Aby automatycznie skonfigurować infrastrukturę chmury na potrzeby trybu powiadomień o plikach, potrzebne są podwyższone uprawnienia. Skontaktuj się z administratorem chmury lub administratorem obszaru roboczego. Widzieć:
Automatyczne ładowanie może automatycznie konfigurować powiadomienia o plikach po ustawieniu opcji cloudFiles.useNotifications
na true
i podaniu niezbędnych uprawnień do tworzenia zasobów w chmurze. Ponadto może być konieczne podanie dodatkowych opcji udzielania autoryzacji modułu ładującego automatycznego w celu utworzenia tych zasobów.
Poniższa tabela zawiera podsumowanie zasobów tworzonych przez moduł automatycznego ładowania.
Magazyn w chmurze | Usługa subskrypcji | Queue Service | Przedrostek* | Ograniczać** |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | databricks-auto-pozyskiwanie | 100 na zasobnik S3 |
ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | 500 na konto magazynu |
GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-pozyskiwanie | 100 na zasobnik GCS |
Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | 500 na konto magazynu |
- Automatyczne moduł ładujący nazywa zasoby tym prefiksem.
** Ile współbieżnych potoków powiadomień o plikach można uruchomić
Jeśli potrzebujesz więcej niż ograniczona liczba potoków powiadomień dotyczących plików dla danego konta magazynu, możesz:
- Skorzystaj z usługi, takiej jak AWS Lambda, Azure Functions lub Google Cloud Functions, aby rozsyłać powiadomienia z jednej kolejki, która nasłuchuje całego kontenera lub zasobnika do kolejek specyficznych dla katalogu.
Zdarzenia powiadomień o plikach
Usługa AWS S3 udostępnia ObjectCreated
zdarzenie, gdy plik jest przekazywany do zasobnika S3 niezależnie od tego, czy został przekazany przez przekazywanie put, czy wieloczęściowe.
Usługa ADLS Gen2 udostępnia różne powiadomienia o zdarzeniach dla plików wyświetlanych w kontenerze usługi Gen2.
- Moduł automatycznego ładowania nasłuchuje zdarzenia
FlushWithClose
do przetwarzania pliku. - Strumienie automatycznego modułu ładującego obsługują
RenameFile
akcję odnajdywania plików.RenameFile
akcje wymagają żądania interfejsu API do systemu magazynu w celu uzyskania rozmiaru zmienionego pliku. - Strumienie automatycznego modułu ładującego utworzone za pomocą środowiska Databricks Runtime 9.0 i po obsłudze
RenameDirectory
akcji odnajdywania plików.RenameDirectory
akcje wymagają, aby żądania interfejsu API do systemu magazynu wyświetlały listę zawartości zmienionego katalogu.
Usługa Google Cloud Storage udostępnia OBJECT_FINALIZE
zdarzenie podczas przekazywania pliku, w tym zastępowania i kopiowania plików. Przekazywanie nie powiodło się, nie generuje tego zdarzenia.
Uwaga
Dostawcy usług w chmurze nie gwarantują 100% dostarczania wszystkich zdarzeń plików w bardzo rzadkich warunkach i nie zapewniają ścisłych umów SLA dotyczących opóźnienia zdarzeń plików. Usługa Databricks zaleca, aby wyzwalać regularne wypełnianie za pomocą modułu automatycznego ładującego przy użyciu cloudFiles.backfillInterval
opcji zagwarantowania, że wszystkie pliki zostaną odnalezione w ramach danej umowy SLA, jeśli wymagana jest kompletność danych. Wyzwalanie regularnych wypełniania nie powoduje duplikatów.
Wymagane uprawnienia do konfigurowania powiadomień o plikach dla usług ADLS Gen2 i Azure Blob Storage
Musisz mieć uprawnienia do odczytu dla katalogu wejściowego. Zobacz Azure Blob Storage.
Aby użyć trybu powiadomień plików, należy podać poświadczenia uwierzytelniania na potrzeby konfigurowania usług powiadomień zdarzeń i uzyskiwania do nich dostępu. Do uwierzytelniania potrzebna jest tylko jednostka usługi.
Jednostka usługi — korzystanie z wbudowanych ról platformy Azure
Utwórz aplikację i jednostkę usługi Microsoft Entra ID (dawniej Azure Active Directory) w postaci identyfikatora klienta i klucza tajnego klienta.
Przypisz tę aplikację następujące role do konta magazynu, w którym znajduje się ścieżka wejściowa:
- Współautor: Ta rola służy do konfigurowania zasobów na koncie magazynu, takich jak kolejki i subskrypcje zdarzeń.
- Współautor danych kolejki usługi Storage: ta rola służy do wykonywania operacji kolejki, takich jak pobieranie i usuwanie komunikatów z kolejek. Ta rola jest wymagana tylko w przypadku podania jednostki usługi bez parametry połączenia.
Przypisz tę aplikację następującą rolę do powiązanej grupy zasobów:
- EventGrid EventSubscription Contributor: Ta rola służy do wykonywania operacji subskrypcji usługi Event Grid, takich jak tworzenie lub wyświetlanie subskrypcji zdarzeń.
Aby uzyskać więcej informacji, zobacz przypisywanie ról Azure za pomocą portalu Azure.
Jednostka usługi — używanie roli niestandardowej
Jeśli interesuje Cię nadmierne uprawnienia wymagane dla poprzednich ról, możesz utworzyć rolę niestandardową z co najmniej następującymi uprawnieniami wymienionymi poniżej w formacie JSON roli platformy Azure:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
Następnie możesz przypisać tę rolę niestandardową do aplikacji.
Aby uzyskać więcej informacji, zobacz przypisywanie ról Azure za pomocą portalu Azure.
Rozwiązywanie typowych problemów
Błąd:
java.lang.RuntimeException: Failed to create event grid subscription.
Jeśli ten komunikat o błędzie zostanie wyświetlony podczas pierwszego uruchomienia modułu automatycznego ładowania, oznacza to, że usługa Event Grid nie jest zarejestrowana jako dostawca zasobów w ramach subskrypcji platformy Azure. Aby zarejestrować ją w witrynie Azure Portal:
- Przejdź do subskrypcji.
- Kliknij pozycję Dostawcy zasobów w sekcji Ustawienia.
- Zarejestruj dostawcę
Microsoft.EventGrid
.
Błąd:
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Jeśli ten komunikat o błędzie zostanie wyświetlony podczas pierwszego uruchomienia modułu automatycznego ładowania, upewnij się, że przyznano rolę Współautor jednostce usługi w przypadku usługi Event Grid, a także Twojego konta magazynu.
Wymagane uprawnienia do konfigurowania powiadomień o pliku dla platformy AWS S3
Musisz mieć uprawnienia do odczytu dla katalogu wejściowego. Aby uzyskać więcej informacji, zobacz szczegóły połączenia S3.
Aby użyć trybu powiadomień plików, dołącz następujący dokument zasad JSON do użytkownika lub roli IAM.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
gdzie:
<bucket-name>
: nazwa zasobnika S3, w którym strumień odczytuje pliki, na przykładauto-logs
. Można użyć*
jako symbolu wieloznacznych, na przykładdatabricks-*-logs
. Aby dowiedzieć się, jaki jest bazowy zasobnik S3 dla ścieżki systemu plików DBFS, możesz wyświetlić listę wszystkich punktów instalacji systemu plików DBFS w notesie, uruchamiając polecenie%fs mounts
.<region>
: region platformy AWS, w którym znajduje się zasobnik S3, na przykładus-west-2
. Jeśli nie chcesz określać regionu, użyj polecenia*
.<account-number>
: numer konta platformy AWS, który jest właścicielem zasobnika S3, na przykład123456789012
. Jeśli nie chcesz określać numeru konta, użyj polecenia*
.
Ciąg databricks-auto-ingest-*
w specyfikacji SQS i SNS ARN jest prefiksem nazwy używanym cloudFiles
przez źródło podczas tworzenia usług SQS i SNS. Ponieważ usługa Azure Databricks konfiguruje usługi powiadomień w początkowym uruchomieniu strumienia, możesz użyć zasad z ograniczonymi uprawnieniami po początkowym uruchomieniu (na przykład zatrzymać strumień, a następnie uruchomić go ponownie).
Uwaga
Powyższe zasady dotyczą tylko uprawnień wymaganych do konfigurowania usług powiadomień plików, a mianowicie powiadomień zasobników S3, SNS i SQS oraz zakłada, że masz już dostęp do odczytu do zasobnika S3. Jeśli musisz dodać uprawnienia tylko do odczytu S3, dodaj następujący kod do Action
listy w instrukcji w DatabricksAutoLoaderSetup
dokumencie JSON:
s3:ListBucket
s3:GetObject
Ograniczone uprawnienia po początkowej konfiguracji
Uprawnienia konfiguracji zasobów opisane powyżej są wymagane tylko podczas początkowego uruchomienia strumienia. Po pierwszym uruchomieniu możesz przełączyć się na następujące zasady zarządzania dostępem i tożsamościami z ograniczonymi uprawnieniami.
Ważne
Przy ograniczonych uprawnieniach nie można uruchomić nowych zapytań przesyłania strumieniowego ani odtworzyć zasobów w przypadku awarii (na przykład kolejka SQS została przypadkowo usunięta); nie można również użyć interfejsu API zarządzania zasobami w chmurze do wyświetlania listy lub usuwania zasobów.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Wymagane uprawnienia do konfigurowania powiadomień o plikach dla usługi GCS
Musisz mieć list
uprawnienia i get
w zasobniku GCS oraz na wszystkich obiektach. Aby uzyskać szczegółowe informacje, zobacz dokumentację firmy Google dotyczącą uprawnień do zarządzania dostępem i tożsamościami.
Aby użyć trybu powiadomień dotyczących plików, musisz dodać uprawnienia do konta usługi GCS i konta używanego do uzyskiwania dostępu do zasobów usługi Google Cloud Pub/Sub.
Pub/Sub Publisher
Dodaj rolę do konta usługi GCS. Dzięki temu konto może publikować komunikaty powiadomień o zdarzeniach z zasobników GCS do usługi Google Cloud Pub/Sub.
Jeśli chodzi o konto usługi używane dla zasobów Google Cloud Pub/Sub, należy dodać następujące uprawnienia:
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
W tym celu możesz utworzyć rolę niestandardową IAM z tymi uprawnieniami lub przypisać istniejące role GCP, aby uwzględnić te uprawnienia.
Znajdowanie konta usługi GCS
W konsoli Google Cloud Console dla odpowiedniego projektu przejdź do Cloud Storage > Settings
strony .
Sekcja "Konto usługi magazynu w chmurze" zawiera adres e-mail konta usługi GCS.
Tworzenie niestandardowej roli IAM w chmurze Google dla trybu powiadomień plików
W konsoli Google Cloud dla odpowiedniego projektu przejdź do IAM & Admin > Roles
strony . Następnie utwórz rolę u góry lub zaktualizuj istniejącą rolę. Na ekranie tworzenia lub edytowania roli kliknij pozycję Add Permissions
. Zostanie wyświetlone menu, w którym można dodać odpowiednie uprawnienia do roli.
Ręczne konfigurowanie zasobów powiadomień o plikach lub zarządzanie nimi
Użytkownicy uprzywilejowani mogą ręcznie konfigurować zasoby powiadomień o plikach lub zarządzać nimi.
- Ręcznie skonfiguruj usługi powiadomień o plikach za pośrednictwem dostawcy usług w chmurze i ręcznie określ identyfikator kolejki. Aby uzyskać więcej informacji, zobacz Opcje powiadomień o plikach.
- Użyj interfejsów API języka Scala, aby utworzyć powiadomienia i usługi kolejkowania oraz zarządzać nimi, jak pokazano w poniższym przykładzie:
Uwaga
Musisz mieć odpowiednie uprawnienia do konfigurowania lub modyfikowania infrastruktury chmury. Zobacz dokumentację uprawnień dla platformy Azure, S3 lub GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Służy setUpNotificationServices(<resource-suffix>)
do tworzenia kolejki i subskrypcji o nazwie <prefix>-<resource-suffix>
(prefiks zależy od systemu magazynu podsumowanego w zasobach w chmurze używanych w trybie powiadomień pliku automatycznego ładowania. Jeśli istnieje zasób o tej samej nazwie, usługa Azure Databricks ponownie użyje istniejącego zasobu zamiast utworzyć nowy. Ta funkcja zwraca identyfikator kolejki, który można przekazać do cloudFiles
źródła przy użyciu identyfikatora w opcjach powiadomień pliku. Dzięki cloudFiles
temu użytkownik źródłowy może mieć mniej uprawnień niż użytkownik tworzący zasoby.
Podaj opcję newManager
tylko w przypadku wywołania setUpNotificationServices
metody ; nie jest ona wymagana dla listNotificationServices
elementu lub tearDownNotificationServices
."path"
Jest to samo path
, które jest używane podczas uruchamiania zapytania przesyłania strumieniowego.
Poniższa macierz wskazuje, które metody interfejsu API są obsługiwane w środowisku Databricks Runtime dla każdego typu magazynu:
Magazyn w chmurze | Konfigurowanie interfejsu API | Interfejs API listy | Zrywanie interfejsu API |
---|---|---|---|
AWS S3 | Wszystkie wersje | Wszystkie wersje | Wszystkie wersje |
ADLS Gen2 | Wszystkie wersje | Wszystkie wersje | Wszystkie wersje |
GCS | Databricks Runtime 9.1 i nowsze | Databricks Runtime 9.1 i nowsze | Databricks Runtime 9.1 i nowsze |
Azure Blob Storage | Wszystkie wersje | Wszystkie wersje | Wszystkie wersje |
ADLS Gen1 | Nieobsługiwane | Nieobsługiwane | Nieobsługiwane |