Delen via


Wat is de meldingsmodus voor het automatisch laden van bestanden?

In de bestandsmeldingsmodus stelt Automatisch laden automatisch een meldingsservice en wachtrijservice in die zich abonneert op bestandsevenementen uit de invoermap. U kunt bestandsmeldingen gebruiken om automatisch laden te schalen om miljoenen bestanden per uur op te nemen. In vergelijking met de modus mapvermelding is de modus voor bestandsmeldingen beter presterend en schaalbaar voor grote invoermappen of een groot aantal bestanden, maar hiervoor zijn extra cloudmachtigingen vereist.

U kunt op elk gewenst moment schakelen tussen bestandsmeldingen en adreslijstvermeldingen en nog steeds exactly-once gegevensverwerkingsgaranties onderhouden.

Notitie

De bestandsmeldingsmodus wordt niet ondersteund voor Azure Premium-opslagaccounts omdat Premium-accounts geen ondersteuning bieden voor wachtrijopslag.

Waarschuwing

Het wijzigen van het bronpad voor automatisch laden wordt niet ondersteund voor de modus voor bestandsmeldingen. Als de modus voor bestandsmeldingen wordt gebruikt en het pad wordt gewijzigd, kunt u mogelijk geen bestanden opnemen die al aanwezig zijn in de nieuwe map op het moment van de update van de map.

De modus Voor bestandsmeldingen wordt alleen ondersteund op rekenkracht van één gebruiker.

Cloudresources die worden gebruikt in de meldingsmodus voor het automatisch laden van bestanden

Belangrijk

U hebt verhoogde machtigingen nodig om de cloudinfrastructuur automatisch te configureren voor de modus voor bestandsmeldingen. Neem contact op met uw cloudbeheerder of werkruimtebeheerder. Zien:

Automatisch laden kan bestandsmeldingen automatisch voor u instellen wanneer u de optie cloudFiles.useNotifications true instelt en de benodigde machtigingen voor het maken van cloudresources opgeeft. Daarnaast moet u mogelijk aanvullende opties opgeven om autoladerautorisatie te verlenen om deze resources te maken.

De volgende tabel bevat een overzicht van de resources die door Auto Loader worden gemaakt.

Cloudopslag Abonnementsservice Wachtrijservice Voorvoegsel* Grens**
AWS S3 AWS SNS AWS SQS databricks-auto-opname 100 per S3-bucket
ADLS Gen2 Azure Event Grid Azure Queue Storage databricks 500 per opslagaccount
GCS Google Pub/Sub Google Pub/Sub databricks-auto-opname 100 per GCS-bucket
Azure Blob-opslag Azure Event Grid Azure Queue Storage databricks 500 per opslagaccount
  • Automatisch laadprogramma noemt de resources met dit voorvoegsel.

** Hoeveel gelijktijdige bestandsmeldingspijplijnen kunnen worden gestart

Als u meer dan het beperkte aantal bestandsmeldingspijplijnen voor een bepaald opslagaccount wilt uitvoeren, kunt u het volgende doen:

  • Maak gebruik van een service zoals AWS Lambda, Azure Functions of Google Cloud Functions om meldingen uit te waaieren uit één wachtrij die luistert naar een hele container of bucket in mapspecifieke wachtrijen.

Gebeurtenissen voor bestandsmeldingen

AWS S3 biedt een ObjectCreated gebeurtenis wanneer een bestand wordt geüpload naar een S3-bucket, ongeacht of het is geüpload door een put- of multi-part upload.

ADLS Gen2 biedt verschillende gebeurtenismeldingen voor bestanden die worden weergegeven in uw Gen2-container.

  • Auto Loader luistert naar de gebeurtenis voor het FlushWithClose verwerken van een bestand.
  • Automatische laadprogramma's ondersteunen de actie voor het RenameFile detecteren van bestanden. RenameFile voor acties is een API-aanvraag naar het opslagsysteem vereist om de grootte van het hernoemde bestand op te halen.
  • Automatisch laden streams gemaakt met Databricks Runtime 9.0 en na ondersteuning van de actie voor het RenameDirectory detecteren van bestanden. RenameDirectory voor acties zijn API-aanvragen voor het opslagsysteem vereist om de inhoud van de hernoemde map weer te geven.

Google Cloud Storage biedt een OBJECT_FINALIZE gebeurtenis wanneer een bestand wordt geüpload, waaronder overschrijven en bestandskopieën. Mislukte uploads genereren deze gebeurtenis niet.

Notitie

Cloudproviders garanderen geen 100% levering van alle bestandsgebeurtenissen onder zeer zeldzame omstandigheden en bieden geen strikte SLA's voor de latentie van de bestandsgebeurtenissen. Databricks raadt u aan om reguliere backfills te activeren met autolader met behulp van de cloudFiles.backfillInterval optie om te garanderen dat alle bestanden in een bepaalde SLA worden gedetecteerd als volledigheid van gegevens een vereiste is. Het activeren van reguliere backfills veroorzaakt geen duplicaten.

Vereiste machtigingen voor het configureren van bestandsmeldingen voor ADLS Gen2 en Azure Blob Storage

U moet leesmachtigingen hebben voor de invoermap. Zie Azure Blob Storage.

Als u de modus voor bestandsmeldingen wilt gebruiken, moet u verificatiereferenties opgeven voor het instellen en openen van de gebeurtenismeldingsservices. U hebt alleen een service-principal nodig voor verificatie.

  • Service-principal: ingebouwde Azure-rollen gebruiken

    Maak een Microsoft Entra ID-app (voorheen Azure Active Directory) en service-principal in de vorm van client-id en clientgeheim.

    Wijs deze app de volgende rollen toe aan het opslagaccount waarin het invoerpad zich bevindt:

    • Inzender: Deze rol is bedoeld voor het instellen van resources in uw opslagaccount, zoals wachtrijen en gebeurtenisabonnementen.
    • Inzender voor opslagwachtrijgegevens: deze rol is bedoeld voor het uitvoeren van wachtrijbewerkingen, zoals het ophalen en verwijderen van berichten uit de wachtrijen. Deze rol is alleen vereist wanneer u een service-principal zonder verbindingsreeks opgeeft.

    Wijs deze app de volgende rol toe aan de gerelateerde resourcegroep:

    Zie voor meer informatie Azure-rollen toewijzen met behulp van de Azure-portal.

  • Service-principal: aangepaste rol gebruiken

    Als u zich bezig houdt met de overmatige machtigingen die zijn vereist voor de voorgaande rollen, kunt u een aangepaste rol maken met ten minste de volgende machtigingen, die hieronder worden vermeld in de JSON-indeling van De Azure-rol:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Vervolgens kunt u deze aangepaste rol toewijzen aan uw app.

    Zie voor meer informatie Azure-rollen toewijzen met behulp van de Azure-portal.

Machtigingen voor automatisch laden

Algemene fouten oplossen

Fout:

java.lang.RuntimeException: Failed to create event grid subscription.

Als u dit foutbericht ziet wanneer u voor de eerste keer een automatisch laadprogramma uitvoert, wordt Event Grid niet geregistreerd als resourceprovider in uw Azure-abonnement. Ga als volgt te werk om dit te registreren in de Azure-portal:

  1. Ga naar uw abonnement.
  2. Klik op Resourceproviders in de sectie Instellingen.
  3. Registreer de provider Microsoft.EventGrid.

Fout:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Als u dit foutbericht ziet wanneer u voor de eerste keer een automatisch laadprogramma uitvoert, moet u ervoor zorgen dat u de rol Inzender hebt gegeven aan uw service-principal voor Event Grid en uw opslagaccount.

Vereiste machtigingen voor het configureren van bestandsmeldingen voor AWS S3

U moet leesmachtigingen hebben voor de invoermap. Zie S3-verbindingsgegevens voor meer informatie.

Als u de modus voor bestandsmeldingen wilt gebruiken, voegt u het volgende JSON-beleidsdocument toe aan uw IAM-gebruiker of -rol.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

waarbij geldt:

  • <bucket-name>: de naam van de S3-bucket waar uw stream bestanden leest, auto-logsbijvoorbeeld. U kunt bijvoorbeeld databricks-*-logsals jokerteken gebruiken*. Als u de onderliggende S3-bucket voor uw DBFS-pad wilt achterhalen, kunt u alle DBFS-koppelpunten in een notebook weergeven door deze uit te voeren %fs mounts.
  • <region>: De AWS-regio waar de S3-bucket zich bevindt, bijvoorbeeld us-west-2. Als u de regio niet wilt opgeven, gebruikt *u .
  • <account-number>: het AWS-accountnummer dat eigenaar is van de S3-bucket, bijvoorbeeld 123456789012. Als u het accountnummer niet wilt opgeven, gebruikt u *.

De tekenreeks databricks-auto-ingest-* in de SPECIFICATIE SQS en SNS ARN is het naamvoorvoegsel dat de cloudFiles bron gebruikt bij het maken van SQS- en SNS-services. Omdat Azure Databricks de meldingsservices instelt in de eerste uitvoering van de stream, kunt u een beleid met beperkte machtigingen gebruiken na de eerste uitvoering (bijvoorbeeld de stream stoppen en vervolgens opnieuw opstarten).

Notitie

Het voorgaande beleid heeft alleen betrekking op de machtigingen die nodig zijn voor het instellen van bestandsmeldingsservices, namelijk S3 bucketmeldingen, SNS- en SQS-services en gaat ervan uit dat u al leestoegang hebt tot de S3-bucket. Als u S3 alleen-lezenmachtigingen wilt toevoegen, voegt u het volgende toe aan de Action lijst in de DatabricksAutoLoaderSetup instructie in het JSON-document:

  • s3:ListBucket
  • s3:GetObject

Beperkte machtigingen na de eerste installatie

De hierboven beschreven machtigingen voor het instellen van resources zijn alleen vereist tijdens de eerste uitvoering van de stream. Na de eerste uitvoering kunt u overschakelen naar het volgende IAM-beleid met beperkte machtigingen.

Belangrijk

Met de beperkte machtigingen kunt u geen nieuwe streamingquery's starten of resources opnieuw maken in geval van fouten (de SQS-wachtrij is bijvoorbeeld per ongeluk verwijderd); U kunt ook de API voor cloudresourcebeheer niet gebruiken om resources weer te geven of te verwijderen.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Vereiste machtigingen voor het configureren van bestandsmeldingen voor GCS

U moet beschikken over en get machtigingen hebben list voor uw GCS-bucket en voor alle objecten. Zie de Google-documentatie over IAM-machtigingen voor meer informatie.

Als u de modus voor bestandsmeldingen wilt gebruiken, moet u machtigingen toevoegen voor het GCS-serviceaccount en het account dat wordt gebruikt voor toegang tot de Google Cloud Pub/Sub-resources.

Voeg de Pub/Sub Publisher rol toe aan het GCS-serviceaccount. Hierdoor kan het account meldingen van gebeurtenissen van uw GCS-buckets publiceren naar Google Cloud Pub/Sub.

Wat betreft het serviceaccount dat wordt gebruikt voor de Google Cloud Pub/Sub-resources, moet u de volgende machtigingen toevoegen:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Hiervoor kunt u een aangepaste IAM-rol maken met deze machtigingen of vooraf bestaande GCP-rollen toewijzen om deze machtigingen te dekken.

Het GCS-serviceaccount zoeken

Navigeer in de Google Cloud Console voor het bijbehorende project naar Cloud Storage > Settings. De sectie Cloud Storage-serviceaccount bevat het e-mailadres van het GCS-serviceaccount.

GCS-serviceaccount

Een aangepaste Google Cloud IAM-rol maken voor de modus Voor bestandsmeldingen

Ga in de Google Cloud-console voor het bijbehorende project naar IAM & Admin > Roles. Maak vervolgens een rol bovenaan of werk een bestaande rol bij. Klik op Add Permissionshet scherm voor het maken of bewerken van rollen. Er wordt een menu weergegeven waarin u de gewenste machtigingen aan de rol kunt toevoegen.

Aangepaste GCP IAM-rollen

Bestandsmeldingsbronnen handmatig configureren of beheren

Bevoegde gebruikers kunnen bronnen voor bestandsmeldingen handmatig configureren of beheren.

  • Stel de bestandsmeldingsservices handmatig in via de cloudprovider en geef handmatig de wachtrij-id op. Zie opties voor bestandsmeldingen voor meer informatie.
  • Gebruik Scala-API's om de meldingen en wachtrijservices te maken of te beheren, zoals wordt weergegeven in het volgende voorbeeld:

Notitie

U moet over de juiste machtigingen beschikken om de cloudinfrastructuur te configureren of te wijzigen. Raadpleeg de documentatie over machtigingen voor Azure, S3 of GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Gebruik setUpNotificationServices(<resource-suffix>) dit diagram om een wachtrij en een abonnement met de naam <prefix>-<resource-suffix> te maken (het voorvoegsel is afhankelijk van het opslagsysteem dat wordt samengevat in cloudresources die worden gebruikt in de meldingsmodus voor het automatisch laden van bestanden. Als er een bestaande resource met dezelfde naam is, gebruikt Azure Databricks de bestaande resource opnieuw in plaats van een nieuwe resource te maken. Deze functie retourneert een wachtrij-id die u aan de cloudFiles bron kunt doorgeven met behulp van de id in opties voor bestandsmeldingen. Hierdoor kan de cloudFiles brongebruiker minder machtigingen hebben dan de gebruiker die de resources maakt.

Geef de "path" optie alleen op newManager als u belt setUpNotificationServices; dit is niet nodig voor listNotificationServices of tearDownNotificationServices. Dit is hetzelfde path dat u gebruikt bij het uitvoeren van een streamingquery.

De volgende matrix geeft aan welke API-methoden worden ondersteund waarin Databricks Runtime voor elk type opslag wordt ondersteund:

Cloudopslag API instellen Lijst-API Api voor uitsplitsen
AWS S3 Alle versies Alle versies Alle versies
ADLS Gen2 Alle versies Alle versies Alle versies
GCS Databricks Runtime 9.1 en hoger Databricks Runtime 9.1 en hoger Databricks Runtime 9.1 en hoger
Azure Blob-opslag Alle versies Alle versies Alle versies
ADLS Gen1 Niet ondersteund Niet ondersteund Niet ondersteund