Mi az automatikus betöltő fájlértesítési módja?

Fájlértesítési módban az Automatikus betöltő automatikusan beállít egy értesítési szolgáltatást és egy üzenetsor-szolgáltatást, amely feliratkozik a bemeneti könyvtárból származó fájleseményekre. Fájlértesítések használatával skálázhatja az Automatikus betöltőt úgy, hogy óránként több millió fájlt tölt be. A címtár-listázási módhoz képest a fájlértesítési mód nagyobb teljesítményű és méretezhető nagy bemeneti könyvtárak vagy nagy mennyiségű fájl esetén, de további felhőbeli engedélyeket igényel.

Bármikor válthat a fájlértesítések és a címtár-listaelemek között, és továbbra is pontosan egyszeri adatfeldolgozási garanciákat tart fenn.

Figyelmeztetés

Az automatikus betöltő forrásútvonalának módosítása a fájlértesítési mód esetében nem támogatott. Ha fájlértesítési módot használ, és az elérési útvonal megváltozik, előfordulhat, hogy nem sikerül olyan fájlokat bevinnie, amelyek a könyvtár frissítésének időpontjában már jelen vannak az új könyvtárban.

Az Automatikus betöltő fájlértesítési módban használt felhőerőforrások

Fontos

Emelt szintű engedélyekre van szüksége a felhőinfrastruktúra fájlértesítési módhoz való automatikus konfigurálásához. Forduljon a felhő rendszergazdájához vagy a munkaterület rendszergazdájához. Lásd:

Az Automatikus betöltő automatikusan beállíthatja a fájlértesítéseket, amikor beállítja a beállítást cloudFiles.useNotificationstrue , és megadja a felhőerőforrások létrehozásához szükséges engedélyeket. Emellett előfordulhat, hogy további lehetőségeket kell megadnia az automatikus betöltő engedélyezéséhez ezeknek az erőforrásoknak a létrehozásához.

Az alábbi táblázat az Automatikus betöltő által létrehozott erőforrásokat foglalja össze.

Felhőalapú tárolás Előfizetési szolgáltatás Queue szolgáltatás Előtag* Korlátoz**
AWS S3 AWS SNS AWS SQS databricks-auto-ingest 100 S3-gyűjtőnként
ADLS Gen2 Azure Event Grid Azure Queue Storage databricks Tárfiókonként 500
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest GCS-gyűjtőnként 100
Azure Blob Storage Azure Event Grid Azure Queue Storage databricks Tárfiókonként 500
  • Az automatikus betöltő ezzel az előtaggal nevezi el az erőforrásokat.

** Hány egyidejű fájlértesítési folyamat indítható el

Ha egy adott tárfiókhoz a korlátozott számú fájlértesítési folyamatnál többet kell futtatnia, a következőkre van lehetősége:

  • Az olyan szolgáltatások, mint az AWS Lambda, az Azure Functions vagy a Google Cloud Functions használatával egyetlen üzenetsorból származó értesítéseket jeleníthet meg, amelyek egy teljes tárolót vagy gyűjtőt figyelnek címtárspecifikus üzenetsorokba.

Fájlértesítési események

Az AWS S3 eseményt ObjectCreated biztosít, amikor egy fájlt feltöltenek egy S3-gyűjtőbe, függetlenül attól, hogy put vagy többrészes feltöltéssel töltötték fel.

Az ADLS Gen2 különböző eseményértesítéseket biztosít a Gen2-tárolóban megjelenő fájlokról.

  • Az Automatikus betöltő figyeli az FlushWithClose eseményt egy fájl feldolgozásához.
  • Az automatikus betöltő streamek támogatják a RenameFile fájlok felderítésére szolgáló műveletet. RenameFile A műveletekhez API-kérés szükséges a tárolórendszerhez az átnevezett fájl méretének lekéréséhez.
  • A Databricks Runtime 9.0-val létrehozott automatikus betöltő streamek, és a RenameDirectory fájlok felderítésére szolgáló művelet támogatása után. RenameDirectory A műveletekhez API-kérések szükségesek a tárolórendszerhez az átnevezett könyvtár tartalmának listázásához.

A Google Cloud Storage eseményt OBJECT_FINALIZE biztosít egy fájl feltöltésekor, amely felülírásokat és fájlmásolatokat is tartalmaz. A sikertelen feltöltések nem generálják ezt az eseményt.

Feljegyzés

A felhőszolgáltatók nem garantálják az összes fájlesemény 100%-os kézbesítését nagyon ritka feltételek mellett, és nem biztosítanak szigorú SLA-kat a fájlesemények késésére. A Databricks azt javasolja, hogy rendszeres háttérbetöltéseket aktiváljon az Automatikus betöltővel. Ezzel a cloudFiles.backfillInterval beállítással garantálható, hogy minden fájl felderítve legyen egy adott SLA-on belül, ha az adatok teljessége követelmény. A normál visszatöltések aktiválása nem okoz duplikációkat.

Az ADLS Gen2 és az Azure Blob Storage fájlértesítésének konfigurálásához szükséges engedélyek

Olvasási engedélyekkel kell rendelkeznie a bemeneti könyvtárhoz. Lásd: Azure Blob Storage.

A fájlértesítési mód használatához meg kell adnia a hitelesítési hitelesítő adatokat az eseményértesítési szolgáltatások beállításához és eléréséhez. A hitelesítéshez csak szolgáltatásnévre van szükség.

  • Szolgáltatásnév – azure-beli beépített szerepkörök használata

    Hozzon létre egy Microsoft Entra-azonosítót (korábbi nevén Azure Active Directory- alkalmazást) és szolgáltatásnevet ügyfél-azonosító és ügyfélkód formájában.

    Rendelje hozzá az alkalmazáshoz a következő szerepköröket ahhoz a tárfiókhoz, amelyben a bemeneti elérési út található:

    • Közreműködő: Ez a szerepkör a tárfiókban lévő erőforrások, például üzenetsorok és esemény-előfizetések beállítására szolgál.
    • Tárolósor adatszolgáltatója: Ez a szerepkör üzenetsor-műveletek végrehajtására szolgál, például üzenetek lekérésére és törlésére az üzenetsorokból. Ez a szerepkör csak akkor szükséges, ha egy szolgáltatásnevet kapcsolati sztring nélkül ad meg.

    Rendelje hozzá az alkalmazáshoz a következő szerepkört a kapcsolódó erőforráscsoporthoz:

    További információ: Azure-szerepkörök hozzárendelése a Azure Portal.

  • Szolgáltatásnév – egyéni szerepkör használata

    Ha az előző szerepkörökhöz szükséges túlzott engedélyekkel foglalkozik, létrehozhat egy egyéni szerepkört legalább az alábbi engedélyekkel, az alábbiakban az Azure-szerepkör JSON-formátumában:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Ezután hozzárendelheti ezt az egyéni szerepkört az alkalmazáshoz.

    További információ: Azure-szerepkörök hozzárendelése a Azure Portal.

Automatikus betöltőengedélyek

Gyakori hibák elhárítása

Hiba:

java.lang.RuntimeException: Failed to create event grid subscription.

Ha az Auto Loader első futtatásakor ez a hibaüzenet jelenik meg, akkor az eseményrács nincs erőforrás-szolgáltatóként regisztrálva az Azure-előfizetésben. Ennek regisztrálása az Azure portálon:

  1. Menj az előfizetésedhez.
  2. Kattintson az Erőforrás-szolgáltatók elemre a Gépház szakaszban.
  3. A szolgáltató regisztrálása Microsoft.EventGrid.

Hiba:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Ha az Auto Loader első futtatásakor megjelenik ez a hibaüzenet, győződjön meg arról, hogy az Event Grid szolgáltatási megbízójának és a tárolási fiókjának megadta a Contributor szerepkört.

Az AWS S3 fájlértesítésének konfigurálásához szükséges engedélyek

Olvasási engedélyekkel kell rendelkeznie a bemeneti könyvtárhoz. További részletekért tekintse meg az S3 kapcsolati adatait .

A fájlértesítési mód használatához csatolja a következő JSON-szabályzatdokumentumot az IAM-felhasználóhoz vagy -szerepkörhöz.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

ahol:

  • <bucket-name>: Az S3 gyűjtő neve, auto-logsahol a stream például fájlokat olvas fel. * Használhatja például databricks-*-logshelyettesítő karakterként. A DBFS-elérési út alapjául szolgáló S3-gyűjtő megkereséséhez futtassa %fs mountsa jegyzetfüzet összes DBFS csatlakoztatási pontját.
  • <region>: Az AWS-régió, ahol az S3 gyűjtő található, például us-west-2. Ha nem szeretné megadni a régiót, használja a következőt *: .
  • <account-number>: Az S3 gyűjtőt birtokba vevő AWS-fiók száma, például 123456789012. Ha nem szeretné megadni a fiókszámot, használja a következőt *: .

Az SQS és az SNS ARN specifikációjának sztringje databricks-auto-ingest-* a forrás által az cloudFiles SQS- és SNS-szolgáltatások létrehozásakor használt névelőtag. Mivel az Azure Databricks a stream kezdeti futtatásakor állítja be az értesítési szolgáltatásokat, a kezdeti futtatás után csökkentett engedélyekkel rendelkező szabályzatot használhat (például leállíthatja a streamet, majd újraindíthatja).

Feljegyzés

Az előző szabályzat csak a fájlértesítési szolgáltatások beállításához szükséges engedélyekkel foglalkozik, nevezetesen az S3 gyűjtőértesítési, SNS- és SQS-szolgáltatásokhoz, és feltételezi, hogy már rendelkezik olvasási hozzáféréssel az S3 gyűjtőhöz. Ha írásvédett S3-engedélyeket kell hozzáadnia, adja hozzá az alábbiakat a ActionDatabricksAutoLoaderSetup JSON-dokumentum utasításában szereplő listához:

  • s3:ListBucket
  • s3:GetObject

Csökkentett engedélyek a kezdeti beállítás után

A fent leírt erőforrás-beállítási engedélyek csak a stream kezdeti futtatása során szükségesek. Az első futtatás után a következő, csökkentett engedélyekkel rendelkező IAM-szabályzatra válthat.

Fontos

A csökkentett engedélyekkel nem indíthat új streamelési lekérdezéseket, és nem hozhat létre erőforrásokat hibák esetén (például az SQS-üzenetsor véletlenül törölve lett); A felhőalapú erőforrás-kezelési API-val sem listázhatja vagy bonthatja le az erőforrásokat.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

A GCS fájlértesítésének konfigurálásához szükséges engedélyek

Rendelkeznie kell és get engedélyekkel kell rendelkeznie list a GCS-gyűjtőn és az összes objektumon. Részletekért tekintse meg a Google IAM-engedélyekkel kapcsolatos dokumentációját.

A fájlértesítési mód használatához engedélyeket kell hozzáadnia a GCS szolgáltatásfiókhoz és a Google Cloud Pub/Alerőforrások eléréséhez használt fiókhoz.

Adja hozzá a szerepkört Pub/Sub Publisher a GCS szolgáltatásfiókhoz. Ez lehetővé teszi, hogy a fiók eseményértesítési üzeneteket tegyen közzé a GCS-gyűjtőkből a Google Cloud Pub/Sub szolgáltatásban.

A Google Cloud Pub/Alerőforrásokhoz használt szolgáltatásfiókhoz a következő engedélyeket kell hozzáadnia:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Ehhez létrehozhat egy egyéni IAM-szerepkört ezekkel az engedélyekkel, vagy hozzárendelhet már meglévő GCP-szerepköröket ezekhez az engedélyekhez.

A GCS szolgáltatásfiók megkeresése

A megfelelő projekthez tartozó Google Cloud Consoleban keresse meg a következőt Cloud Storage > Settings: . A "Cloud Storage Szolgáltatásfiók" szakasz tartalmazza a GCS szolgáltatásfiók e-mail-címét.

GCS szolgáltatásfiók

Egyéni Google Cloud IAM-szerepkör létrehozása fájlértesítési módhoz

A megfelelő projektHez tartozó Google Cloud-konzolon keresse meg a következőt IAM & Admin > Roles: . Ezután hozzon létre egy szerepkört felül, vagy frissítsen egy meglévő szerepkört. A szerepkör létrehozására vagy szerkesztésére szolgáló képernyőn kattintson a gombra Add Permissions. Megjelenik egy menü, amelyben hozzáadhatja a kívánt engedélyeket a szerepkörhöz.

Egyéni GCP IAM-szerepkörök

Fájlértesítési erőforrások manuális konfigurálása vagy kezelése

A kiemelt felhasználók manuálisan konfigurálhatják vagy kezelhetik a fájlértesítési erőforrásokat.

  • Állítsa be manuálisan a fájlértesítési szolgáltatásokat a felhőszolgáltatón keresztül, és adja meg manuálisan az üzenetsor-azonosítót. További részletekért lásd a Fájlértesítési beállításokat .
  • A Scala API-k használatával hozhatja létre vagy kezelheti az értesítéseket és a sorba állítási szolgáltatásokat az alábbi példában látható módon:

Feljegyzés

A felhőinfrastruktúra konfigurálásához vagy módosításához megfelelő engedélyekkel kell rendelkeznie. Tekintse meg az Azure, az S3 vagy a GCS engedélydokumentációját.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Üzenetsor setUpNotificationServices(<resource-suffix>) és előfizetés létrehozása a névvel <prefix>-<resource-suffix> (az előtag az Automatikus betöltő fájlértesítési módban használt felhőerőforrásokban összegzett tárolórendszertől függ). Ha egy meglévő erőforrás ugyanazzal a névvel rendelkezik, az Azure Databricks egy új erőforrás létrehozása helyett újra felhasználja a meglévő erőforrást. Ez a függvény egy üzenetsor-azonosítót ad vissza, amelyet a cloudFiles fájlértesítési beállításokban szereplő azonosítóval továbbíthat a forrásnak. Ez lehetővé teszi, hogy a cloudFiles forrásfelhasználó kevesebb engedéllyel rendelkezzen, mint az erőforrásokat létrehozó felhasználó.

Adja meg a "path" lehetőséget, hogy newManager csak akkor, ha a hívás setUpNotificationServices; nem szükséges vagy listNotificationServicestearDownNotificationServices. Ez ugyanaz path , mint a streamelési lekérdezések futtatásakor.

Az alábbi mátrix azt jelzi, hogy mely API-metódusok támogatottak az egyes tárolótípusokhoz tartozó Databricks-futtatókörnyezetben:

Felhőalapú tárolás Az API beállítása API listázása AZ API bontása
AWS S3 Az összes verzió Az összes verzió Az összes verzió
ADLS Gen2 Az összes verzió Az összes verzió Az összes verzió
GCS Databricks Runtime 9.1 vagy újabb Databricks Runtime 9.1 vagy újabb Databricks Runtime 9.1 vagy újabb
Azure Blob Storage Az összes verzió Az összes verzió Az összes verzió
ADLS Gen1 Nem támogatott Nem támogatott Támogatott