بدء استخدام COPY INTO لتحميل البيانات
COPY INTO
يتيح لك أمر SQL تحميل البيانات من موقع ملف إلى جدول Delta. هذه عملية قابلة لإعادة الفرز ومكررة؛ يتم تخطي الملفات الموجودة في الموقع المصدر الذي تم تحميله بالفعل.
COPY INTO
يوفر الإمكانات التالية:
- تصفية الملفات أو الدليل القابلة للتكوين بسهولة من التخزين السحابي، بما في ذلك وحدات تخزين S3 وADS Gen2 وABFS وGCS ووحدات تخزين كتالوج Unity.
- دعم تنسيقات الملفات المصدر المتعددة: CSV وJSON وXML وAvro وORC وParquet والنصوص والملفات الثنائية
- معالجة الملفات مرة واحدة (غير متكررة) بشكل افتراضي
- استنتاج مخطط الجدول الهدف وتعيينه ودمجه وتطوره
إشعار
للحصول على تجربة استيعاب ملفات أكثر قابلية للتطوير وقوة، توصي Databricks بأن يستفيد مستخدمو SQL من جداول الدفق. راجع تحميل البيانات باستخدام جداول الدفق في Databricks SQL.
تحذير
COPY INTO
يحترم إعداد مساحة العمل لمتجهات الحذف. إذا تم تمكينها، يتم تمكين متجهات الحذف على الجدول الهدف عند COPY INTO
التشغيل على مستودع SQL أو حساب تشغيل Databricks Runtime 14.0 أو أعلى. بمجرد التمكين، تحظر متجهات الحذف الاستعلامات مقابل جدول في Databricks Runtime 11.3 LTS وما دونه. راجع ما هي متجهات الحذف؟ والتمكين التلقائي لنواقل الحذف.
المتطلبات
يجب أن يتبع مسؤول الحساب الخطوات الواردة في تكوين الوصول إلى البيانات لاستيعابها لتكوين الوصول إلى البيانات في تخزين كائن السحابة قبل أن يتمكن المستخدمون من تحميل البيانات باستخدام COPY INTO
.
مثال: تحميل البيانات في جدول Delta Lake بلا مخطط
إشعار
تتوفر هذه الميزة في Databricks Runtime 11.3 LTS وما فوق.
يمكنك إنشاء جداول دلتا العنصر النائب الفارغة بحيث يتم استنتاج المخطط لاحقا أثناء أمر COPY INTO
عن طريق تعيين mergeSchema
إلى true
في COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
عبارة SQL أعلاه متكررة ويمكن جدولتها للتشغيل لاستيعاب البيانات مرة واحدة بالضبط في جدول Delta.
إشعار
جدول Delta الفارغ غير قابل للاستخدام خارج COPY INTO
. INSERT INTO
ولا MERGE INTO
يتم اعتماد لكتابة البيانات في جداول Delta بلا مخطط. بعد إدراج البيانات في الجدول باستخدام COPY INTO
، يصبح الجدول قابلا للاستعلام.
راجع إنشاء جداول الهدف ل COPY INTO.
مثال: تعيين المخطط وتحميل البيانات في جدول Delta Lake
يوضح المثال التالي كيفية إنشاء جدول Delta ثم استخدام COPY INTO
الأمر SQL لتحميل نموذج البيانات من مجموعات بيانات Databricks في الجدول. يمكنك تشغيل مثال التعليمات البرمجية Python أو R أو Scala أو SQL من دفتر ملاحظات مرفق بمجموعة Azure Databricks. يمكنك أيضا تشغيل التعليمات البرمجية SQL من استعلام مقترن بمستودع SQL في Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
للتنظيف، قم بتشغيل التعليمات البرمجية التالية، والتي تحذف الجدول:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
تنظيف ملفات بيانات التعريف
يمكنك تشغيل فراغ لتنظيف ملفات بيانات التعريف غير المخزنة التي تم إنشاؤها بواسطة COPY INTO
في Databricks Runtime 15.2 وما فوق.
المرجع
- Databricks Runtime 7.x وما فوق: COPY INTO
الموارد الإضافية
تحميل البيانات باستخدام COPY INTO مع وحدات تخزين كتالوج Unity أو المواقع الخارجية
للحصول على أنماط الاستخدام الشائعة، بما في ذلك أمثلة عمليات متعددة
COPY INTO
مقابل نفس جدول Delta، راجع أنماط تحميل البيانات الشائعة باستخدام COPY INTO.