Sdílet prostřednictvím


Konfigurace datových proudů automatického načítače v režimu oznámení souborů

Tato stránka popisuje, jak nakonfigurovat streamy automatického zavaděče tak, aby používaly režim oznámení souborů k postupnému zjišťování a ingestování cloudových dat.

V režimu oznámení souborů Auto Loader automaticky nastaví službu oznámení a službu fronty, která se přihlásí k odběru událostí souborů ze vstupního adresáře. Oznámení o souborech můžete použít ke škálování Auto Loaderu na načítání milionů souborů za hodinu. Ve srovnání s režimem výpisu adresářů je režim oznámení souboru výkonnější a škálovatelný.

Můžete kdykoli přepínat mezi upozorněními na soubory a výpisem adresářů a stále zachovávat záruky jednorázového zpracování dat.

Poznámka:

Režim oznámení souborů není pro účty Azure Premium Storage podporovaný, protože účty Premium nepodporují službu Queue Storage.

Varování

Změna zdrojové cesty pro Auto Loader není podporována v režimu oznámení souboru. Pokud se použije režim oznámení souboru a cesta se změní, může se stát, že se nepodaří ingestovat soubory, které jsou již v novém adresáři v době aktualizace adresáře.

Režim oznámení souborů s událostmi souborů povolenými i bez nich na externích umístěních

Existují dva způsoby konfigurace automatického zavaděče tak, aby používal režim oznámení souboru:

  • (Doporučeno) Události souborů (Veřejný Náhled): Použijete jednu frontu oznámení o souboru pro všechny streamy, které zpracovávají soubory z daného externího umístění.

    Tento přístup má oproti starší verzi režimu oznámení souborů následující výhody:

    • Azure Databricks může ve vašem cloudovém úložném účtu nakonfigurovat předplatné a události souborů, aniž byste museli zadávat další přihlašovací údaje k Auto Loaderu pomocí přihlašovacích údajů služby nebo jiných cloudově specifických ověřovacích možností. Viz (doporučeno) Povolit události souborů pro externí umístění.
    • V účtu cloudového úložiště máte méně zásad spravované identity Azure k vytvoření.
    • Vzhledem k tomu, že už nemusíte vytvářet frontu pro každý stream automatického zavaděče, je jednodušší vyhnout se dosažení limitů oznámení od poskytovatele cloudových služeb uvedených v cloudových prostředcích používaných ve starším režimu oznámení souboru automatického zavaděče.
    • Azure Databricks automaticky spravuje ladění požadavků na prostředky, takže nemusíte ladit parametry, jako je cloudFiles.fetchParallelism.
    • Funkce čištění znamená, že se nemusíte starat o životní cyklus oznámení vytvořených v cloudu, například při odstranění nebo úplné aktualizaci datového proudu.

Pokud používáte Auto Loader v režimu výpisu adresáře, Databricks doporučuje migraci do režimu oznámení o souborech s událostmi. Automatický zavaděč souborů s událostmi souborů nabízí významná vylepšení výkonu. Začněte tím, že povolíte události souborů pro vaše externí umístění a pak nastavíte cloudFiles.useManagedFileEvents konfiguraci datového proudu Auto Loader.

  • Režim oznámení zastaralých souborů: Spravujete fronty oznámení souborů pro každý datový proud Auto Loaderu samostatně. Automatický zavaděč nastaví službu oznámení a službu fronty, které se přihlásí k odběru událostí souborů ze vstupního adresáře.

    Toto je starší přístup.

Použijte režim oznámení o souborech s událostmi souborů

Tato část popisuje, jak vytvořit a aktualizovat streamy automatického zavaděče tak, aby používaly události souborů.

Důležité

Podpora automatického načítání pro události souborů je ve veřejné verzi Preview.

Než začnete

Nastavení událostí souborů vyžaduje:

  • Pracovní prostor Azure Databricks, který je aktivovaný pro Unity Catalog.
  • Oprávnění k vytváření přihlašovacích údajů úložiště a objektů externího umístění v katalogu Unity

Automatické streamy zavaděče s událostmi souborů vyžadují:

  • Výpočty ve službě Databricks Runtime 14.3 LTS nebo vyšší

Pokyny ke konfiguraci

Následující pokyny platí bez ohledu na to, jestli vytváříte nové streamy Auto Loaderu nebo migrujete existující streamy pro použití upgradovaného režimu oznamování souborů pomocí událostí:

  1. Vytvořte přihlašovací údaje úložiště a externí umístění v katalogu Unity, které udělují přístup ke zdrojovému umístění v cloudovém úložišti pro streamy automatického zavaděče.

  2. Povolte události souborů pro externí umístění. Viz (doporučeno) Povolit události souborů pro externí umístění.

  3. Když vytvoříte nový stream Auto Loaderu nebo upravíte existující pro práci s externím umístěním:

    • Pokud máte existující streamy automatického zavaděče založené na oznámeních , které využívají data z externího umístění, vypněte je a odstraňte přidružené prostředky oznámení.
    • Ujistěte se, že pathRewrites není nastavená (nejedná se o běžnou možnost).
    • Projděte si seznam nastavení, která automatický zavaděč ignoruje při správě oznámení o souborech prostřednictvím událostí souborů. Vyhněte se jim v nových Auto Loader datových proudech a odeberte je z existujících datových proudů, které migrujete do tohoto režimu.
    • Nastavte možnost cloudFiles.useManagedFileEvents na true ve vašem kódu automatického zavaděče.

Například:

autoLoaderStream = (spark.readStream
  .format("cloudFiles")
  ...
  .options("cloudFiles.useManagedFileEvents", True)
  ...)

Pokud používáte deklarativní kanály Sparku pro Lakeflow a už máte kanál s tabulkou streamování, aktualizujte ho tak, aby zahrnoval tuto useManagedFileEvents možnost:

CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
  FROM STREAM read_files('abfss://path/to/external/location/or/volume',
                   format => '<format>',
                   useManagedFileEvents => 'True'
                   ...
                   );

Nepodporovaná nastavení automatického zavaděče

Pokud streamy používají události souborů, nejsou podporována následující nastavení Auto Loaderu:

Nastavení Změna
useIncremental Už se nemusíte rozhodovat mezi efektivitou oznámení o souborech a jednoduchostí výpisu adresářů. Automatický zavaděč souborů s událostmi je dostupný pouze v jednom režimu.
useNotifications Pro každé externí umístění existuje pouze jedna fronta a odběr událostí úložiště.
cloudFiles.fetchParallelism Auto Loader se souborovými událostmi nenabízí manuální optimalizaci paralelismu.
cloudFiles.backfillInterval Azure Databricks zpracovává automatické vyplňování externích umístění, která jsou povolená pro události souborů.
cloudFiles.pathRewrites Tato možnost platí jenom v případě, že připojíte externí umístění dat k systému souborů DBFS, což je zastaralé.
resourceTags Značky prostředků byste měli nastavit pomocí cloudové konzoly.

Osvědčené postupy pro události spravovaných souborů najdete v tématu Osvědčené postupy automatického zavaděče s událostmi souborů.

Omezení automatického načítače s událostmi souborů

Služba událostí souborů optimalizuje zjišťování souborů tím, že ukládá do mezipaměti nejnověji vytvořené soubory. Pokud automatický zavaděč běží zřídka, platnost této mezipaměti může vypršet a automatický zavaděč se vrátí k vyhledávání adresářů, aby nalezl soubory a aktualizoval mezipaměť. Pokud se chcete tomuto scénáři vyhnout, spusťte Auto Loader alespoň jednou každých sedm dní.

Obecný seznam omezení událostí souborů najdete v tématu Omezení událostí souborů.

Spravovat fronty oznámení souborů pro každý stream Auto Loaderu samostatně (starší verze)

Důležité

K automatické konfiguraci cloudové infrastruktury pro režim oznámení souborů potřebujete zvýšená oprávnění. Obraťte se na správce cloudu nebo správce pracovního prostoru. Vidět:

Cloudové prostředky používané v režimu oznámení souborů u starší verze Auto Loaderu

Zavaděč může automaticky nastavit oznámení o souborech, když nastavíte možnost cloudFiles.useNotifications na true a poskytnete potřebná oprávnění pro vytváření cloudových zdrojů. Kromě toho možná budete muset zvolit další možnosti, abyste Auto Loaderu udělili autorizaci k vytvoření těchto prostředků.

Následující tabulka obsahuje seznam prostředků vytvořených Auto Loaderem pro každého poskytovatele cloudu.

Cloudové úložiště Předplatná služba Služba front Předpona* Omezení**
Amazon S3 AWS SNS AWS SQS (Amazon Simple Queue Service) Automatický příjem Databricks 100 na kbelík S3
ADLS Azure Event Grid Azure Queue Storage databricks, platforma pro analýzu dat 500 na účet úložiště
GKS Google Pub/Sub (služba pro zasílání zpráv) Google Pub/Sub (služba pro zasílání zpráv) Automatický příjem Databricks 100 na kbelík GCS
Azure Blob Storage Azure Event Grid Azure Queue Storage databricks, platforma pro analýzu dat 500 na účet úložiště

* Automaticky zavaděč pojmenuje prostředky s touto předponou.

** Kolik souběžných kanálů oznámení o souborech je možné spustit

Pokud musíte spustit více datových proudů automatického zavaděče založeného na souborech, než je povoleno těmito limity, můžete použít události souborů nebo službu, jako je AWS Lambda, Azure Functions nebo Google Cloud Functions, k rozšíření oznámení z jedné fronty, která naslouchá celému kontejneru nebo kbelíku, do front specifických pro jednotlivé adresáře.

Události oznámení o starších verzích souborů

Amazon S3 poskytuje událost ObjectCreated, když se soubor nahraje do S3 bucketu, bez ohledu na to, zda byl nahrán jednou operací nebo vícedílným nahráváním.

Azure Data Lake Storage poskytuje různá oznámení událostí pro soubory, které se zobrazují v kontejneru úložiště.

  • Automatický zavaděč naslouchá události FlushWithClose ke zpracování souboru.
  • Streamy automatického zavaděče podporují akci RenameFile pro zjišťování souborů. RenameFile akce potřebují provést API požadavek na systém úložiště, aby získaly velikost přejmenovaného souboru.
  • Automatické streamy zavaděče vytvořené pomocí Databricks Runtime 9.0 a novější podporují akci RenameDirectory pro zjišťování souborů. RenameDirectory akce vyžadují, aby požadavky rozhraní API na systém úložiště vypsaly obsah přejmenovaného adresáře.

Google Cloud Storage poskytuje OBJECT_FINALIZE událost při nahrání souboru, který zahrnuje přepsání a kopie souborů. Neúspěšné nahrání negenerují tuto událost.

Poznámka:

Poskytovatelé cloudu nezaručují 100% doručení všech událostí souborů za velmi vzácných podmínek a neposkytují přísné smlouvy SLA o latenci událostí souborů. Databricks doporučuje aktivovat pravidelné zpracování doplňků pomocí možnosti cloudFiles.backfillInterval, aby bylo zaručeno, že se všechny soubory v rámci dané smlouvy SLA zjistí, pokud je požadována úplnost dat. Aktivace pravidelných backfillů nezpůsobí duplicity.

Požadovaná oprávnění pro konfiguraci oznámení o souboru pro Azure Data Lake Storage a Azure Blob Storage

Pro vstupní adresář musíte mít oprávnění ke čtení. Viz Azure Blob Storage.

Pokud chcete použít režim oznámení souboru, musíte zadat přihlašovací údaje pro ověřování pro nastavení a přístup ke službám oznámení událostí.

Ověření můžete provést pomocí jedné z následujících metod:

Po získání autentizačních údajů přiřaďte potřebná oprávnění buď ke konektoru pro přístup Databricks (pro údaje služby), nebo k aplikaci Microsoft Entra ID (pro objekt služby).

  • Použití předdefinovaných rolí Azure

    Přiřaďte přístupový konektor následujícím rolím k účtu úložiště, ve kterém se nachází vstupní cesta:

    • Přispěvatel: Tato role slouží k nastavení prostředků ve vašem účtu úložiště, jako jsou fronty a odběry událostí.
    • Přispěvatel dat fronty úložiště: Tato role slouží k provádění operací s frontami, jako je načítání a odstraňování zpráv z front. Tato role se vyžaduje pouze v případě, že zadáte předmět služby bez připojovacího řetězce.

    Přiřaďte k související skupině prostředků následující roli přístupového konektoru:

    Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.

  • Použití vlastní role

    Pokud máte obavy o nadměrná oprávnění požadovaná pro předchozí role, můžete vytvořit vlastní roli s alespoň následujícími oprávněními, která jsou uvedená níže ve formátu JSON role Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Tuto vlastní roli pak můžete přiřadit ke svému přístupového konektoru.

    Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.

nastavení oprávnění automatického nakladače

Požadovaná oprávnění pro konfiguraci oznámení o souboru pro Amazon S3

Pro vstupní adresář musíte mít oprávnění ke čtení. Další podrobnosti najdete v podrobnostech o připojení S3.

Pokud chcete použít režim oznámení souboru, připojte k uživateli nebo roli IAM následující dokument zásad JSON. Tato role IAM je vyžadována k vytvoření přihlašovacích údajů služby, se kterými se Auto Loader může autentizovat.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

kde:

  • <bucket-name>: Název kontejneru S3, ve kterém bude stream číst soubory, například auto-logs. Můžete použít * například databricks-*-logsjako zástupný znak . Pokud chcete zjistit základní kontejner S3 pro cestu DBFS, můžete zobrazit seznam všech přípojných bodů DBFS v poznámkovém bloku spuštěním %fs mounts.
  • <region>: Oblast AWS, ve které se nachází kontejner S3, například us-west-2. Pokud nechcete zadávat oblast, použijte *.
  • <account-number>: Číslo účtu AWS, které vlastní bucket S3, například 123456789012. Pokud nechcete zadat číslo účtu, použijte *.

Řetězec databricks-auto-ingest-* ve specifikaci SQS a SNS ARN je předpona názvu, kterou cloudFiles zdroj používá při vytváření služeb SQS a SNS. Vzhledem k tomu, že Azure Databricks nastaví služby oznámení v počátečním spuštění streamu, můžete po počátečním spuštění použít zásadu s omezenými oprávněními (například zastavit stream a poté ho restartovat).

Poznámka:

Předchozí zásada se zabývá pouze oprávněními potřebnými k nastavení služeb oznámení souborů, konkrétně služby S3 bucket notification, SNS a SQS a předpokládá, že už máte přístup pro čtení do kontejneru S3. Pokud potřebujete přidat práva S3 pouze pro čtení, přidejte do seznamu Action v příkazu DatabricksAutoLoaderSetup v dokumentu JSON následující:

  • s3:ListBucket
  • s3:GetObject

Omezená oprávnění po počátečním nastavení

Výše popsaná oprávnění k nastavení prostředků se vyžadují pouze při počátečním spuštění datového proudu. Po prvním spuštění můžete přepnout na následující zásady IAM s omezenými oprávněními.

Důležité

S omezenými oprávněními nemůžete v případě selhání spustit nové dotazy streamování ani znovu vytvořit prostředky (například fronta SQS byla omylem odstraněna); Nemůžete také použít rozhraní API pro správu cloudových prostředků k výpisu nebo odstraňování prostředků.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:TagResource",
        "sns:Publish",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:<queue-name>",
        "arn:aws:sns:<region>:<account-number>:<topic-name>",
        "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::<bucket-name>"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
      "Resource": ["arn:aws:s3:::<bucket-name>/*"]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Požadovaná oprávnění ke konfiguraci oznámení o souboru pro službu GCS

Musíte mít oprávnění list a get jak ke kontejneru GCS, tak ke všem objektům. Podrobnosti najdete v dokumentaci Google k oprávněním IAM.

Pokud chcete použít režim oznámení souborů, musíte přidat oprávnění pro účet služby GCS a účet služby použitý pro přístup k prostředkům Google Cloud Pub/Sub.

Pub/Sub Publisher Přidejte roli do účtu služby GCS. Účet tak může publikovat zprávy oznámení o událostech z kontejnerů GCS do Google Cloud Pub/Sub.

Pokud jde o účet služby používaný pro prostředky Google Cloud Pub/Sub, musíte přidat následující oprávnění. Tento účet služby se automaticky vytvoří při vytváření přihlašovacích údajů služby Databricks . Podpora přihlašovacích údajů služby je dostupná v Databricks Runtime 16.1 a novějších.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.detachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

K tomu můžete buď vytvořit vlastní roli IAM s těmito oprávněními, nebo přiřadit existující role GCP k pokrytí těchto oprávnění.

Vyhledání účtu služby GCS

V konzole Google Cloud Console pro odpovídající projekt přejděte na Cloud Storage > Settings. Část "Účet služby cloudového úložiště" obsahuje e-mail účtu služby GCS.

Účet služby GCS

Vytvoření vlastní role IAM cloudu Google pro režim oznámení souborů

V konzole Google Cloud pro odpovídající projekt přejděte na IAM & Admin > Roles. Pak buď vytvořte roli v horní části, nebo aktualizujte existující roli. Na obrazovce pro vytvoření nebo úpravu role klikněte na Add Permissions. Zobrazí se nabídka, ve které můžete přidat požadovaná oprávnění k roli.

Vlastní role GCP IAM

Ruční konfigurace nebo správa prostředků oznámení o souborech

Privilegovaní uživatelé můžou ručně konfigurovat nebo spravovat prostředky oznámení o souborech.

  • Služby oznámení souborů nastavte ručně prostřednictvím poskytovatele cloudu a zadejte identifikátor fronty ručně. Viz možnosti oznámení o souboru pro více podrobností.
  • Rozhraní SCALA API slouží k vytvoření nebo správě oznámení a služeb řazení do front, jak je znázorněno v následujícím příkladu:

Poznámka:

Ke konfiguraci nebo úpravě cloudové infrastruktury musíte mít příslušná oprávnění. Prohlédni si dokumentaci k oprávněním pro Azure, S3 nebo GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .create()

# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("cloudFiles.awsAccessKey", <aws-access-key>) \
  .option("cloudFiles.awsSecretKey", <aws-secret-key>) \
  .option("cloudFiles.roleArn", <role-arn>) \
  .option("cloudFiles.roleExternalId", <role-external-id>) \
  .option("cloudFiles.roleSessionName", <role-session-name>) \
  .option("cloudFiles.stsEndpoint", <sts-endpoint>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("cloudFiles.client", <client-id>) \
  .option("cloudFiles.clientEmail", <client-email>) \
  .option("cloudFiles.privateKey", <private-key>) \
  .option("cloudFiles.privateKeyId", <private-key-id>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

/**
 * Using AWS access key and secret key
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("cloudFiles.awsAccessKey", <aws-access-key>)
    .option("cloudFiles.awsSecretKey", <aws-secret-key>)
    .option("cloudFiles.roleArn", <role-arn>)
    .option("cloudFiles.roleExternalId", <role-external-id>)
    .option("cloudFiles.roleSessionName", <role-session-name>)
    .option("cloudFiles.stsEndpoint", <sts-endpoint>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("databricks.serviceCredential", <service-credential-name>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

/**
 * Using an Azure service principal
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

/**
 * Using a Google service account
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("cloudFiles.client", <client-id>)
    .option("cloudFiles.clientEmail", <client-email>)
    .option("cloudFiles.privateKey", <private-key>)
    .option("cloudFiles.privateKeyId", <private-key-id>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Použijte setUpNotificationServices(<resource-suffix>) k vytvoření fronty a předplatného s názvem <prefix>-<resource-suffix> (předpona závisí na systému úložiště, který je shrnut v cloudových prostředcích používaných ve starším režimu oznámení souboru automatického zavaděče. Pokud existuje existující prostředek se stejným názvem, Azure Databricks znovu použije existující prostředek místo vytvoření nového prostředku. Tato funkce vrátí identifikátor fronty, který můžete předat zdroji cloudFiles pomocí identifikátoru v možnostech oznámení Souboru . To umožňuje zdrojovému cloudFiles uživateli mít méně oprávnění než uživatel, který prostředky vytvoří.

Poskytněte možnost "path" pouze při volání newManager; není potřeba pro setUpNotificationServices nebo listNotificationServices. To je stejné path , jaké používáte při spouštění streamovacího dotazu.

Následující matice označuje, které metody rozhraní API jsou podporované ve které verzi Databricks Runtime pro každý typ úložiště:

Cloudové úložiště Nastavení rozhraní API Rozhraní API pro seznam Demontáž rozhraní API
Amazon S3 Všechny verze Všechny verze Všechny verze
ADLS Všechny verze Všechny verze Všechny verze
GKS Databricks Runtime 9.1 a novější Databricks Runtime 9.1 a novější Databricks Runtime 9.1 a novější
Azure Blob Storage Všechny verze Všechny verze Všechny verze

Vyčištění prostředků oznámení událostí vytvořených automatickým zavaděčem

Automatický zavaděč automaticky neodbourává prostředky oznámení o souborech. K odstranění prostředků oznámení o souborech musíte použít správce cloudových prostředků, jak bylo ukázáno v předchozí části. Tyto prostředky můžete odstranit také ručně pomocí uživatelského rozhraní nebo rozhraní API poskytovatele cloudu.

Řešení běžných chyb

Tato část popisuje běžné chyby při použití Auto Loader v režimu upozornění na soubory a jejich řešení.

Vytvoření odběru Event Gridu se nezdařilo.

Pokud při prvním spuštění Auto Loaderu uvidíte následující chybovou zprávu, znamená to, že Event Grid není v rámci předplatného Azure registrován jako Poskytovatel prostředků.

java.lang.RuntimeException: Failed to create event grid subscription.

Pokud chcete službu Event Grid zaregistrovat jako poskytovatele prostředků, postupujte takto:

  1. Na webu Azure Portal přejděte do svého předplatného.
  2. V části Nastavení klikni na Poskytovatelé prostředků.
  3. Zaregistrujte poskytovatele Microsoft.EventGrid.

Autorizace požadovaná k provádění operací odběru služby Event Grid

Pokud se při prvním spuštění nástroje Auto Loader zobrazí následující chybová zpráva, ověřte, že je role přispěvatele přiřazena k hlavní instanci pro Event Grid a úložišti.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Klient Event Grid obchází proxy server

Ve verzích Databricks Runtime 15.2 a novějších připojení Event Grid v Auto Loaderu ve výchozím nastavení využívají nastavení proxy serveru z vlastností systému. V Databricks Runtime 13.3 LTS, 14.3 LTS a 15.0 až 15.2 můžete ručně nakonfigurovat připojení Event Gridu tak, aby používala proxy server nastavením vlastnosti Konfigurace Sparkuspark.databricks.cloudFiles.eventGridClient.useSystemProperties true. Viz Nastavení vlastností konfigurace Sparku v azure Databricks.