إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
يعمل "التحميل التلقائي" على تبسيط عدد من مهام استيعاب البيانات الشائعة. يوفر هذا المرجع السريع أمثلة للعديد من الأنماط الشائعة.
تصفية الدلائل أو الملفات باستخدام أنماط glob
يمكن استخدام أنماط Glob لتصفية الدلائل والملفات عند توفيرها في المسار.
| النمط | الوصف |
|---|---|
? |
يتطابق مع أي حرف واحد |
* |
يطابق صفرا أو أكثر من الأحرف |
[abc] |
يطابق حرفا واحدا من مجموعة الأحرف {a,b,c}. |
[a-z] |
يطابق حرفا واحدا من نطاق الأحرف {a... z}. |
[^a] |
يطابق حرفا واحدا ليس من مجموعة أحرف أو نطاق {a}. لاحظ أنه ^ يجب أن يحدث الحرف على الفور إلى يمين قوس الفتح. |
{ab,cd} |
يطابق سلسلة من مجموعة السلسلة {ab، cd}. |
{ab,c{de, fh}} |
يطابق سلسلة من مجموعة السلسلة {ab، cde، cfh}. |
path استخدم لتوفير أنماط البادئة، على سبيل المثال:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
هام
تحتاج إلى استخدام الخيار pathGlobFilter لتوفير أنماط لاحقة بشكل صريح. يوفر path عامل تصفية البادئة الوحيد.
على سبيل المثال، إذا كنت ترغب في توزيع الملفات فقط png في دليل يحتوي على ملفات ذات لاحقات مختلفة، يمكنك القيام بما يلي:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
إشعار
يختلف سلوك globbing الافتراضي ل Auto Loader عن السلوك الافتراضي لمصادر ملفات Spark الأخرى. أضف .option("cloudFiles.useStrictGlobber", "true") إلى القراءة لاستخدام globbing الذي يطابق سلوك Spark الافتراضي مقابل مصادر الملفات. راجع الجدول التالي للحصول على المزيد حول الكآبة:
| النمط | مسار الملف | globber الافتراضي | globber صارم |
|---|---|---|---|
| /a/b | /a/b/c/file.txt | نعم | نعم |
| /a/b | /a/b_dir/c/file.txt | لا | لا |
| /a/b | /a/b.txt | لا | لا |
| /a/b/ | /a/b.txt | لا | لا |
| /a/*/c/ | /a/b/c/file.txt | نعم | نعم |
| /a/*/c/ | /a/b/c/d/file.txt | نعم | نعم |
| /a/*/c/ | /a/b/x/y/c/file.txt | نعم | لا |
| /a/*/c | /a/b/c_file.txt | نعم | لا |
| /a/*/c/ | /a/b/c_file.txt | نعم | لا |
| /a/*/c/ | /a/*/cookie/file.txt | نعم | لا |
| /a/b* | /a/b.txt | نعم | نعم |
| /a/b* | /a/b/file.txt | نعم | نعم |
| /a/{0.txt,1.txt} | /a/0.txt | نعم | نعم |
| /a/*/{0.txt,1.txt} | /a/0.txt | لا | لا |
| /a/b/[cde-h]/i/ | /a/b/c/i/file.txt | نعم | نعم |
تمكين ETL السهل
طريقة سهلة للحصول على بياناتك في Delta Lake دون فقدان أي بيانات هي استخدام النمط التالي وتمكين استدلال المخطط مع Auto Loader. توصي Databricks بتشغيل التعليمات البرمجية التالية في مهمة Azure Databricks حتى تقوم بإعادة تشغيل الدفق تلقائيا عند تغيير مخطط بيانات المصدر. بشكل افتراضي، يتم استنتاج المخطط لأنواع سلاسل، وستنتقل أي أخطاء تحليل (يجب ألا يكون هناك شيء إذا بقي كل شيء كسلسلة) إلى _rescued_data، وستفشل أي أعمدة جديدة في الدفق وتتطور المخطط.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
منع فقدان البيانات في البيانات المنظمة بشكل جيد
عندما تعرف مخططك، ولكنك تريد معرفة متى تتلقى بيانات غير متوقعة، توصي Databricks باستخدام rescuedDataColumn.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
إذا كنت تريد أن يتوقف الدفق عن المعالجة إذا تم تقديم حقل جديد لا يتطابق مع المخطط، يمكنك إضافة:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
تمكين البنية الأساسية لبرنامج ربط العمليات التجارية للبيانات شبه المنظمة المرنة
عندما تتلقى بيانات من مورد يقدم أعمدة جديدة إلى المعلومات التي يوفرها، قد لا تكون على دراية تامة بمتى يقومون بذلك، أو قد لا يكون لديك النطاق الترددي لتحديث البنية الأساسية لبرنامج ربط العمليات التجارية للبيانات. يمكنك الآن الاستفادة من تطور المخطط لإعادة تشغيل الدفق والسماح للتحميل التلقائي بتحديث المخطط المستنتج تلقائيا. يمكنك أيضا الاستفادة schemaHints من بعض الحقول "بلا مخطط" التي قد يوفرها المورد.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
تحويل بيانات JSON المتداخلة
نظرا لأن "المحمل التلقائي" يستنتج أعمدة JSON ذات المستوى الأعلى كسلاسل، يمكن تركك مع كائنات JSON المتداخلة التي تتطلب المزيد من التحويلات. يمكنك استخدام واجهات برمجة التطبيقات للوصول إلى البيانات شبه المنظمة لزيادة تحويل محتوى JSON المعقد.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
استنتاج بيانات JSON المتداخلة
عندما تكون لديك بيانات متداخلة cloudFiles.inferColumnTypes ، يمكنك استخدام الخيار للاستدلال على البنية المتداخلة للبيانات وأنواع الأعمدة الأخرى.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
تحميل ملفات CSV بدون رؤوس
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
فرض مخطط على ملفات CSV باستخدام الرؤوس
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
استيعاب الصورة أو البيانات الثنائية إلى Delta Lake ل ML
بمجرد تخزين البيانات في Delta Lake، يمكنك تشغيل الاستدلال الموزع على البيانات. راجع إجراء الاستدلال الموزع باستخدام Pandas UDF.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
بناء جملة المحمل التلقائي ل DLT
توفر Delta Live Tables بناء جملة Python معدلا قليلا للتحميل التلقائي يضيف دعم SQL للتحميل التلقائي.
تستخدم الأمثلة التالية Auto Loader لإنشاء مجموعات بيانات من ملفات CSV وJSON:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
يمكنك استخدام خيارات التنسيق المعتمدة مع "المحمل التلقائي". باستخدام الدالة map() ، يمكنك تمرير الخيارات إلى cloud_files() الأسلوب . الخيارات هي أزواج قيم المفاتيح، حيث تكون المفاتيح والقيم سلاسل. يصف ما يلي بناء الجملة للعمل مع Auto Loader في SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
يقرأ المثال التالي البيانات من ملفات CSV محددة بعلامات جدولة مع عنوان:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
يمكنك استخدام schema لتحديد التنسيق يدويا؛ يجب تحديد schema للتنسيقات التي لا تدعم استنتاج المخطط:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
إشعار
تقوم Delta Live Tables تلقائيا بتكوين وإدارة المخطط ودلائل نقاط التحقق عند استخدام "المحمل التلقائي" لقراءة الملفات. ومع ذلك، إذا قمت بتكوين أي من هذه الدلائل يدويا، فإن إجراء تحديث كامل لا يؤثر على محتويات الدلائل المكونة. توصي Databricks باستخدام الدلائل التي تم تكوينها تلقائيا لتجنب الآثار الجانبية غير المتوقعة أثناء المعالجة.