Oktatóanyag: Azure Data Lake Storage, Azure Databricks > Spark
Ez az oktatóanyag bemutatja, hogyan csatlakoztathatja az Azure Databricks-fürtöt egy olyan Azure Storage-fiókban tárolt adatokhoz, amelyeken engedélyezve van az Azure Data Lake Storage. Ezzel a kapcsolattal natív módon futtathat lekérdezéseket és elemzéseket a fürtből az adatokon.
Az oktatóanyag során az alábbi lépéseket fogja végrehajtani:
- Strukturálatlan adatok betöltése egy tárfiókba
- Elemzés futtatása az adatokon a Blob Storage-ban
Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.
Előfeltételek
Hierarchikus névtérrel rendelkező tárfiók létrehozása (Azure Data Lake Storage)
Győződjön meg arról, hogy a felhasználói fiókjához hozzá van rendelve a Storage Blob Data Közreműködő szerepkör .
Telepítse az AzCopy 10-et. Lásd: Adatok átvitele az AzCopy 10-zel
Hozzon létre egy szolgáltatásnevet, hozzon létre egy ügyfélkulcsot, majd adjon hozzáférést a szolgáltatásnévnek a tárfiókhoz.
Lásd az oktatóanyagot: Csatlakozás az Azure Data Lake Storage-hoz (1–3. lépés). A lépések elvégzése után illessze be a bérlőazonosítót, az alkalmazásazonosítót és az ügyfél titkos kódértékeket egy szövegfájlba. Az oktatóanyag részében használni fogja őket.
Azure Databricks-munkaterület, -fürt és -jegyzetfüzet létrehozása
Azure Databricks-munkaterület létrehozása. Lásd: Azure Databricks-munkaterület létrehozása.
Hozzon létre egy fürtöt. Lásd: Fürt létrehozása.
Hozzon létre egy notebookot. Lásd: Jegyzetfüzet létrehozása. Válassza a Pythont a jegyzetfüzet alapértelmezett nyelveként.
Tartsa nyitva a jegyzetfüzetet. A következő szakaszokban használhatja.
A repülőjárat-adatok letöltése
Ez az oktatóanyag a Közlekedési Statisztikai Hivatal 2016. januári, időalapú teljesítmény-repülési adatait használja az ETL-műveletek végrehajtásának bemutatásához. Az oktatóanyag elvégzéséhez le kell töltenie ezeket az adatokat.
Töltse le a On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip fájlt. Ez a fájl tartalmazza a repülési adatokat.
Bontsa ki a tömörített fájl tartalmát, és jegyezze fel a fájl nevét és elérési útját. Egy későbbi lépésben szüksége lesz ezekre az információkra.
Ha szeretné megismerni az időalapú jelentéskészítési teljesítményadatokban rögzített információkat, a mezőleírásokat a Közlekedési Statisztikai Hivatal webhelyén tekintheti meg.
Adatok betöltése
Ebben a szakaszban feltölti a .csv repülési adatokat az Azure Data Lake Storage-fiókjába, majd csatlakoztatja a tárfiókot a Databricks-fürthöz. Végül a Databricks segítségével olvassa el a .csv repülési adatokat, és írja vissza a tárolóba Apache parquet formátumban.
A repülési adatok feltöltése a tárfiókba
Az AzCopy használatával másolja a .csv fájlt az Azure Data Lake Storage-fiókjába. A parancs használatával azcopy make
tárolót hozhat létre a tárfiókban. Ezután a azcopy copy
paranccsal másolja az imént letöltött csv-adatokat a tároló egyik könyvtárába.
A következő lépésekben meg kell adnia a létrehozni kívánt tároló nevét, valamint azt a könyvtárat és blobot, amelybe fel szeretné tölteni a repülési adatokat a tárolóban. Minden lépésben használhatja a javasolt neveket, vagy megadhatja a tárolók, könyvtárak és blobok elnevezési konvencióit.
Nyisson meg egy parancssori ablakot, és írja be a következő parancsot az Azure Active Directoryba való bejelentkezéshez a tárfiók eléréséhez.
azcopy login
Kövesse a parancssori ablakban megjelenő utasításokat a felhasználói fiók hitelesítéséhez.
Ha tárolót szeretne létrehozni a tárfiókban a repülési adatok tárolásához, írja be a következő parancsot:
azcopy make "https://<storage-account-name>.dfs.core.windows.net/<container-name>"
Cserélje le a
<storage-account-name>
helyőrző értékét a tárfiók nevére.Cserélje le a
<container-name>
helyőrzőt annak a tárolónak a nevére, amelyet a csv-adatok tárolásához szeretne létrehozni, például flight-data-container.
A csv-adatok tárfiókba való feltöltéséhez (másolásához) írja be a következő parancsot.
azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
Cserélje le a
<csv-folder-path>
helyőrző értékét a .csv fájl elérési útjára.Cserélje le a
<storage-account-name>
helyőrző értékét a tárfiók nevére.Cserélje le a
<container-name>
helyőrzőt a tárfiókban lévő tároló nevére.Cserélje le a
<directory-name>
helyőrzőt egy könyvtár nevére az adatok tárolóban való tárolásához, például 2016. januárban.
Tárfiók csatlakoztatása a Databricks-fürthöz
Ebben a szakaszban csatlakoztatja az Azure Data Lake Storage felhőobjektum-tárolóját a Databricks fájlrendszerhez (DBFS). A tárfiókkal való hitelesítéshez használja a korábban létrehozott Azure AD szolgáltatás elvét. További információ: Felhőobjektum-tároló csatlakoztatása az Azure Databrickshez.
Csatolja a jegyzetfüzetet a fürthöz.
A korábban létrehozott jegyzetfüzetben válassza a Csatlakozás gombot a jegyzetfüzet eszköztárának jobb felső sarkában. Ez a gomb megnyitja a számítási választót. (Ha már csatlakoztatta a jegyzetfüzetet egy fürthöz, a fürt neve nem a gombszövegben, hanem aCsatlakozás).
A fürt legördülő menüjében válassza ki a korábban létrehozott fürtöt.
Figyelje meg, hogy a fürtkijelölő szövegének kezdete megváltozik. Várjon, amíg a fürt elindul, és a folytatás előtt a fürt neve megjelenik a gombon.
Másolja és illessze be a következő kódblokkot az első cellába, de még ne futtassa ezt a kódot.
configs = {"fs.azure.account.auth.type": "OAuth", "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", "fs.azure.account.oauth2.client.id": "<appId>", "fs.azure.account.oauth2.client.secret": "<clientSecret>", "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token", "fs.azure.createRemoteFileSystemDuringInitialization": "true"} dbutils.fs.mount( source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>", mount_point = "/mnt/flightdata", extra_configs = configs)
Ebben a kódblokkban:
Ebben
configs
a fájlban cserélje le a<appId>
,<clientSecret>
és<tenantId>
helyőrző értékeket az alkalmazásazonosítóra, az ügyfél titkos kódjára és a bérlőazonosítóra, amelyet a szolgáltatásnév létrehozásakor másolt az előfeltételek között.source
Az URI-ban cserélje le a<storage-account-name>
,<container-name>
és<directory-name>
helyőrző értékeket az Azure Data Lake Storage-tárfiók nevére, valamint annak a tárolónak és könyvtárnak a nevére, amelyet a repülési adatok tárfiókba való feltöltésekor adott meg.Feljegyzés
Az URI sémaazonosítója arra utasítja a Databrickset,
abfss
hogy használja az Azure Blob Fájlrendszer illesztőprogramját a Transport Layer Security (TLS) használatával. További információ az URI-ról: Az Azure Data Lake Storage URI használata.
A folytatás előtt győződjön meg arról, hogy a fürt elindult.
Nyomja le a SHIFT + ENTER billentyűket a kód ebben a blokkban való futtatásához.
A tároló és a könyvtár, ahová a tárfiókba feltöltötte a repülési adatokat, mostantól elérhető a jegyzetfüzetben az /mnt/flightdata csatlakoztatási ponton keresztül.
CSV konvertálása parquetté a Databricks-jegyzetfüzet használatával
Most, hogy a CSV-repülési adatok egy DBFS-csatlakoztatási ponton keresztül érhetők el, az Apache Spark DataFrame használatával betöltheti a munkaterületre, és apache parquet formátumban újraírhatja azOkat az Azure Data Lake Storage-objektumtárolóba.
A Spark DataFrame egy kétdimenziós címkézett adatstruktúra, amelynek oszlopai különböző típusúak lehetnek. A DataFrame használatával egyszerűen olvashat és írhat adatokat különböző támogatott formátumokban. A DataFrame használatával adatokat tölthet be a felhőobjektum-tárolóból, és elemzéseket és átalakításokat végezhet rajta a számítási fürtön belül anélkül, hogy az hatással lenne a felhőobjektum-tároló mögöttes adataira. További információ: PySpark DataFrames használata az Azure Databricksen.
Az Apache parquet egy oszlopos fájlformátum, amely optimalizálással felgyorsítja a lekérdezéseket. Hatékonyabb fájlformátum, mint a CSV vagy a JSON. További információ: Parquet Files.
Vegyen fel egy új cellát a jegyzetfüzetbe, és illessze be a következő kódot.
# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/*.csv")
# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")
Nyomja le a SHIFT + ENTER billentyűket a kód ebben a blokkban való futtatásához.
Mielőtt továbblép a következő szakaszra, győződjön meg arról, hogy az összes parquet-adat meg van írva, és a kimenetben megjelenik a "Kész" szöveg.
Adatok megismerése
Ebben a szakaszban a Databricks fájlrendszer segédprogrammal ismerkedhet meg az Azure Data Lake Storage-objektumtárolóval az előző szakaszban létrehozott DBFS-csatlakoztatási ponttal.
Egy új cellába illessze be a következő kódot, hogy lekérje a csatlakoztatási ponton található fájlok listáját. Az első parancs megjeleníti a fájlok és könyvtárak listáját. A második parancs táblázatos formátumban jeleníti meg a kimenetet a könnyebb olvashatóság érdekében.
dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))
Nyomja le a SHIFT + ENTER billentyűket a kód ebben a blokkban való futtatásához.
Figyelje meg, hogy a parquet könyvtár megjelenik a listaelemben. A .csv repülési adatokat parquet formátumban mentette az előző szakasz parquet/flights könyvtárába. A parquet/flights könyvtár fájljainak listázásához illessze be a következő kódot egy új cellába, és futtassa azt:
display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))
Új fájl létrehozásához és listázásához illessze be a következő kódot egy új cellába, és futtassa azt:
dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))
Mivel ebben az oktatóanyagban nincs szüksége a 1.txt fájlra, beillesztheti a következő kódot egy cellába, és futtathatja a könyvtár rekurzív törléséhez. A True
paraméter rekurzív törlést jelez.
dbutils.fs.rm("/mnt/flightdata/mydirectory", True)
A súgó paranccsal részletesen megismerheti a többi parancsot.
dbutils.fs.help("rm")
Ezekkel a kódmintákkal megismerkedett a HDFS hierarchikus jellegével egy olyan tárfiókban tárolt adatokkal, amelyek engedélyezve vannak az Azure Data Lake Storage-ban.
Adatok lekérdezése
Következő lépésként megkezdheti a tárfiókba feltöltött adatok lekérdezését. Írja be az alábbi kódblokkokat egy új cellába, és nyomja le a SHIFT + ENTER billentyűkombinációt a Python-szkript futtatásához.
A DataFrame-ek függvények (oszlopok kiválasztása, szűrés, illesztés, összesítés) gazdag készletét biztosítják, amelyek lehetővé teszik a gyakori adatelemzési problémák hatékony megoldását.
Ha be szeretne tölteni egy DataFrame-et a korábban mentett parquet-repülési adatokból, és meg szeretné vizsgálni a támogatott funkciók némelyikét, írja be ezt a szkriptet egy új cellába, és futtassa azt.
# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")
# Print the schema of the dataframe
flight_df.printSchema()
# Print the flight database size
print("Number of flights in the database: ", flight_df.count())
# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)
# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)
# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)
Írja be ezt a szkriptet egy új cellába, hogy alapvető elemzési lekérdezéseket futtasson az adatokon. Dönthet úgy, hogy a teljes szkriptet (SHIFT + ENTER) futtatja, kijelöli az egyes lekérdezéseket, és külön futtatja a CTRL + SHIFT +ENTER billentyűkombinációval, vagy beírhatja az egyes lekérdezéseket egy külön cellába, és ott futtathatja.
# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')
# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())
# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()
# List out all the airports in Texas
airports_in_texas = spark.sql(
"SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)
# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
"SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)
# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
"SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()
# List airlines by the highest percentage of delayed flights. A delayed flight is one with a departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
"CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
"CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
"SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()
Összegzés
Az oktatóanyag során az alábbi lépéseket fogja végrehajtani:
Létrehozott Azure-erőforrásokat, beleértve egy Azure Data Lake Storage-tárfiókot és az Azure AD-szolgáltatásnevet, és hozzárendelte a tárfiókhoz való hozzáféréshez szükséges engedélyeket.
Létrehozott egy Azure Databricks-munkaterületet, jegyzetfüzetet és számítási fürtöt.
Az AzCopy használatával strukturálatlan .csv repülési adatokat töltött fel az Azure Data Lake Storage-tárfiókba.
A Databricks fájlrendszer segédprogramfüggvényeivel csatlakoztatta az Azure Data Lake Storage-tárfiókot, és feltárta annak hierarchikus fájlrendszerét.
Apache Spark DataFrame-ekkel alakította át a .csv repülési adatait Apache parquet formátumba, és tárolja vissza az Azure Data Lake Storage-tárfiókba.
A DataFrames segítségével felderítheti a repülési adatokat, és egy egyszerű lekérdezést hajthat végre.
Az Apache Spark SQL használatával lekérdezte az egyes légitársaságok 2016 januárjában végrehajtott járatainak teljes számát, a texasi repülőtereket, a Texasból induló légitársaságokat, az egyes légitársaságok országosan percenkénti átlagos érkezési késését, valamint az egyes légitársaságok járatainak százalékos arányát, amelyek késéssel érkeztek.
Az erőforrások eltávolítása
Ha meg szeretné őrizni a jegyzetfüzetet, és később visszatér hozzá, érdemes leállítani (leállítani) a fürtöt a költségek elkerülése érdekében. A fürt leállításához jelölje ki a jegyzetfüzet eszköztárának jobb felső sarkában található számítási választóban, válassza a Leállítás lehetőséget a menüből, és erősítse meg a kijelölést. (Alapértelmezés szerint a fürt 120 perc inaktivitás után automatikusan leáll.)
Ha törölni szeretné az egyes munkaterületi erőforrásokat, például jegyzetfüzeteket és fürtöket, ezt a munkaterület bal oldali oldalsávjáról teheti meg. Részletes útmutatásért tekintse meg a fürt törlése vagy a jegyzetfüzet törlése című témakört.
Ha már nincs rájuk szükség, törölje az erőforráscsoportot és az összes kapcsolódó erőforrást. Ehhez az Azure Portalon válassza ki a tárfiók és a munkaterület erőforráscsoportját, majd válassza a Törlés lehetőséget.