البرنامج التعليمي: Azure Data Lake Storage Gen2 وAzure Databricks وSpark
يوضح لك هذا البرنامج التعليمي كيفية توصيل نظام مجموعة Azure Databricks بالبيانات المخزنة في حساب تخزين Azure الذي يشتمل على Azure Data Lake Storage Gen2 ممكناً. يُمكّنك هذا الاتصال من تشغيل الاستعلامات والتحليلات على بياناتك من نظام المجموعة محلياً.
في هذا البرنامج التعليمي، سوف نتعلم:
- استيعاب بيانات غير مهيكلة في حساب تخزين
- تشغيل التحليلات المتعلقة ببياناتك في مخزن البيانات الثنائية الكبيرة
في حال لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانيّاً قبل البدء.
المتطلبات الأساسية
إنشاء حساب تخزين يحتوي على مساحة اسم هرمية (Azure Data Lake Storage Gen2)
راجع إنشاء حساب تخزين للاستخدام مع Azure Data Lake Storage Gen2.
التحقق من تعيين دور مساهم بيانات كائن التخزين الثنائي كبير الحجم لحساب المستخدم لديك.
تثبيت AzCopy v10. راجع نقل البيانات باستخدام AzCopy v10
أنشئ كيان خدمة، وأنشئ سر العميل، ثم امنح كيان الخدمة حق الوصول إلى حساب التخزين.
راجع البرنامج التعليمي: الاتصال إلى Azure Data Lake Storage Gen2 (الخطوات من 1 إلى 3). بعد إكمال هذه الخطوات، تأكد من لصق معرف المستأجر ومعرف التطبيق والقيم السرية للعميل في ملف نصي. يمكنك استخدام ذلك لاحقًا في هذا البرنامج التعليمي.
إنشاء مساحة عمل Azure Databricks، نظام المجموعة، ودفتر الملاحظات
أنشئ مساحة عمل Azure Databricks. راجع إنشاء مساحة عمل Azure Databricks.
إنشاء نظام مجموعة. راجع إنشاء نظام مجموعة.
إنشاء دفتر ملاحظات. راجع إنشاء دفتر ملاحظات. اختر Python كلغة افتراضية لدفتر الملاحظات.
احتفظ بدفتر الملاحظات مفتوحا. يمكنك استخدامه في الأقسام التالية.
تنزيل بيانات الرحلة
يستخدم هذا البرنامج التعليمي بيانات رحلات الأداء في الوقت المحدد لشهر يناير 2016 من مكتب إحصاءات النقل لتوضيح كيفية تنفيذ عملية ETL. يجب عليك تحميل هذه البيانات لإكمال البرنامج التعليمي.
قم بتنزيل ملف On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. يحتوي هذا الملف على بيانات الرحلة.
يجب إلغاء ضغط محتويات الملف المضغوط وتدوين اسم الملف ومساره. ستحتاج إلى هذه المعلومات في خطوة لاحقة.
إذا كنت ترغب في التعرف على المعلومات المسجلة في بيانات أداء التقارير في الوقت المحدد، يمكنك الاطلاع على الأوصاف الميدانية على موقع مكتب إحصاءات النقل على الويب.
استيعاب البيانات
في هذا القسم، يمكنك تحميل بيانات الرحلة .csv إلى حساب Azure Data Lake Storage Gen2 ثم تحميل حساب التخزين إلى مجموعة Databricks. وأخيرا، يمكنك استخدام Databricks لقراءة بيانات رحلة .csv وكتابتها مرة أخرى إلى التخزين بتنسيق Apache parquet.
تحميل بيانات الرحلة إلى حساب التخزين الخاص بك
استخدم AzCopy لنسخ ملف .csv إلى حساب Azure Data Lake Storage Gen2. يمكنك استخدام azcopy make
الأمر لإنشاء حاوية في حساب التخزين الخاص بك. ثم يمكنك استخدام azcopy copy
الأمر لنسخ بيانات csv التي قمت بتنزيلها للتو إلى دليل في تلك الحاوية.
في الخطوات التالية، تحتاج إلى إدخال أسماء الحاوية التي تريد إنشاؤها، والدليل والكائنات الثنائية كبيرة الحجم التي تريد تحميل بيانات الرحلة إليها في الحاوية. يمكنك استخدام الأسماء المقترحة في كل خطوة أو تحديد اصطلاحات التسمية للحاويات والدلائل والكائنات الثنائية كبيرة الحجم الخاصة بك.
افتح نافذة موجه الأوامر، وأدخل الأمر التالي لتسجيل الدخول إلى Azure Active Directory للوصول إلى حساب التخزين الخاص بك.
azcopy login
اتبع الإرشادات التي تظهر في نافذة موجه الأوامر لمصادقة حساب المستخدم الخاص بك.
لإنشاء حاوية في حساب التخزين الخاص بك لتخزين بيانات الرحلة، أدخل الأمر التالي:
azcopy make "https://<storage-account-name>.dfs.core.windows.net/<container-name>"
استبدل
<storage-account-name>
قيمة العنصر النائب باسم حساب التخزين الخاص بك.<container-name>
استبدل العنصر النائب باسم الحاوية التي تريد إنشاؤها لتخزين بيانات csv؛ على سبيل المثال، حاوية بيانات الرحلة.
لتحميل (نسخ) بيانات csv إلى حساب التخزين الخاص بك، أدخل الأمر التالي.
azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
استبدل
<csv-folder-path>
قيمة العنصر النائب بالمسار إلى ملف .csv.استبدل
<storage-account-name>
قيمة العنصر النائب باسم حساب التخزين الخاص بك.استبدل
<container-name>
العنصر النائب باسم الحاوية في حساب التخزين الخاص بك.<directory-name>
استبدل العنصر النائب باسم دليل لتخزين البيانات في الحاوية؛ على سبيل المثال، يناير 2016.
تحميل حساب التخزين الخاص بك إلى مجموعة Databricks
في هذا القسم، يمكنك تحميل تخزين كائن سحابة Azure Data Lake Storage Gen2 إلى نظام ملفات Databricks (DBFS). يمكنك استخدام مبدأ خدمة Azure AD الذي أنشأته مسبقا للمصادقة باستخدام حساب التخزين. لمزيد من المعلومات، راجع تحميل تخزين كائن السحابة على Azure Databricks.
قم بإرفاق دفتر الملاحظات إلى نظام المجموعة.
في دفتر الملاحظات الذي أنشأته مسبقا، حدد الزر الاتصال في الزاوية العلوية اليسرى من شريط أدوات دفتر الملاحظات. يفتح هذا الزر محدد الحوسبة. (إذا قمت بالفعل بتوصيل دفتر ملاحظاتك بمجموعة، يتم عرض اسم نظام المجموعة هذا في نص الزر بدلا من الاتصال).
في القائمة المنسدلة لنظام المجموعة، حدد نظام المجموعة الذي قمت بإنشائه مسبقا.
لاحظ أن النص في محدد نظام المجموعة يتغير إلى البدء. انتظر حتى تنتهي المجموعة من البدء وحتى يظهر اسم المجموعة في الزر قبل المتابعة.
انسخ كتلة التعليمات البرمجية الآتية وألصقها في أول خلية، لكن دون تشغيل هذه التعليمة البرمجية بعد.
configs = {"fs.azure.account.auth.type": "OAuth", "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", "fs.azure.account.oauth2.client.id": "<appId>", "fs.azure.account.oauth2.client.secret": "<clientSecret>", "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token", "fs.azure.createRemoteFileSystemDuringInitialization": "true"} dbutils.fs.mount( source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>", mount_point = "/mnt/flightdata", extra_configs = configs)
في كتلة التعليمات البرمجية هذه:
في
configs
، استبدل<appId>
قيم العنصر<clientSecret>
النائب و و<tenantId>
بمعرف التطبيق وسر العميل ومعرف المستأجر الذي نسخته عند إنشاء كيان الخدمة في المتطلبات الأساسية.source
في URI، استبدل<storage-account-name>
قيم العنصر<container-name>
النائب و و<directory-name>
باسم حساب تخزين Azure Data Lake Storage Gen2 واسم الحاوية والدليل الذي حددته عند تحميل بيانات الرحلة إلى حساب التخزين.إشعار
معرف النظام في URI،
abfss
، يخبر Databricks باستخدام برنامج تشغيل نظام ملفات Azure Blob مع أمان طبقة النقل (TLS). لمعرفة المزيد حول URI، راجع استخدام Azure Data Lake Storage Gen2 URI.
تأكد من انتهاء نظام المجموعة الخاص بك من البدء قبل المتابعة.
اضغط على المفاتيح SHIFT + ENTER لتشغيل التعليمات البرمجية في هذه الكتلة.
يمكن الآن الوصول إلى الحاوية والدليل حيث قمت بتحميل بيانات الرحلة في حساب التخزين الخاص بك في دفتر الملاحظات الخاص بك من خلال نقطة التحميل/ mnt/flightdata.
استخدم دفتر ملاحظات Databricks لتحويل CSV إلى Parquet
الآن بعد أن أصبح من الممكن الوصول إلى بيانات الرحلة csv من خلال نقطة تحميل DBFS، يمكنك استخدام Apache Spark DataFrame لتحميلها في مساحة العمل الخاصة بك وكتابتها مرة أخرى بتنسيق Apache parquet إلى تخزين كائن Azure Data Lake Storage Gen2.
Spark DataFrame هو بنية بيانات ثنائية الأبعاد تحمل أعمدة من أنواع مختلفة محتملة. يمكنك استخدام DataFrame لقراءة البيانات وكتابتها بسهولة بتنسيقات مختلفة مدعومة. باستخدام DataFrame، يمكنك تحميل البيانات من تخزين الكائنات السحابية وإجراء التحليل والتحويلات عليها داخل مجموعة الحوسبة الخاصة بك دون التأثير على البيانات الأساسية في تخزين كائن السحابة. لمعرفة المزيد، راجع العمل مع PySpark DataFrames على Azure Databricks.
Apache parquet هو تنسيق ملف عمودي مع تحسينات تعمل على تسريع الاستعلامات. إنه تنسيق ملف أكثر كفاءة من CSV أو JSON. لمعرفة المزيد، راجع ملفات Parquet.
في دفتر الملاحظات، أضف خلية جديدة، والصق التعليمات البرمجية التالية فيها.
# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/*.csv")
# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")
اضغط على المفاتيح SHIFT + ENTER لتشغيل التعليمات البرمجية في هذه الكتلة.
قبل المتابعة إلى القسم التالي، تأكد من كتابة جميع بيانات parquet، ويظهر "Done" في الإخراج.
استكشاف البيانات
في هذا القسم، يمكنك استخدام الأداة المساعدة لنظام ملفات Databricks لاستكشاف تخزين كائن Azure Data Lake Storage Gen2 باستخدام نقطة تحميل DBFS التي قمت بإنشائها في القسم السابق.
في خلية جديدة، الصق التعليمات البرمجية التالية للحصول على قائمة بالملفات عند نقطة التحميل. يقوم الأمر الأول إخراج قائمة بالملفات والدلائل. يعرض الأمر الثاني الإخراج بتنسيق جدولي لتسهيل القراءة.
dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))
اضغط على المفاتيح SHIFT + ENTER لتشغيل التعليمات البرمجية في هذه الكتلة.
لاحظ أن دليل parquet يظهر في القائمة. حفظت بيانات الرحلة .csv بتنسيق parquet إلى دليل parquet/flights في القسم السابق. لسرد الملفات في دليل parquet/flights ، الصق التعليمات البرمجية التالية في خلية جديدة وقم بتشغيلها:
display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))
لإنشاء ملف جديد وإدراجه، الصق التعليمات البرمجية التالية في خلية جديدة وقم بتشغيله:
dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))
نظرا لأنك لا تحتاج إلى ملف 1.txt في هذا البرنامج التعليمي، يمكنك لصق التعليمات البرمجية التالية في خلية وتشغيلها لحذف mydirectory بشكل متكرر. True
تشير المعلمة إلى حذف متكرر.
dbutils.fs.rm("/mnt/flightdata/mydirectory", True)
كملاءمة، يمكنك استخدام أمر التعليمات لمعرفة التفاصيل حول الأوامر الأخرى.
dbutils.fs.help("rm")
باستخدام نماذج التعليمات البرمجية هذه، استكشفت الطبيعة الهرمية ل HDFS باستخدام البيانات المخزنة في حساب تخزين مع تمكين Azure Data Lake Storage Gen2.
الاستعلام عن البيانات
بعد ذلك، يمكنك البدء في الاستعلام عن البيانات التي حملتها في حساب التخزين الخاص بك. أدخل كل كتلة من كتل التعليمات البرمجية التالية في خلية جديدة واضغط على SHIFT + ENTER لتشغيل البرنامج النصي Python.
توفر DataFrames مجموعة غنية من الوظائف (تحديد الأعمدة، والتصفية، والانضمام، والتجاميع) التي تسمح لك بحل مشكلات تحليل البيانات الشائعة بكفاءة.
لتحميل DataFrame من بيانات رحلة parquet المحفوظة مسبقا واستكشاف بعض الوظائف المدعومة، أدخل هذا البرنامج النصي في خلية جديدة وقم بتشغيله.
# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")
# Print the schema of the dataframe
flight_df.printSchema()
# Print the flight database size
print("Number of flights in the database: ", flight_df.count())
# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)
# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)
# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)
أدخل هذا البرنامج النصي في خلية جديدة لتشغيل بعض استعلامات التحليل الأساسية مقابل البيانات. يمكنك اختيار تشغيل البرنامج النصي بأكمله (SHIFT + ENTER)، أو تمييز كل استعلام وتشغيله بشكل منفصل باستخدام CTRL + SHIFT + ENTER، أو إدخال كل استعلام في خلية منفصلة وتشغيله هناك.
# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')
# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())
# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()
# List out all the airports in Texas
airports_in_texas = spark.sql(
"SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)
# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
"SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)
# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
"SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()
# List airlines by the highest percentage of delayed flights. A delayed flight is one with a departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
"CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
"CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
"SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()
الملخص
في هذا البرنامج التعليمي، سوف تتعلّم:
إنشاء موارد Azure، بما في ذلك حساب تخزين Azure Data Lake Storage Gen2 وأساس خدمة Azure AD، والأذونات المعينة للوصول إلى حساب التخزين.
إنشاء مساحة عمل Azure Databricks ودفتر الملاحظات والمجموعة الحسابية.
استخدم AzCopy لتحميل بيانات رحلة .csv غير منظمة البنية إلى حساب تخزين Azure Data Lake Storage Gen2.
استخدم وظائف الأداة المساعدة لنظام ملفات Databricks لتحميل حساب تخزين Azure Data Lake Storage Gen2 واستكشاف نظام الملفات الهرمي الخاص به.
استخدم Apache Spark DataFrames لتحويل بيانات رحلة .csv إلى تنسيق Apache parquet وتخزينها مرة أخرى إلى حساب تخزين Azure Data Lake Storage Gen2.
استخدم DataFrames لاستكشاف بيانات الرحلة وتنفيذ استعلام بسيط.
استخدمت Apache Spark SQL للاستعلام عن بيانات الرحلة عن العدد الإجمالي للرحلات الجوية لكل شركة طيران في يناير 2016، والمطارات في تكساس، وشركات الطيران التي تطير من تكساس، ومتوسط تأخير الوصول في دقائق لكل شركة طيران على الصعيد الوطني، والنسبة المئوية لرحلات كل شركة طيران التي تأخرت في المغادرة أو الوصول.
تنظيف الموارد
إذا كنت تريد الاحتفاظ بدفتر الملاحظات والعودة إليه لاحقا، فمن المستحسن إيقاف تشغيل (إنهاء) مجموعتك لتجنب الرسوم. لإنهاء نظام المجموعة، حدده في محدد الحساب الموجود في أعلى يمين شريط أدوات دفتر الملاحظات، وحدد إنهاء من القائمة، وتأكد من التحديد. (بشكل افتراضي، سيتم إنهاء المجموعة تلقائيا بعد 120 دقيقة من عدم النشاط.)
إذا كنت تريد حذف موارد مساحة العمل الفردية مثل دفاتر الملاحظات والمجموعات، يمكنك القيام بذلك من الشريط الجانبي الأيسر لمساحة العمل. للحصول على إرشادات مفصلة، راجع حذف نظام مجموعة أو حذف دفتر ملاحظات.
عند انعدام الحاجة إلى مجموعة الموارد، احذفها واحذف جميع الموارد ذات الصلة. للقيام بذلك في مدخل Microsoft Azure، حدد مجموعة الموارد لحساب التخزين ومساحة العمل وحدد حذف.