Mi az automatikus betöltő fájlértesítési módja?
Fájlértesítési módban az Automatikus betöltő automatikusan beállít egy értesítési szolgáltatást és egy üzenetsor-szolgáltatást, amely feliratkozik a bemeneti könyvtárból származó fájleseményekre. Fájlértesítések használatával skálázhatja az Automatikus betöltőt úgy, hogy óránként több millió fájlt tölt be. A címtár-listázási módhoz képest a fájlértesítési mód nagyobb teljesítményű és méretezhető nagy bemeneti könyvtárak vagy nagy mennyiségű fájl esetén, de további felhőbeli engedélyeket igényel.
Bármikor válthat a fájlértesítések és a címtár-listaelemek között, és továbbra is pontosan egyszeri adatfeldolgozási garanciákat tart fenn.
Feljegyzés
A fájlértesítési mód az Azure Premium Storage-fiókok esetében nem támogatott, mert a prémium szintű fiókok nem támogatják az üzenetsor-tárolást.
Figyelmeztetés
Az automatikus betöltő forrásútvonalának módosítása a fájlértesítési mód esetében nem támogatott. Ha fájlértesítési módot használ, és az elérési útvonal megváltozik, előfordulhat, hogy nem sikerül olyan fájlokat bevinnie, amelyek a könyvtár frissítésének időpontjában már jelen vannak az új könyvtárban.
A fájlértesítési mód csak egyfelhasználós számítás esetén támogatott.
Az Automatikus betöltő fájlértesítési módban használt felhőerőforrások
Fontos
Emelt szintű engedélyekre van szüksége a felhőinfrastruktúra fájlértesítési módhoz való automatikus konfigurálásához. Forduljon a felhő rendszergazdájához vagy a munkaterület rendszergazdájához. Lát:
Az Automatikus betöltő automatikusan beállíthatja a fájlértesítéseket, amikor beállítja a beállítást cloudFiles.useNotifications
true
, és megadja a felhőerőforrások létrehozásához szükséges engedélyeket. Emellett előfordulhat, hogy további lehetőségeket kell megadnia az automatikus betöltő engedélyezéséhez ezeknek az erőforrásoknak a létrehozásához.
Az alábbi táblázat az Automatikus betöltő által létrehozott erőforrásokat foglalja össze.
Felhőalapú tárolás | Előfizetési szolgáltatás | Queue szolgáltatás | Előképző* | Korlát** |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | databricks-auto-ingest | 100 S3-gyűjtőnként |
ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | Tárfiókonként 500 |
GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-ingest | GCS-gyűjtőnként 100 |
Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | Tárfiókonként 500 |
- Az automatikus betöltő ezzel az előtaggal nevezi el az erőforrásokat.
** Hány egyidejű fájlértesítési folyamat indítható el
Ha egy adott tárfiókhoz a korlátozott számú fájlértesítési folyamatnál többet kell futtatnia, a következőkre van lehetősége:
- Az olyan szolgáltatások, mint az AWS Lambda, az Azure Functions vagy a Google Cloud Functions használatával egyetlen üzenetsorból származó értesítéseket jeleníthet meg, amelyek egy teljes tárolót vagy gyűjtőt figyelnek címtárspecifikus üzenetsorokba.
Fájlértesítési események
Az AWS S3 eseményt ObjectCreated
biztosít, amikor egy fájlt feltöltenek egy S3-gyűjtőbe, függetlenül attól, hogy put vagy többrészes feltöltéssel töltötték fel.
Az ADLS Gen2 különböző eseményértesítéseket biztosít a Gen2-tárolóban megjelenő fájlokról.
- Az Automatikus betöltő figyeli az
FlushWithClose
eseményt egy fájl feldolgozásához. - Az automatikus betöltő streamek támogatják a
RenameFile
fájlok felderítésére szolgáló műveletet.RenameFile
A műveletekhez API-kérés szükséges a tárolórendszerhez az átnevezett fájl méretének lekéréséhez. - A Databricks Runtime 9.0-val létrehozott automatikus betöltő streamek, és a
RenameDirectory
fájlok felderítésére szolgáló művelet támogatása után.RenameDirectory
A műveletekhez API-kérések szükségesek a tárolórendszerhez az átnevezett könyvtár tartalmának listázásához.
A Google Cloud Storage eseményt OBJECT_FINALIZE
biztosít egy fájl feltöltésekor, amely felülírásokat és fájlmásolatokat is tartalmaz. A sikertelen feltöltések nem generálják ezt az eseményt.
Feljegyzés
A felhőszolgáltatók nem garantálják az összes fájlesemény 100%-os kézbesítését nagyon ritka feltételek mellett, és nem biztosítanak szigorú SLA-kat a fájlesemények késésére. A Databricks azt javasolja, hogy rendszeres háttérbetöltéseket aktiváljon az Automatikus betöltővel. Ezzel a cloudFiles.backfillInterval
beállítással garantálható, hogy minden fájl felderítve legyen egy adott SLA-on belül, ha az adatok teljessége követelmény. A normál visszatöltések aktiválása nem okoz duplikációkat.
Az ADLS Gen2 és az Azure Blob Storage fájlértesítésének konfigurálásához szükséges engedélyek
Olvasási engedélyekkel kell rendelkeznie a bemeneti könyvtárhoz. Lásd: Azure Blob Storage.
A fájlértesítési mód használatához meg kell adnia a hitelesítési hitelesítő adatokat az eseményértesítési szolgáltatások beállításához és eléréséhez. A hitelesítéshez csak szolgáltatásnévre van szükség.
Szolgáltatásnév – azure-beli beépített szerepkörök használata
Hozzon létre egy Microsoft Entra-azonosítót (korábbi nevén Azure Active Directory- alkalmazást) és szolgáltatásnevet ügyfél-azonosító és ügyfélkód formájában.
Rendelje hozzá az alkalmazáshoz a következő szerepköröket ahhoz a tárfiókhoz, amelyben a bemeneti elérési út található:
- Közreműködő: Ez a szerepkör a tárfiókban lévő erőforrások, például üzenetsorok és esemény-előfizetések beállítására szolgál.
- Tárolósor adatszolgáltatója: Ez a szerepkör üzenetsor-műveletek végrehajtására szolgál, például üzenetek lekérésére és törlésére az üzenetsorokból. Ez a szerepkör csak akkor szükséges, ha egy szolgáltatásnevet kapcsolati sztring nélkül ad meg.
Rendelje hozzá az alkalmazáshoz a következő szerepkört a kapcsolódó erőforráscsoporthoz:
- EventGrid EventSubscription közreműködő: Ez a szerepkör olyan eseményrács-előfizetési műveletek végrehajtására szolgál, mint például esemény-előfizetések létrehozása vagy listázása.
További információ: Azure-szerepkörök hozzárendelése a Azure Portal.
Szolgáltatásnév – egyéni szerepkör használata
Ha az előző szerepkörökhöz szükséges túlzott engedélyekkel foglalkozik, létrehozhat egy egyéni szerepkört legalább az alábbi engedélyekkel, az alábbiakban az Azure-szerepkör JSON-formátumában:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
Ezután hozzárendelheti ezt az egyéni szerepkört az alkalmazáshoz.
További információ: Azure-szerepkörök hozzárendelése a Azure Portal.
Gyakori hibák elhárítása
Hiba:
java.lang.RuntimeException: Failed to create event grid subscription.
Ha az Auto Loader első futtatásakor ez a hibaüzenet jelenik meg, akkor az eseményrács nincs erőforrás-szolgáltatóként regisztrálva az Azure-előfizetésben. Ennek regisztrálása az Azure portálon:
- Menj az előfizetésedhez.
- Kattintson az Erőforrás-szolgáltatók elemre a Beállítások szakaszban.
- A szolgáltató regisztrálása
Microsoft.EventGrid
.
Hiba:
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Ha az Auto Loader első futtatásakor megjelenik ez a hibaüzenet, győződjön meg arról, hogy az Event Grid szolgáltatási megbízójának és a tárolási fiókjának megadta a Contributor szerepkört.
Az AWS S3 fájlértesítésének konfigurálásához szükséges engedélyek
Olvasási engedélyekkel kell rendelkeznie a bemeneti könyvtárhoz. További részletekért tekintse meg az S3 kapcsolati adatait .
A fájlértesítési mód használatához csatolja a következő JSON-szabályzatdokumentumot az IAM-felhasználóhoz vagy -szerepkörhöz.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
ahol:
<bucket-name>
: Az S3 gyűjtő neve,auto-logs
ahol a stream például fájlokat olvas fel.*
Használhatja példáuldatabricks-*-logs
helyettesítő karakterként. A DBFS-elérési út alapjául szolgáló S3-gyűjtő megkereséséhez futtassa%fs mounts
a jegyzetfüzet összes DBFS csatlakoztatási pontját.<region>
: Az AWS-régió, ahol az S3 gyűjtő található, példáulus-west-2
. Ha nem szeretné megadni a régiót, használja a következőt*
: .<account-number>
: Az S3 gyűjtőt birtokba vevő AWS-fiók száma, például123456789012
. Ha nem szeretné megadni a fiókszámot, használja a következőt*
: .
Az SQS és az SNS ARN specifikációjának sztringje databricks-auto-ingest-*
a forrás által az cloudFiles
SQS- és SNS-szolgáltatások létrehozásakor használt névelőtag. Mivel az Azure Databricks a stream kezdeti futtatásakor állítja be az értesítési szolgáltatásokat, a kezdeti futtatás után csökkentett engedélyekkel rendelkező szabályzatot használhat (például leállíthatja a streamet, majd újraindíthatja).
Feljegyzés
Az előző szabályzat csak a fájlértesítési szolgáltatások beállításához szükséges engedélyekkel foglalkozik, nevezetesen az S3 gyűjtőértesítési, SNS- és SQS-szolgáltatásokhoz, és feltételezi, hogy már rendelkezik olvasási hozzáféréssel az S3 gyűjtőhöz. Ha írásvédett S3-engedélyeket kell hozzáadnia, adja hozzá az alábbiakat a Action
DatabricksAutoLoaderSetup
JSON-dokumentum utasításában szereplő listához:
s3:ListBucket
s3:GetObject
Csökkentett engedélyek a kezdeti beállítás után
A fent leírt erőforrás-beállítási engedélyek csak a stream kezdeti futtatása során szükségesek. Az első futtatás után a következő, csökkentett engedélyekkel rendelkező IAM-szabályzatra válthat.
Fontos
A csökkentett engedélyekkel nem indíthat új streamelési lekérdezéseket, és nem hozhat létre erőforrásokat hibák esetén (például az SQS-üzenetsor véletlenül törölve lett); A felhőalapú erőforrás-kezelési API-val sem listázhatja vagy bonthatja le az erőforrásokat.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
A GCS fájlértesítésének konfigurálásához szükséges engedélyek
Rendelkeznie kell és get
engedélyekkel kell rendelkeznie list
a GCS-gyűjtőn és az összes objektumon. Részletekért tekintse meg a Google IAM-engedélyekkel kapcsolatos dokumentációját.
A fájlértesítési mód használatához engedélyeket kell hozzáadnia a GCS szolgáltatásfiókhoz és a Google Cloud Pub/Alerőforrások eléréséhez használt fiókhoz.
Adja hozzá a szerepkört Pub/Sub Publisher
a GCS szolgáltatásfiókhoz. Ez lehetővé teszi, hogy a fiók eseményértesítési üzeneteket tegyen közzé a GCS-gyűjtőkből a Google Cloud Pub/Sub szolgáltatásban.
A Google Cloud Pub/Alerőforrásokhoz használt szolgáltatásfiókhoz a következő engedélyeket kell hozzáadnia:
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
Ehhez létrehozhat egy egyéni IAM-szerepkört ezekkel az engedélyekkel, vagy hozzárendelhet már meglévő GCP-szerepköröket ezekhez az engedélyekhez.
A GCS szolgáltatásfiók megkeresése
A megfelelő projekthez tartozó Google Cloud Consoleban keresse meg a következőt Cloud Storage > Settings
: .
A "Cloud Storage Szolgáltatásfiók" szakasz tartalmazza a GCS szolgáltatásfiók e-mail-címét.
Egyéni Google Cloud IAM-szerepkör létrehozása fájlértesítési módhoz
A megfelelő projektHez tartozó Google Cloud-konzolon keresse meg a következőt IAM & Admin > Roles
: . Ezután hozzon létre egy szerepkört felül, vagy frissítsen egy meglévő szerepkört. A szerepkör létrehozására vagy szerkesztésére szolgáló képernyőn kattintson a gombra Add Permissions
. Megjelenik egy menü, amelyben hozzáadhatja a kívánt engedélyeket a szerepkörhöz.
Fájlértesítési erőforrások manuális konfigurálása vagy kezelése
A kiemelt felhasználók manuálisan konfigurálhatják vagy kezelhetik a fájlértesítési erőforrásokat.
- Állítsa be manuálisan a fájlértesítési szolgáltatásokat a felhőszolgáltatón keresztül, és adja meg manuálisan az üzenetsor-azonosítót. További részletekért lásd a Fájlértesítési beállításokat .
- A Scala API-k használatával hozhatja létre vagy kezelheti az értesítéseket és a sorba állítási szolgáltatásokat az alábbi példában látható módon:
Feljegyzés
A felhőinfrastruktúra konfigurálásához vagy módosításához megfelelő engedélyekkel kell rendelkeznie. Tekintse meg az Azure, az S3 vagy a GCS engedélydokumentációját.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Üzenetsor setUpNotificationServices(<resource-suffix>)
és előfizetés létrehozása a névvel <prefix>-<resource-suffix>
(az előtag az Automatikus betöltő fájlértesítési módban használt felhőerőforrásokban összegzett tárolórendszertől függ). Ha egy meglévő erőforrás ugyanazzal a névvel rendelkezik, az Azure Databricks egy új erőforrás létrehozása helyett újra felhasználja a meglévő erőforrást. Ez a függvény egy üzenetsor-azonosítót ad vissza, amelyet a cloudFiles
fájlértesítési beállításokban szereplő azonosítóval továbbíthat a forrásnak. Ez lehetővé teszi, hogy a cloudFiles
forrásfelhasználó kevesebb engedéllyel rendelkezzen, mint az erőforrásokat létrehozó felhasználó.
Adja meg a "path"
lehetőséget, hogy newManager
csak akkor, ha a hívás setUpNotificationServices
; nem szükséges vagy listNotificationServices
tearDownNotificationServices
. Ez ugyanaz path
, mint a streamelési lekérdezések futtatásakor.
Az alábbi mátrix azt jelzi, hogy mely API-metódusok támogatottak az egyes tárolótípusokhoz tartozó Databricks-futtatókörnyezetben:
Felhőalapú tárolás | Az API beállítása | API listázása | AZ API bontása |
---|---|---|---|
AWS S3 | Az összes verzió | Az összes verzió | Az összes verzió |
ADLS Gen2 | Az összes verzió | Az összes verzió | Az összes verzió |
GCS | Databricks Runtime 9.1 vagy újabb | Databricks Runtime 9.1 vagy újabb | Databricks Runtime 9.1 vagy újabb |
Azure Blob Storage | Az összes verzió | Az összes verzió | Az összes verzió |
ADLS Gen1 | Nem támogatott | Nem támogatott | Támogatott |