ما هو وضع إعلام ملف التحميل التلقائي؟

في وضع إعلام الملف، يقوم "التحميل التلقائي" تلقائيا بإعداد خدمة إعلام وخدمة قائمة انتظار تشترك في أحداث الملفات من دليل الإدخال. يمكنك استخدام إعلامات الملفات لتوسيع نطاق "المحمل التلقائي" لاستيعاب ملايين الملفات في الساعة. عند مقارنتها بوضع سرد الدليل، يكون وضع إعلام الملف أكثر أداء وقابلا للتطوير لدلائل الإدخال الكبيرة أو حجم كبير من الملفات ولكنه يتطلب أذونات سحابية إضافية.

يمكنك التبديل بين إعلامات الملفات وإدراج الدليل في أي وقت مع الحفاظ على ضمانات معالجة البيانات مرة واحدة بالضبط.

تحذير

تغيير مسار المصدر للتحميل التلقائي غير معتمد لوضع إعلام الملف. إذا تم استخدام وضع إعلام الملف وتم تغيير المسار، فقد تفشل في استيعاب الملفات الموجودة بالفعل في الدليل الجديد في وقت تحديث الدليل.

موارد السحابة المستخدمة في وضع إعلام ملف التحميل التلقائي

هام

تحتاج إلى أذونات مرتفعة لتكوين البنية الأساسية السحابية تلقائيا لوضع إعلام الملف. اتصل بمسؤول السحابة أو مسؤول مساحة العمل. انظر:

يمكن للمحمل التلقائي إعداد إعلامات الملفات تلقائيا عند تعيين الخيار cloudFiles.useNotifications إلى true وتوفير الأذونات الضرورية لإنشاء موارد السحابة. بالإضافة إلى ذلك، قد تحتاج إلى توفير خيارات إضافية لمنح تفويض التحميل التلقائي لإنشاء هذه الموارد.

يلخص الجدول التالي الموارد التي تم إنشاؤها بواسطة "المحمل التلقائي".

تخزين على السحابة خدمة الاشتراك خدمة قائمة الانتظار بادئه* الحد**
AWS S3 AWS SNS AWS SQS استيعاب databricks تلقائيا 100 لكل مستودع S3
ADLS Gen2 Azure Event Grid Azure Queue Storage databricks 500 لكل حساب تخزين
GCS Google Pub/Sub Google Pub/Sub استيعاب databricks تلقائيا 100 لكل مستودع GCS
Azure Blob Storage Azure Event Grid Azure Queue Storage databricks 500 لكل حساب تخزين
  • يقوم "المحمل التلقائي" بتسمية الموارد بهذه البادئة.

** عدد مسارات إعلامات الملفات المتزامنة التي يمكن تشغيلها

إذا كنت تحتاج إلى تشغيل أكثر من عدد محدود من مسارات إعلامات الملفات لحساب تخزين معين، يمكنك:

  • استفد من خدمة مثل AWS Lambda أو Azure Functions أو Google Cloud Functions لنشر الإعلامات من قائمة انتظار واحدة تستمع إلى حاوية بأكملها أو مستودع في قوائم انتظار محددة للدليل.

أحداث إعلام الملف

يوفر AWS S3 حدثا ObjectCreated عند تحميل ملف إلى مستودع S3 بغض النظر عما إذا كان قد تم تحميله بواسطة وضع أو تحميل متعدد الأجزاء.

يوفر ADLS Gen2 إعلامات أحداث مختلفة للملفات التي تظهر في حاوية Gen2.

  • يستمع "المحمل التلقائي" للحدث FlushWithClose لمعالجة ملف.
  • تدعم RenameFile تدفقات التحميل التلقائي الإجراء لاكتشاف الملفات. RenameFile تتطلب الإجراءات طلب API إلى نظام التخزين للحصول على حجم الملف الذي تمت إعادة تسميته.
  • تدفقات التحميل التلقائي التي تم إنشاؤها باستخدام Databricks Runtime 9.0 وبعد دعم RenameDirectory الإجراء لاكتشاف الملفات. RenameDirectory تتطلب الإجراءات طلبات واجهة برمجة التطبيقات إلى نظام التخزين لسرد محتويات الدليل الذي تمت إعادة تسميته.

يوفر Google Cloud Storage حدثا OBJECT_FINALIZE عند تحميل ملف، والذي يتضمن الكتابة فوق ونسخ الملفات. لا تنشئ عمليات التحميل الفاشلة هذا الحدث.

إشعار

لا يضمن موفرو السحابة تسليم جميع أحداث الملفات بنسبة 100٪ في ظل ظروف نادرة للغاية ولا يوفرون اتفاقيات مستوى الخدمة الصارمة بشأن زمن انتقال أحداث الملفات. توصي Databricks بتشغيل ملفات الملفات الاحتياطية العادية باستخدام "المحمل التلقائي" باستخدام cloudFiles.backfillInterval الخيار لضمان اكتشاف جميع الملفات داخل اتفاقية مستوى الخدمة (SLA) معينة إذا كان اكتمال البيانات أحد المتطلبات. لا يؤدي تشغيل ملفات التصفية العادية إلى تكرارات.

الأذونات المطلوبة لتكوين إعلام الملف ل ADLS Gen2 وAzure Blob Storage

يجب أن يكون لديك أذونات قراءة لدليل الإدخال. راجع Azure Blob Storage.

لاستخدام وضع إعلام الملف، يجب توفير بيانات اعتماد المصادقة لإعداد خدمات إعلام الحدث والوصول إليها. تحتاج فقط إلى كيان خدمة للمصادقة.

  • كيان الخدمة - استخدام الأدوار المضمنة في Azure

    إنشاء تطبيق Microsoft Entra ID (المعروف سابقا ب Azure Active Directory) ومدير الخدمة في شكل معرف العميل وسر العميل.

    عين هذا التطبيق الأدوار التالية لحساب التخزين الذي يوجد فيه مسار الإدخال:

    • المساهم: هذا الدور مخصص لإعداد الموارد في حساب التخزين الخاص بك، مثل قوائم الانتظار واشتراكات الأحداث.
    • مساهم بيانات قائمة انتظار التخزين: هذا الدور مخصص لتنفيذ عمليات قائمة الانتظار مثل استرداد الرسائل وحذفها من قوائم الانتظار. هذا الدور مطلوب فقط عند توفير كيان خدمة دون سلسلة الاتصال.

    عين هذا التطبيق الدور التالي لمجموعة الموارد ذات الصلة:

    • EventGrid EventSubscription Contributor: هذا الدور مخصص لتنفيذ عمليات الاشتراك في شبكة الأحداث مثل إنشاء اشتراكات الأحداث أو سردها.

    لمزيد من المعلومات، راجع تعيين أدوار Azure باستخدام مدخل Azure.

  • كيان الخدمة - باستخدام دور مخصص

    إذا كنت مهتما بالأذونات الزائدة المطلوبة للأدوار السابقة، يمكنك إنشاء دور مخصص بالأذونات التالية على الأقل، المدرجة أدناه بتنسيق JSON لدور Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    بعد ذلك، يمكنك تعيين هذا الدور المخصص لتطبيقك.

    لمزيد من المعلومات، راجع تعيين أدوار Azure باستخدام مدخل Azure.

أذونات التحميل التلقائي

استكشاف الأخطاء الشائعة وإصلاحها

الخطا:

java.lang.RuntimeException: Failed to create event grid subscription.

إذا رأيت رسالة الخطأ هذه عند تشغيل "المحمل التلقائي" للمرة الأولى، فلن يتم تسجيل Event Grid كموفر موارد في اشتراك Azure الخاص بك. لتسجيل ذلك على مدخل Microsoft Azure:

  1. انتقل إلى اشتراكك.
  2. انقر فوق موفرو الموارد ضمن قسم الإعدادات.
  3. تسجيل الموفر Microsoft.EventGrid.

الخطا:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

إذا رأيت رسالة الخطأ هذه عند تشغيل "المحمل التلقائي" للمرة الأولى، فتأكد من منح دور المساهم إلى كيان الخدمة لشبكة الأحداث بالإضافة إلى حساب التخزين الخاص بك.

الأذونات المطلوبة لتكوين إعلام الملف ل AWS S3

يجب أن يكون لديك أذونات قراءة لدليل الإدخال. راجع تفاصيل اتصال S3 لمزيد من التفاصيل.

لاستخدام وضع إعلام الملف، قم بإرفاق مستند نهج JSON التالي إلى مستخدم أو دور IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

الموقع:

  • <bucket-name>: اسم مستودع S3 حيث سيقرأ الدفق الملفات، على سبيل المثال، auto-logs. يمكنك استخدام * كحرف بدل، على سبيل المثال، databricks-*-logs. لمعرفة مستودع S3 الأساسي لمسار DBFS، يمكنك سرد جميع نقاط تحميل DBFS في دفتر ملاحظات عن طريق تشغيل %fs mounts.
  • <region>: منطقة AWS حيث يوجد مستودع S3، على سبيل المثال، us-west-2. إذا كنت لا تريد تحديد المنطقة، فاستخدم *.
  • <account-number>: رقم حساب AWS الذي يمتلك مستودع S3، على سبيل المثال، 123456789012. إذا كنت لا تريد تحديد رقم الحساب، فاستخدم *.

السلسلة databricks-auto-ingest-* في مواصفات SQS وSNS ARN هي بادئة الاسم التي يستخدمها cloudFiles المصدر عند إنشاء خدمات SQS وSNS. نظرا لأن Azure Databricks يقوم بإعداد خدمات الإعلام في التشغيل الأولي للدفق، يمكنك استخدام نهج بأذونات مخفضة بعد التشغيل الأولي (على سبيل المثال، إيقاف الدفق ثم إعادة تشغيله).

إشعار

لا يهتم النهج السابق إلا بالأذونات المطلوبة لإعداد خدمات إعلام الملفات، وهي إعلام مستودع S3 وخدمات SNS وSQS ويفترض أن لديك بالفعل حق الوصول للقراءة إلى مستودع S3. إذا كنت بحاجة إلى إضافة أذونات للقراءة فقط ل S3، أضف ما يلي إلى Action القائمة في العبارة DatabricksAutoLoaderSetup في مستند JSON:

  • s3:ListBucket
  • s3:GetObject

أذونات مخفضة بعد الإعداد الأولي

أذونات إعداد المورد الموضحة أعلاه مطلوبة فقط أثناء التشغيل الأولي للتدفق. بعد التشغيل الأول، يمكنك التبديل إلى نهج IAM التالي بأذونات مخفضة.

هام

باستخدام الأذونات المنخفضة، لا يمكنك بدء استعلامات دفق جديدة أو إعادة إنشاء الموارد في حالة الفشل (على سبيل المثال، تم حذف قائمة انتظار SQS عن طريق الخطأ)؛ لا يمكنك أيضا استخدام واجهة برمجة تطبيقات إدارة موارد السحابة لسرد الموارد أو تقطيعها.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

الأذونات المطلوبة لتكوين إعلام الملف ل GCS

يجب أن يكون لديك list أذونات و get على مستودع GCS الخاص بك وعلى جميع الكائنات. للحصول على التفاصيل، راجع وثائق Google حول أذونات IAM.

لاستخدام وضع إعلام الملف، تحتاج إلى إضافة أذونات لحساب خدمة GCS والحساب المستخدم للوصول إلى موارد Google Cloud Pub/Sub.

Pub/Sub Publisher أضف الدور إلى حساب خدمة GCS. يسمح هذا للحساب بنشر رسائل إعلام الحدث من مجموعات GCS الخاصة بك إلى Google Cloud Pub/Sub.

بالنسبة لحساب الخدمة المستخدم لموارد Google Cloud Pub/Sub، تحتاج إلى إضافة الأذونات التالية:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

للقيام بذلك، يمكنك إما إنشاء دور مخصص IAM بهذه الأذونات أو تعيين أدوار GCP موجودة مسبقا لتغطية هذه الأذونات.

العثور على حساب خدمة GCS

في Google Cloud Console للمشروع المقابل، انتقل إلى Cloud Storage > Settings. يحتوي القسم "حساب خدمة التخزين السحابي" على البريد الإلكتروني لحساب خدمة GCS.

حساب خدمة GCS

إنشاء دور مخصص ل Google Cloud IAM لوضع إعلام الملفات

في وحدة تحكم Google Cloud للمشروع المقابل، انتقل إلى IAM & Admin > Roles. بعد ذلك، إما إنشاء دور في الأعلى أو تحديث دور موجود. في شاشة إنشاء الدور أو تحريره، انقر فوق Add Permissions. تظهر قائمة يمكنك فيها إضافة الأذونات المطلوبة إلى الدور.

أدوار GCP IAM المخصصة

تكوين موارد إعلام الملفات أو إدارتها يدويا

يمكن للمستخدمين المميزين تكوين موارد إعلام الملفات أو إدارتها يدويا.

  • قم بإعداد خدمات إعلام الملفات يدويا من خلال موفر السحابة وحدد معرف قائمة الانتظار يدويا. راجع خيارات إعلام الملف للحصول على مزيد من التفاصيل.
  • استخدم واجهات برمجة تطبيقات Scala لإنشاء أو إدارة الإعلامات وخدمات الانتظار، كما هو موضح في المثال التالي:

إشعار

يجب أن يكون لديك الأذونات المناسبة لتكوين البنية الأساسية السحابية أو تعديلها. راجع وثائق الأذونات ل Azure أو S3 أو GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

يستخدم setUpNotificationServices(<resource-suffix>) لإنشاء قائمة انتظار واشتراك بالاسم <prefix>-<resource-suffix> (تعتمد البادئة على نظام التخزين الملخص في موارد السحابة المستخدمة في وضع إعلام ملف التحميل التلقائي. إذا كان هناك مورد موجود بنفس الاسم، فإن Azure Databricks يعيد استخدام المورد الموجود بدلا من إنشاء مورد جديد. تقوم هذه الدالة بإرجاع معرف قائمة انتظار يمكنك تمريره cloudFiles إلى المصدر باستخدام المعرف في خيارات إعلام الملف. وهذا يمكن cloudFiles المستخدم المصدر من الحصول على أذونات أقل من المستخدم الذي يقوم بإنشاء الموارد.

"path" قم بتوفير الخيار newManager فقط في حالة الاتصال setUpNotificationServices؛ لا يلزم ذلك ل listNotificationServices أو tearDownNotificationServices. هذا هو نفسه path الذي تستخدمه عند تشغيل استعلام دفق.

تشير المصفوفة التالية إلى أساليب واجهة برمجة التطبيقات المدعومة في وقت تشغيل Databricks لكل نوع من أنواع التخزين:

تخزين على السحابة إعداد واجهة برمجة التطبيقات واجهة برمجة تطبيقات القائمة هدم واجهة برمجة التطبيقات
AWS S3 جميع الإصدارات جميع الإصدارات جميع الإصدارات
ADLS Gen2 جميع الإصدارات جميع الإصدارات جميع الإصدارات
GCS Databricks Runtime 9.1 وما فوق Databricks Runtime 9.1 وما فوق Databricks Runtime 9.1 وما فوق
Azure Blob Storage جميع الإصدارات جميع الإصدارات جميع الإصدارات
ADLS Gen1 غير مدعوم غير مدعوم غير مدعوم