자동 로더 파일 알림 모드란?

파일 알림 모드에서 자동 로더는 입력 디렉터리의 파일 이벤트를 구독하는 알림 서비스 및 큐 서비스를 자동으로 설정합니다. 파일 알림을 사용하여 한 시간에 수백만 개의 파일을 수집하도록 자동 로더를 스케일링할 수 있습니다. 디렉터리 목록 모드와 비교할 때, 파일 알림 모드는 대규모 입력 디렉터리 또는 대용량 파일에 대해 더 성능이 좋고 확장 가능하지만 설정을 위해 추가 클라우드 권한이 필요합니다.

파일 알림과 디렉터리 목록 사이를 언제든지 전환할 수 있으며 데이터 처리가 보장하는 한 번만 유지 관리할 수 있습니다.

Warning

자동 로더 의 원본 경로 변경은 파일 알림 모드에서 지원되지 않습니다. 파일 알림 모드를 사용하고 경로가 변경된 경우 디렉터리 업데이트 시 새 디렉터리에 이미 있는 파일을 수집하지 못할 수 있습니다.

자동 로더 파일 알림 모드에서 사용되는 클라우드 리소스

Important

파일 알림 모드에 대한 클라우드 인프라를 자동으로 구성하려면 상승된 권한이 필요합니다. 클라우드 관리자 또는 작업 영역 관리자에게 문의하세요. 참조:

자동 로더는 옵션을 cloudFiles.useNotificationstrue 설정하고 클라우드 리소스를 만드는 데 필요한 권한을 제공할 때 자동으로 파일 알림을 설정할 수 있습니다. 또한 이러한 리소스를 만들 수 있는 자동 로더 권한을 부여하려면 추가 옵션을 제공해야 할 수 있습니다.

다음 표는 자동 로더에서 만들어지는 리소스를 요약한 것입니다.

클라우드 스토리지 구독 서비스 큐 서비스 접두사 * 제한 **
AWS S3 AWS SNS AWS SQS databricks-auto-ingest S3 버킷당 100
ADLS Gen2 Azure Event Grid Azure Queue Storage databricks 스토리지 계정당 500
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest GCS 버킷당 100
Azure Blob Storage Azure Event Grid Azure Queue Storage databricks 스토리지 계정당 500
  • 자동 로더는 이 접두사를 사용하여 리소스의 이름을 지정합니다.

** 동시에 실행할 수 있는 파일 알림 파이프라인 수

지정된 스토리지 계정에 대해 제한된 수 이상의 파일 알림 파이프라인을 실행해야 하는 경우 다음을 수행할 수 있습니다.

  • AWS Lambda, Azure Functions 또는 Google Cloud Functions와 같은 서비스를 활용하여 전체 컨테이너 또는 버킷을 수신 대기하는 단일 큐의 알림을 디렉터리별 큐로 확장합니다.

파일 알림 이벤트

AWS S3는 파일이 넣기 또는 멀티파트 업로드로 업로드되었는지 여부에 관계없이 S3 버킷에 파일을 업로드할 때 ObjectCreated 이벤트를 제공합니다.

ADLS Gen2는 Gen2 컨테이너에 표시되는 파일에 대한 다양한 이벤트 알림을 제공합니다.

  • 자동 로더는 파일 처리를 위해 FlushWithClose 이벤트를 수신 대기합니다.
  • 자동 로더 스트림은 파일 검색 작업을 지원 RenameFile 합니다. RenameFile 작업을 수행하려면 스토리지 시스템에 대한 API 요청을 통해 이름이 바뀐 파일의 크기를 가져와야 합니다.
  • Databricks Runtime 9.0 이상에서 만든 자동 로더 스트림은 파일 검색을 위한 RenameDirectory 작업을 지원합니다. RenameDirectory 작업을 수행하려면 스토리지 시스템에 대한 API 요청을 통해 이름이 바뀐 디렉터리의 내용을 나열해야 합니다.

Google Cloud Storage는 덮어쓰기 및 파일 복사를 포함하여 파일이 업로드될 때 OBJECT_FINALIZE 이벤트를 제공합니다. 업로드가 실패하면 이 이벤트가 생성되지 않습니다.

참고 항목

클라우드 공급자는 매우 드문 조건에서 모든 파일 이벤트의 100% 배달을 보장하지 않으며 파일 이벤트의 대기 시간에 대한 엄격한 SLA를 제공하지 않습니다. Databricks는 데이터 완전성이 요구 사항인 경우 지정된 SLA 내에서 모든 파일이 검색되도록 보장하기 위해 cloudFiles.backfillInterval 옵션을 사용하여 자동 로더로 정기적인 백필을 트리거하는 것이 좋습니다. 일반 백필을 트리거해도 중복이 발생하지 않습니다.

ADLS Gen2 및 Azure Blob Storage에 대한 파일 알을 구성에 필요한 권한

입력 디렉터리에 대한 읽기 권한이 있어야 합니다. Azure Blob Storage를 참조하세요.

파일 알림 모드를 사용하려면 이벤트 알림 서비스를 설정하고 액세스하기 위한 인증 자격 증명을 제공해야 합니다. 인증을 위해 서비스 주체만 필요합니다.

  • 서비스 주체 - Azure 기본 제공 역할 사용

    클라이언트 ID 및 클라이언트 암호 형식으로 Microsoft Entra ID(이전의 Azure Active Directory) 앱 및 서비스 주체를 만듭니다.

    입력 경로가 있는 스토리지 계정에 대한 다음 역할을 이 앱에 할당합니다.

    • 기여자: 이 역할은 큐, 이벤트 구독과 같은 스토리지 계정의 리소스를 설정하는 데 사용됩니다.
    • Storage 큐 데이터 기여자: 이 역할은 큐에서 메시지 검색 및 삭제와 같은 큐 작업을 수행하기 위한 것입니다. 이 역할은 연결 문자열 없이 서비스 주체를 제공하는 경우에만 필요합니다.

    관련 리소스 그룹에 대한 다음 역할을 이 앱에 할당합니다.

    자세한 내용은 Azure Portal을 사용하여 Azure 역할 할당을 참조하십시오.

  • 서비스 주체 - 사용자 지정 역할 사용

    이전 역할에 필요한 과도한 사용 권한과 관련된 경우 Azure 역할 JSON 형식으로 아래에 나열된 다음 권한 이상을 사용하여 사용자 지정 역할을 만들 수 있습니다.

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    그런 다음 이 사용자 지정 역할을 앱에 할당할 수 있습니다.

    자세한 내용은 Azure Portal을 사용하여 Azure 역할 할당을 참조하십시오.

자동 로더 사용 권한

일반적인 오류 문제 해결

오류:

java.lang.RuntimeException: Failed to create event grid subscription.

자동 로더를 처음 실행할 때 이 오류 메시지가 표시되는 경우 Event Grid가 Azure 구독에 리소스 공급자로 등록되지 않은 것입니다. Azure Portal에서 등록하려면 다음을 수행합니다.

  1. 구독으로 이동합니다.
  2. 설정 섹션에서 리소스 공급자를 클릭합니다.
  3. Microsoft.EventGrid 공급자를 등록합니다.

오류:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

자동 로더를 처음 실행할 때 이 오류 메시지가 표시되면 Event Grid의 서비스 주체와 스토리지 계정에 기여자 역할을 부여했는지 확인합니다.

AWS S3에 대한 파일 알림 구성에 필요한 권한

입력 디렉터리에 대한 읽기 권한이 있어야 합니다. 자세한 내용은 S3 연결 세부 정보를 참조하세요.

파일 알림 모드를 사용하려면 다음 JSON 정책 문서를 IAM 사용자 또는 역할에 연결합니다.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

여기서

  • <bucket-name>: 스트림에서 파일을 읽을 S3 버킷 이름입니다(예: auto-logs). *를 와일드카드로 사용할 수 있습니다(예: databricks-*-logs). DBFS 경로에 대한 기본 S3 버킷을 찾으려면 %fs mounts를 실행하여 Notebook의 모든 DBFS 탑재 지점을 나열할 수 있습니다.
  • <region>: S3 버킷이 있는 AWS 지역입니다(예: us-west-2). 지역을 지정하지 않으려면 *를 사용합니다.
  • <account-number>: S3 버킷을 소유하는 AWS 계정 번호입니다(예: 123456789012). 계정 번호를 지정하지 않으려면 *를 사용합니다.

SQS 및 SNS ARN 사양의 databricks-auto-ingest-* 문자열은 SQS 및 SNS 서비스를 만들 때 cloudFiles 원본에서 사용하는 이름 접두사입니다. Azure Databricks는 스트림의 초기 실행에서 알림 서비스를 설정하므로 초기 실행 후 권한이 축소된 정책을 사용할 수 있습니다(예: 스트림을 알림 서비스를 다시 시작).

참고 항목

앞의 정책은 파일 알림 서비스(즉, S3 버킷 알림), SNS 및 SQS 서비스를 설정하는 데 필요한 권한에만 관련되며 사용자에게 이미 S3 버킷에 대한 읽기 권한이 있다고 가정합니다. S3 읽기 전용 권한을 추가해야 하는 경우 JSON 문서의 DatabricksAutoLoaderSetup 문의 Action 목록에 다음을 추가합니다.

  • s3:ListBucket
  • s3:GetObject

초기 설정 후 권한 감소

위에서 설명한 리소스 설정 권한은 스트림의 초기 실행 중에만 필요합니다. 첫 번째 실행 후 권한이 감소된 다음 IAM 정책으로 전환할 수 있습니다.

Important

권한이 감소하면 새로운 스트리밍 쿼리를 시작하거나 장애가 발생한 경우 리소스를 다시 만들 수 없습니다(예: SQS 큐가 실수로 삭제됨). 또한 클라우드 리소스 관리 API를 사용하여 리소스를 나열하거나 삭제할 수 없습니다.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

GCS에 대한 파일 알림 구성에 필요한 권한

GCS 버킷과 모든 개체에 대한 listget 권한이 있어야 합니다. 자세한 내용은 IAM 권한에 대한 Google 설명서를 참조하세요.

파일 알림 모드를 사용하려면 GCS 서비스 계정과 Google Cloud Pub/Sub 리소스에 액세스하는 데 사용되는 계정에 대한 권한을 추가해야 합니다.

GCS 서비스 계정에 Pub/Sub Publisher 역할을 추가합니다. 이렇게 하면 계정에서 GCS 버킷의 이벤트 알림 메시지를 Google Cloud Pub/Sub에 게시할 수 있습니다.

Google Cloud Pub/Sub 리소스에 사용되는 서비스 계정의 경우 다음 권한을 추가해야 합니다.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

이렇게 하려면 이러한 권한으로 IAM 사용자 지정 역할을 만들거나기존 GCP 역할을 할당하여 이러한 권한을 처리할 수 있습니다.

GCS 서비스 계정 찾기

해당 프로젝트의 Google Cloud 콘솔에서 Cloud Storage > Settings로 이동합니다. "Cloud Storage 서비스 계정" 섹션에는 GCS 서비스 계정의 이메일이 포함되어 있습니다.

GCS 서비스 계정

파일 알림 모드에 대한 사용자 지정 Google Cloud IAM 역할 만들기

해당 프로젝트의 Google Cloud 콘솔에서 IAM & Admin > Roles로 이동합니다. 그런 다음 상단에 역할을 만들거나 기존 역할을 업데이트합니다. 역할 만들기 또는 편집 화면에서 Add Permissions를 클릭합니다. 역할에 원하는 권한을 추가할 수 있는 메뉴가 나타납니다.

GCP IAM 사용자 지정 역할

파일 알림 리소스 수동 구성 또는 관리

권한 있는 사용자는 파일 알림 리소스를 수동으로 구성하거나 관리할 수 있습니다.

  • 클라우드 공급자를 통해 파일 알림 서비스를 수동으로 설정하고 큐 식별자를 수동으로 지정합니다. 자세한 내용은 파일 알림 옵션을 참조하세요.
  • 다음 예제와 같이 Scala API를 사용하여 알림 및 큐 서비스를 만들거나 관리합니다.

참고 항목

클라우드 인프라를 구성하거나 수정하려면 적절한 권한이 있어야 합니다. Azure, S3 또는 GCS에 대한 권한 설명서를 참조하세요.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

setUpNotificationServices(<resource-suffix>)를 사용하여 이름이 <prefix>-<resource-suffix>인 구독과 큐를 만듭니다(접두사는 자동 로더 파일 알림 모드에 사용된 클라우드 리소스에 요약된 스토리지 시스템에 따라 다릅니다). 이름이 같은 기존 리소스가 있는 경우 Azure Databricks는 새 리소스를 만드는 대신 기존 리소스를 재사용합니다. 이 함수는 파일 알림 옵션의 식별자를 사용하여 cloudFiles 원본에 전달할 수 있는 큐 식별자를 반환합니다. 이렇게 하면 cloudFiles 원본 사용자가 리소스를 만드는 사용자보다 적은 권한을 가질 수 있습니다.

setUpNotificationServices를 호출하는 경우에만 "path" 옵션을 newManager에 제공합니다. listNotificationServices 또는 tearDownNotificationServices에는 필요하지 않습니다. 이는 스트리밍 쿼리를 실행할 때 사용하는 것과 동일한 path입니다.

다음 매트릭스는 각 스토리지 유형에 대해 Databricks Runtime에서 지원되는 API 메서드를 나타냅니다.

클라우드 스토리지 설치 API입니다 목록 API API 분해
AWS S3 모든 버전 모든 버전 모든 버전
ADLS Gen2 모든 버전 모든 버전 모든 버전
GCS Databricks Runtime 9.1 이상 Databricks Runtime 9.1 이상 Databricks Runtime 9.1 이상
Azure Blob Storage 모든 버전 모든 버전 모든 버전
ADLS Gen1 지원되지 않음 지원되지 않음 지원되지 않음