أنماط تحميل البيانات الشائعة

يعمل "التحميل التلقائي" على تبسيط عدد من مهام استيعاب البيانات الشائعة. يوفر هذا المرجع السريع أمثلة للعديد من الأنماط الشائعة.

تصفية الدلائل أو الملفات باستخدام أنماط glob

يمكن استخدام أنماط Glob لتصفية الدلائل والملفات عند توفيرها في المسار.

النمط ‏‏الوصف
? يتطابق مع أي حرف واحد
* يطابق صفرا أو أكثر من الأحرف
[abc] يطابق حرفا واحدا من مجموعة الأحرف {a,b,c}.
[a-z] يطابق حرفا واحدا من نطاق الأحرف {a... z}.
[^a] يطابق حرفا واحدا ليس من مجموعة أحرف أو نطاق {a}. لاحظ أنه ^ يجب أن يحدث الحرف على الفور إلى يمين قوس الفتح.
{ab,cd} يطابق سلسلة من مجموعة السلسلة {ab، cd}.
{ab,c{de, fh}} يطابق سلسلة من مجموعة السلسلة {ab، cde، cfh}.

path استخدم لتوفير أنماط البادئة، على سبيل المثال:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

التطوير

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

هام

تحتاج إلى استخدام الخيار pathGlobFilter لتوفير أنماط لاحقة بشكل صريح. يوفر path عامل تصفية البادئة الوحيد.

على سبيل المثال، إذا كنت ترغب في توزيع الملفات فقط png في دليل يحتوي على ملفات ذات لاحقات مختلفة، يمكنك القيام بما يلي:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

التطوير

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

إشعار

يختلف سلوك globbing الافتراضي ل Auto Loader عن السلوك الافتراضي لمصادر ملفات Spark الأخرى. أضف .option("cloudFiles.useStrictGlobber", "true") إلى القراءة لاستخدام globbing الذي يطابق سلوك Spark الافتراضي مقابل مصادر الملفات. راجع الجدول التالي للحصول على المزيد حول الكآبة:

النمط مسار الملف globber الافتراضي globber صارم
/a/b /a/b/c/file.txt نعم نعم
/a/b /a/b_dir/c/file.txt لا لا
/a/b /a/b.txt لا لا
/a/b/ /a/b.txt لا لا
/a/*/c/ /a/b/c/file.txt نعم نعم
/a/*/c/ /a/b/c/d/file.txt نعم نعم
/a/*/c/ /a/b/x/y/c/file.txt نعم لا
/a/*/c /a/b/c_file.txt نعم لا
/a/*/c/ /a/b/c_file.txt نعم لا
/a/*/c/ /a/*/cookie/file.txt نعم لا
/a/b* /a/b.txt نعم نعم
/a/b* /a/b/file.txt نعم نعم
/a/{0.txt,1.txt} /a/0.txt نعم نعم
/a/*/{0.txt,1.txt} /a/0.txt لا لا
/a/b/[cde-h]/i/ /a/b/c/i/file.txt نعم نعم

تمكين ETL السهل

طريقة سهلة للحصول على بياناتك في Delta Lake دون فقدان أي بيانات هي استخدام النمط التالي وتمكين استدلال المخطط مع Auto Loader. توصي Databricks بتشغيل التعليمات البرمجية التالية في مهمة Azure Databricks حتى تقوم بإعادة تشغيل الدفق تلقائيا عند تغيير مخطط بيانات المصدر. بشكل افتراضي، يتم استنتاج المخطط لأنواع سلاسل، وستنتقل أي أخطاء تحليل (يجب ألا يكون هناك شيء إذا بقي كل شيء كسلسلة) إلى _rescued_data، وستفشل أي أعمدة جديدة في الدفق وتتطور المخطط.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

التطوير

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

منع فقدان البيانات في البيانات المنظمة بشكل جيد

عندما تعرف مخططك، ولكنك تريد معرفة متى تتلقى بيانات غير متوقعة، توصي Databricks باستخدام rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

التطوير

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

إذا كنت تريد أن يتوقف الدفق عن المعالجة إذا تم تقديم حقل جديد لا يتطابق مع المخطط، يمكنك إضافة:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

تمكين البنية الأساسية لبرنامج ربط العمليات التجارية للبيانات شبه المنظمة المرنة

عندما تتلقى بيانات من مورد يقدم أعمدة جديدة إلى المعلومات التي يوفرها، قد لا تكون على دراية تامة بمتى يقومون بذلك، أو قد لا يكون لديك النطاق الترددي لتحديث البنية الأساسية لبرنامج ربط العمليات التجارية للبيانات. يمكنك الآن الاستفادة من تطور المخطط لإعادة تشغيل الدفق والسماح للتحميل التلقائي بتحديث المخطط المستنتج تلقائيا. يمكنك أيضا الاستفادة schemaHints من بعض الحقول "بلا مخطط" التي قد يوفرها المورد.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

التطوير

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

تحويل بيانات JSON المتداخلة

نظرا لأن "المحمل التلقائي" يستنتج أعمدة JSON ذات المستوى الأعلى كسلاسل، يمكن تركك مع كائنات JSON المتداخلة التي تتطلب المزيد من التحويلات. يمكنك استخدام واجهات برمجة التطبيقات للوصول إلى البيانات شبه المنظمة لزيادة تحويل محتوى JSON المعقد.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

التطوير

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

استنتاج بيانات JSON المتداخلة

عندما تكون لديك بيانات متداخلة cloudFiles.inferColumnTypes ، يمكنك استخدام الخيار للاستدلال على البنية المتداخلة للبيانات وأنواع الأعمدة الأخرى.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

التطوير

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

تحميل ملفات CSV بدون رؤوس

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

التطوير

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

فرض مخطط على ملفات CSV باستخدام الرؤوس

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

التطوير

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

استيعاب الصورة أو البيانات الثنائية إلى Delta Lake ل ML

بمجرد تخزين البيانات في Delta Lake، يمكنك تشغيل الاستدلال الموزع على البيانات. راجع إجراء الاستدلال الموزع باستخدام Pandas UDF.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

التطوير

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

بناء جملة المحمل التلقائي ل DLT

توفر Delta Live Tables بناء جملة Python معدلا قليلا للتحميل التلقائي يضيف دعم SQL للتحميل التلقائي.

تستخدم الأمثلة التالية Auto Loader لإنشاء مجموعات بيانات من ملفات CSV وJSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

يمكنك استخدام خيارات التنسيق المعتمدة مع "المحمل التلقائي". باستخدام الدالة map() ، يمكنك تمرير الخيارات إلى cloud_files() الأسلوب . الخيارات هي أزواج قيم المفاتيح، حيث تكون المفاتيح والقيم سلاسل. يصف ما يلي بناء الجملة للعمل مع Auto Loader في SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

يقرأ المثال التالي البيانات من ملفات CSV محددة بعلامات جدولة مع عنوان:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

يمكنك استخدام schema لتحديد التنسيق يدويا؛ يجب تحديد schema للتنسيقات التي لا تدعم استنتاج المخطط:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

إشعار

تقوم Delta Live Tables تلقائيا بتكوين وإدارة المخطط ودلائل نقاط التحقق عند استخدام "المحمل التلقائي" لقراءة الملفات. ومع ذلك، إذا قمت بتكوين أي من هذه الدلائل يدويا، فإن إجراء تحديث كامل لا يؤثر على محتويات الدلائل المكونة. توصي Databricks باستخدام الدلائل التي تم تكوينها تلقائيا لتجنب الآثار الجانبية غير المتوقعة أثناء المعالجة.