Tutorial: Azure Data Lake Storage Gen2, Azure Databricks & Spark

Ez az oktatóanyag bemutatja, hogyan csatlakoztathatja az Azure Databricks-fürtöt egy olyan Azure Storage-fiókban tárolt adatokhoz, amelyeken engedélyezve van az Azure Data Lake Storage Gen2. Ezzel a kapcsolattal natív módon futtathat lekérdezéseket és elemzéseket a fürtből az adatokon.

Az oktatóanyag során az alábbi lépéseket fogja végrehajtani:

  • Strukturálatlan adatok betöltése egy tárfiókba
  • Elemzés futtatása az adatokon a Blob Storage-ban

Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.

Előfeltételek

Azure Databricks-munkaterület, -fürt és -jegyzetfüzet létrehozása

  1. Azure Databricks-munkaterület létrehozása. Lásd: Azure Databricks-munkaterület létrehozása.

  2. Hozzon létre egy fürtöt. Lásd: Fürt létrehozása.

  3. Hozzon létre egy notebookot. Lásd: Jegyzetfüzet létrehozása. Válassza a Pythont a jegyzetfüzet alapértelmezett nyelveként.

Tartsa nyitva a jegyzetfüzetet. A következő szakaszokban használhatja.

A repülőjárat-adatok letöltése

Ez az oktatóanyag a Közlekedési Statisztikai Hivatal 2016. januári, időalapú teljesítmény-repülési adatait használja az ETL-műveletek végrehajtásának bemutatásához. Az oktatóanyag elvégzéséhez le kell töltenie ezeket az adatokat.

  1. Töltse le a On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip fájlt. Ez a fájl tartalmazza a repülési adatokat.

  2. Bontsa ki a tömörített fájl tartalmát, és jegyezze fel a fájl nevét és elérési útját. Egy későbbi lépésben szüksége lesz ezekre az információkra.

Ha szeretné megismerni az időalapú jelentéskészítési teljesítményadatokban rögzített információkat, a mezőleírásokat a Közlekedési Statisztikai Hivatal webhelyén tekintheti meg.

Adatok betöltése

Ebben a szakaszban feltölti a .csv repülési adatokat az Azure Data Lake Storage Gen2-fiókjába, majd csatlakoztatja a tárfiókot a Databricks-fürthöz. Végül a Databricks használatával olvassa be a .csv repülési adatokat, és írja vissza a tárolóba Apache parquet formátumban.

A repülési adatok feltöltése a tárfiókba

Az AzCopy használatával másolja a .csv fájlt az Azure Data Lake Storage Gen2-fiókjába. A parancs használatával azcopy make tárolót hozhat létre a tárfiókban. Ezután a azcopy copy paranccsal másolja az imént letöltött csv-adatokat a tároló egyik könyvtárába.

A következő lépésekben meg kell adnia a létrehozni kívánt tároló nevét, valamint azt a könyvtárat és blobot, amelybe fel szeretné tölteni a repülési adatokat a tárolóban. Minden lépésben használhatja a javasolt neveket, vagy megadhatja a tárolók, könyvtárak és blobok elnevezési konvencióit.

  1. Nyisson meg egy parancssori ablakot, és írja be a következő parancsot az Azure Active Directoryba való bejelentkezéshez a tárfiók eléréséhez.

    azcopy login
    

    Kövesse a parancssori ablakban megjelenő utasításokat a felhasználói fiók hitelesítéséhez.

  2. Ha tárolót szeretne létrehozni a tárfiókban a repülési adatok tárolásához, írja be a következő parancsot:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Cserélje le a <storage-account-name> helyőrző értékét a tárfiók nevére.

    • Cserélje le a <container-name> helyőrzőt annak a tárolónak a nevére, amelyet a csv-adatok tárolásához szeretne létrehozni, például flight-data-container.

  3. A csv-adatok tárfiókba való feltöltéséhez (másolásához) írja be a következő parancsot.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Cserélje le a <csv-folder-path> helyőrző értéket a .csv fájl elérési útjára.

    • Cserélje le a <storage-account-name> helyőrző értékét a tárfiók nevére.

    • Cserélje le a <container-name> helyőrzőt a tárfiókban lévő tároló nevére.

    • Cserélje le a <directory-name> helyőrzőt egy könyvtár nevére az adatok tárolóban való tárolásához, például 2016. januárban.

Tárfiók csatlakoztatása a Databricks-fürthöz

Ebben a szakaszban az Azure Data Lake Storage Gen2 felhőobjektum-tárolóját csatlakoztatja a Databricks fájlrendszerhez (DBFS). A tárfiókkal való hitelesítéshez használja a korábban létrehozott Azure AD szolgáltatás elvét. További információ: Felhőobjektum-tároló csatlakoztatása az Azure Databrickshez.

  1. Csatolja a jegyzetfüzetet a fürthöz.

    1. A korábban létrehozott jegyzetfüzetben válassza a Csatlakozás gombot a jegyzetfüzet eszköztárának jobb felső sarkában. Ez a gomb megnyitja a számítási választót. (Ha már csatlakoztatta a jegyzetfüzetet egy fürthöz, a fürt neve nem a gombszövegben, hanem aCsatlakozás).

    2. A fürt legördülő menüjében válassza ki a korábban létrehozott fürtöt.

    3. Figyelje meg, hogy a fürtkijelölő szövegének kezdete megváltozik. Várjon, amíg a fürt elindul, és a folytatás előtt a fürt neve megjelenik a gombon.

  2. Másolja és illessze be a következő kódblokkot az első cellába, de még ne futtassa ezt a kódot.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. Ebben a kódblokkban:

    • Ebben configsa fájlban cserélje le a <appId>, <clientSecret>és <tenantId> helyőrző értékeket az alkalmazásazonosítóra, az ügyfél titkos kódjára és a bérlőazonosítóra, amelyet a szolgáltatásnév létrehozásakor másolt az előfeltételek között.

    • source Az URI-ban cserélje le a <storage-account-name>, <container-name>és <directory-name> helyőrző értékeket az Azure Data Lake Storage Gen2-tárfiók nevére, valamint annak a tárolónak és könyvtárnak a nevére, amelyet a repülési adatok tárfiókba való feltöltésekor adott meg.

      Megjegyzés:

      Az URI sémaazonosítója arra utasítja a Databrickset, abfsshogy használja az Azure Blob Fájlrendszer illesztőprogramját a Transport Layer Security (TLS) használatával. További információ az URI-ról: Az Azure Data Lake Storage Gen2 URI használata.

  4. A folytatás előtt győződjön meg arról, hogy a fürt elindult.

  5. Nyomja le a SHIFT + ENTER billentyűket a kód ebben a blokkban való futtatásához.

A tároló és a könyvtár, ahová a tárfiókba feltöltötte a repülési adatokat, mostantól elérhető a jegyzetfüzetben az /mnt/flightdata csatlakoztatási ponton keresztül.

CSV konvertálása parquetté a Databricks-jegyzetfüzet használatával

Most, hogy a CSV-repülési adatok egy DBFS-csatlakoztatási ponton keresztül érhetők el, az Apache Spark DataFrame használatával betöltheti azokat a munkaterületre, és Apache parquet formátumban újraírhatja őket az Azure Data Lake Storage Gen2 objektumtárolóba.

  • A Spark DataFrame egy kétdimenziós címkézett adatstruktúra, amelynek oszlopai különböző típusúak lehetnek. A DataFrame használatával egyszerűen olvashat és írhat adatokat különböző támogatott formátumokban. A DataFrame használatával adatokat tölthet be a felhőobjektum-tárolóból, és elemzéseket és átalakításokat végezhet rajta a számítási fürtön belül anélkül, hogy az hatással lenne a felhőobjektum-tároló mögöttes adataira. További információ: PySpark DataFrames használata az Azure Databricksen.

  • Az Apache parquet egy oszlopos fájlformátum, amely optimalizálással felgyorsítja a lekérdezéseket. Hatékonyabb fájlformátum, mint a CSV vagy a JSON. További információ: Parquet Files.

Vegyen fel egy új cellát a jegyzetfüzetbe, és illessze be a következő kódot.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Nyomja le a SHIFT + ENTER billentyűket a kód ebben a blokkban való futtatásához.

Mielőtt továbblép a következő szakaszra, győződjön meg arról, hogy az összes parquet-adat meg van írva, és a kimenetben megjelenik a "Kész" szöveg.

Adatok megismerése

Ebben a szakaszban a Databricks fájlrendszer segédprogrammal ismerkedhet meg az Azure Data Lake Storage Gen2 objektumtárolóval az előző szakaszban létrehozott DBFS csatlakoztatási ponttal.

Egy új cellába illessze be a következő kódot, hogy lekérje a csatlakoztatási ponton található fájlok listáját. Az első parancs megjeleníti a fájlok és könyvtárak listáját. A második parancs táblázatos formátumban jeleníti meg a kimenetet a könnyebb olvashatóság érdekében.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Nyomja le a SHIFT + ENTER billentyűket a kód ebben a blokkban való futtatásához.

Figyelje meg, hogy a parquet könyvtár megjelenik a listaelemben. A .csv repülési adatokat parquet formátumban mentette az előző szakasz parquet/flights könyvtárába. A parquet/flights könyvtár fájljainak listázásához illessze be a következő kódot egy új cellába, és futtassa azt:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Új fájl létrehozásához és listázásához illessze be a következő kódot egy új cellába, és futtassa azt:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Mivel ebben az oktatóanyagban nincs szüksége az 1.txt fájlra, beillesztheti a következő kódot egy cellába, és futtathatja a könyvtár rekurzív törléséhez. A True paraméter rekurzív törlést jelez.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

A súgó paranccsal részletesen megismerheti a többi parancsot.

dbutils.fs.help("rm")

Ezekkel a kódmintákkal megismerhette a HDFS hierarchikus jellegét egy tárfiókban tárolt adatokkal, amelyek engedélyezve vannak az Azure Data Lake Storage Gen2-ben.

Adatok lekérdezése

Következő lépésként megkezdheti a tárfiókba feltöltött adatok lekérdezését. Írja be az alábbi kódblokkokat egy új cellába, és nyomja le a SHIFT + ENTER billentyűkombinációt a Python-szkript futtatásához.

A DataFrame-ek függvények (oszlopok kiválasztása, szűrés, illesztés, összesítés) gazdag készletét biztosítják, amelyek lehetővé teszik a gyakori adatelemzési problémák hatékony megoldását.

Ha be szeretne tölteni egy DataFrame-et a korábban mentett parquet-repülési adatokból, és meg szeretné vizsgálni a támogatott funkciók némelyikét, írja be ezt a szkriptet egy új cellába, és futtassa azt.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Írja be ezt a szkriptet egy új cellába, hogy alapvető elemzési lekérdezéseket futtasson az adatokon. Dönthet úgy, hogy a teljes szkriptet (SHIFT + ENTER) futtatja, kijelöli az egyes lekérdezéseket, és külön futtatja a CTRL + SHIFT +ENTER billentyűkombinációval, vagy beírhatja az egyes lekérdezéseket egy külön cellába, és ott futtathatja.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Összesítés

Az oktatóanyag során az alábbi lépéseket fogja végrehajtani:

  • Létrehozott Azure-erőforrásokat, köztük egy Azure Data Lake Storage Gen2-tárfiókot és Azure AD-szolgáltatásnevet, és hozzárendelte a tárfiókhoz való hozzáféréshez szükséges engedélyeket.

  • Létrehozott egy Azure Databricks-munkaterületet, jegyzetfüzetet és számítási fürtöt.

  • Az AzCopy használatával strukturálatlan .csv repülési adatokat töltött fel az Azure Data Lake Storage Gen2 tárfiókba.

  • A Databricks Fájlrendszer segédprogram funkcióival csatlakoztatta az Azure Data Lake Storage Gen2-tárfiókot, és feltárta annak hierarchikus fájlrendszerét.

  • Apache Spark DataFrame-ekkel alakította át a .csv-repülési adatokat Apache parquet formátumra, és azokat visszatárolja az Azure Data Lake Storage Gen2-tárfiókjába.

  • A DataFrames segítségével felderítheti a repülési adatokat, és egy egyszerű lekérdezést hajthat végre.

  • Az Apache Spark SQL használatával lekérdezte az egyes légitársaságok 2016 januárjában végrehajtott járatainak teljes számát, a texasi repülőtereket, a Texasból induló légitársaságokat, az egyes légitársaságok országosan percenkénti átlagos érkezési késését, valamint az egyes légitársaságok járatainak százalékos arányát, amelyek késéssel érkeztek.

Clean up resources

Ha meg szeretné őrizni a jegyzetfüzetet, és később visszatér hozzá, érdemes leállítani (leállítani) a fürtöt a költségek elkerülése érdekében. A fürt leállításához jelölje ki a jegyzetfüzet eszköztárának jobb felső sarkában található számítási választóban, válassza a Leállítás lehetőséget a menüből, és erősítse meg a kijelölést. (Alapértelmezés szerint a fürt 120 perc inaktivitás után automatikusan leáll.)

Ha törölni szeretné az egyes munkaterületi erőforrásokat, például jegyzetfüzeteket és fürtöket, ezt a munkaterület bal oldali oldalsávjáról teheti meg. Részletes útmutatásért tekintse meg a fürt törlése vagy a jegyzetfüzet törlése című témakört.

Ha már nincs rájuk szükség, törölje az erőforráscsoportot és az összes kapcsolódó erőforrást. Ehhez az Azure Portalon válassza ki a tárfiók és a munkaterület erőforráscsoportját, majd válassza a Törlés lehetőséget.

További lépések