Kurz: Azure Data Lake Storage Gen2, Azure Databricks a Spark

V tomto kurzu se dozvíte, jak připojit cluster Azure Databricks k datům uloženým v účtu úložiště Azure, který má povolenou službu Azure Data Lake Storage Gen2. Toto připojení umožňuje nativně spouštět dotazy a analýzy z clusteru na vašich datech.

V tomto kurzu:

  • Ingestace nestrukturovaných dat do účtu úložiště
  • Spouštění analýz dat v úložišti objektů blob

Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.

Požadavky

Vytvoření pracovního prostoru, clusteru a poznámkového bloku Azure Databricks

  1. Vytvořte pracovní prostor Azure Databricks. Viz Vytvoření pracovního prostoru Azure Databricks.

  2. Vytvořte cluster. Viz Vytvoření clusteru.

  3. Vytvořte poznámkový blok. Viz Vytvoření poznámkového bloku. Jako výchozí jazyk poznámkového bloku zvolte Python.

Nechte poznámkový blok otevřený. Použijete ho v následujících částech.

Stažení údajů o letech

Tento kurz používá údaje o časově probíhajícím letu z ledna 2016 od úřadu pro statistiku dopravy k předvedení toho, jak provést operaci ETL. Abyste mohli absolvovat kurz, musíte si tato data stáhnout.

  1. Stáhněte soubor On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Tento soubor obsahuje letová data.

  2. Rozbalte obsah komprimovaného souboru a poznamenejte si název souboru a cestu k souboru. Tyto informace budete potřebovat v pozdějším kroku.

Pokud se chcete dozvědět o informacích zachycených v datech o výkonu sestav na čase, můžete si prohlédnout popisy polí na webu Bureau of Transportation Statistics .

Ingestace dat

V této části nahrajete do svého účtu Azure Data Lake Storage Gen2 data letu .csv a pak připojíte účet úložiště ke clusteru Databricks. Nakonec použijete Databricks ke čtení testovacích dat .csv a jejich zpětný zápis do úložiště ve formátu Apache parquet.

Nahrání testovacích dat do účtu úložiště

Pomocí AzCopy zkopírujte soubor .csv do účtu Azure Data Lake Storage Gen2. Pomocí azcopy make příkazu vytvoříte v účtu úložiště kontejner. Pak pomocí azcopy copy příkazu zkopírujete data csv , která jste právě stáhli do adresáře v daném kontejneru.

V následujících krocích musíte zadat názvy kontejneru, který chcete vytvořit, a adresář a objekt blob, do kterého chcete nahrát testovací data do kontejneru. V každém kroku můžete použít navrhované názvy nebo můžete zadat vlastní zásady vytváření názvů kontejnerů, adresářů a objektů blob.

  1. Otevřete okno příkazového řádku a zadáním následujícího příkazu se přihlaste ke službě Azure Active Directory pro přístup k účtu úložiště.

    azcopy login
    

    Postupujte podle pokynů zobrazených v okně příkazového řádku a ověřte svůj uživatelský účet.

  2. Pokud chcete v účtu úložiště vytvořit kontejner pro ukládání testovacích dat, zadejte následující příkaz:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • <storage-account-name> Nahraďte zástupnou hodnotu názvem vašeho účtu úložiště.

    • <container-name> Zástupný symbol nahraďte názvem kontejneru, který chcete vytvořit, aby se ukládaly data csv, například flight-data-container.

  3. Pokud chcete nahrát (zkopírovat) data CSV do účtu úložiště, zadejte následující příkaz.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • <csv-folder-path> Nahraďte zástupnou hodnotu cestou k souboru .csv.

    • <storage-account-name> Nahraďte zástupnou hodnotu názvem vašeho účtu úložiště.

    • <container-name> Zástupný symbol nahraďte názvem kontejneru v účtu úložiště.

    • <directory-name> Zástupný symbol nahraďte názvem adresáře, který bude ukládat data do kontejneru, například jan2016.

Připojení účtu úložiště ke clusteru Databricks

V této části připojíte cloudové úložiště azure Data Lake Storage Gen2 k systému souborů Databricks (DBFS). K ověřování pomocí účtu úložiště použijete instanční objekt Azure AD, který jste vytvořili dříve. Další informace najdete v tématu Připojení cloudového úložiště objektů v Azure Databricks.

  1. Připojte poznámkový blok ke clusteru.

    1. V dříve vytvořeném poznámkovém bloku vyberte tlačítko Připojení v pravém horním rohu panelu nástrojů poznámkového bloku. Toto tlačítko otevře selektor výpočetních prostředků. (Pokud jste už poznámkový blok připojili ke clusteru, zobrazí se název tohoto clusteru v textu tlačítka, nikoli v textu tlačítka.Připojení).

    2. V rozevírací nabídce clusteru vyberte cluster, který jste vytvořili dříve.

    3. Všimněte si, že se text v selektoru clusteru změní na spuštění. Než budete pokračovat, počkejte, až se cluster dokončí a název clusteru se zobrazí na tlačítku.

  2. Zkopírujte a vložte následující blok kódu do první buňky, ale tento kód zatím nespustíte.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. V tomto bloku kódu:

    • V configsčásti , <clientSecret>nahraďte <appId>a <tenantId> zástupné hodnoty ID aplikace, tajný klíč klienta a ID tenanta, které jste zkopírovali při vytváření instančního objektu v požadavcích.

    • V identifikátoru source URI nahraďte hodnoty a <directory-name> zástupné symboly <storage-account-name><container-name>názvem vašeho účtu úložiště Azure Data Lake Storage Gen2 a názvem kontejneru a adresáře, který jste zadali při nahrání testovacích dat do účtu úložiště.

      Poznámka:

      Identifikátor schématu v identifikátoru URI říká Databricks, abfssaby používal ovladač systému souborů Azure Blob s protokolem TLS (Transport Layer Security). Další informace o identifikátoru URI najdete v tématu Použití identifikátoru URI Azure Data Lake Storage Gen2.

  4. Než budete pokračovat, ujistěte se, že se cluster dokončil.

  5. Stisknutím kláves SHIFT + ENTER spusťte kód v tomto bloku.

Kontejner a adresář, do kterého jste nahráli letová data do účtu úložiště, jsou teď dostupné v poznámkovém bloku prostřednictvím přípojného bodu / mnt/flightdata.

Použití poznámkového bloku Databricks k převodu CSV na formát Parquet

Teď, když jsou data testovací verze CSV přístupná prostřednictvím přípojného bodu DBFS, můžete ho pomocí datového rámce Apache Spark načíst do svého pracovního prostoru a zapsat je zpět ve formátu Apache Parquet do úložiště objektů Azure Data Lake Storage Gen2.

  • Datový rámec Sparku je dvourozměrná datová struktura s sloupci potenciálně různých typů. Datový rámec můžete použít k snadnému čtení a zápisu dat v různých podporovaných formátech. Pomocí datového rámce můžete načíst data z cloudového úložiště objektů a provádět jejich analýzu a transformace uvnitř výpočetního clusteru, aniž by to mělo vliv na podkladová data v cloudovém úložišti objektů. Další informace najdete v tématu Práce s datovými rámci PySpark v Azure Databricks.

  • Apache parquet je sloupcový formát souboru s optimalizacemi, které urychlují dotazy. Jedná se o efektivnější formát souboru než CSV nebo JSON. Další informace najdete v tématu Soubory Parquet.

V poznámkovém bloku přidejte novou buňku a vložte do ní následující kód.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Stisknutím kláves SHIFT + ENTER spusťte kód v tomto bloku.

Než budete pokračovat k další části, ujistěte se, že jsou zapsána všechna data parquet, a ve výstupu se zobrazí text Hotovo.

Zkoumání dat

V této části použijete nástroj systému souborů Databricks k prozkoumání úložiště objektů Azure Data Lake Storage Gen2 pomocí přípojného bodu DBFS, který jste vytvořili v předchozí části.

Do nové buňky vložte následující kód, abyste získali seznam souborů v přípojovém bodu. První příkaz vypíše seznam souborů a adresářů. Druhý příkaz zobrazí výstup v tabulkovém formátu pro snadnější čtení.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Stisknutím kláves SHIFT + ENTER spusťte kód v tomto bloku.

Všimněte si, že adresář parquet se zobrazí v seznamu. Data letu .csv jste uložili ve formátu parquet do adresáře parquet/flights v předchozí části. Pokud chcete vypsat soubory v adresáři parquet/flights , vložte do nové buňky následující kód a spusťte ho:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Pokud chcete vytvořit nový soubor a vypsat ho, vložte do nové buňky následující kód a spusťte ho:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Vzhledem k tomu, že v tomto kurzu nepotřebujete soubor 1.txt , můžete do buňky vložit následující kód a spustit ho pro rekurzivní odstranění adresáře. Parametr True označuje rekurzivní odstranění.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Jako pohodlí můžete pomocí příkazu nápovědy zjistit podrobnosti o dalších příkazech.

dbutils.fs.help("rm")

Pomocí těchto ukázek kódu jste prozkoumali hierarchickou povahu HDFS pomocí dat uložených v účtu úložiště s povolenou službou Azure Data Lake Storage Gen2.

Vytváření dotazů na data

Teď můžete začít vytvářet dotazy na data, která jste nahráli do svého účtu úložiště. Do nové buňky zadejte každý z následujících bloků kódu a stisknutím kombinace kláves SHIFT+ENTER spusťte skript Pythonu.

Datové rámce poskytují bohatou sadu funkcí (výběr sloupců, filtrování, spojení, agregace), které umožňují efektivně řešit běžné problémy s analýzou dat.

Pokud chcete načíst datový rámec z dříve uložených letových dat parquet a prozkoumat některé z podporovaných funkcí, zadejte tento skript do nové buňky a spusťte ho.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Zadáním tohoto skriptu do nové buňky spustíte některé základní analytické dotazy na data. Můžete zvolit spuštění celého skriptu (SHIFT+ENTER), zvýraznit každý dotaz a spustit ho samostatně pomocí kombinace kláves CTRL+SHIFT+ENTER, nebo zadat každý dotaz do samostatné buňky a spustit ho tam.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Shrnutí

V tomto kurzu se naučíte:

  • Vytvořili jste prostředky Azure, včetně účtu úložiště Azure Data Lake Storage Gen2 a instančního objektu Azure AD a přiřazených oprávnění pro přístup k účtu úložiště.

  • Vytvořili jste pracovní prostor, poznámkový blok a výpočetní cluster Azure Databricks.

  • Nástroj AzCopy slouží k nahrání nestrukturovaných testovacích dat .csv do účtu úložiště Azure Data Lake Storage Gen2.

  • Pomocí funkcí systému souborů Databricks jste připojili svůj účet úložiště Azure Data Lake Storage Gen2 a prozkoumali jeho hierarchický systém souborů.

  • Datové rámce Apache Sparku slouží k transformaci testovacích dat .csv do formátu Apache parquet a jejich uložení zpět do účtu úložiště Azure Data Lake Storage Gen2.

  • Pomocí datových rámců můžete prozkoumat letová data a provést jednoduchý dotaz.

  • Apache Spark SQL se používá k dotazování na údaje o letu celkového počtu letů pro jednotlivé letecké společnosti v lednu 2016, letiště v Texasu, letecké společnosti, které letí z Texasu, průměrné zpoždění příletu v minutách pro jednotlivé letecké společnosti na národní úrovni a procento letů jednotlivých leteckých společností, které mají zpožděné odlety nebo přílety.

Vyčištění prostředků

Pokud chcete poznámkový blok zachovat a vrátit se k němu později, je vhodné cluster vypnout (ukončit), abyste se vyhnuli poplatkům. Pokud chcete cluster ukončit, vyberte ho v selektoru výpočetních prostředků v pravém horním rohu panelu nástrojů poznámkového bloku, v nabídce vyberte Možnost Ukončit a potvrďte výběr. (Ve výchozím nastavení se cluster automaticky ukončí po 120 minutách nečinnosti.)

Pokud chcete odstranit jednotlivé prostředky pracovního prostoru, jako jsou poznámkové bloky a clustery, můžete to udělat na levém bočním panelu pracovního prostoru. Podrobné pokyny najdete v tématu Odstranění clusteru nebo odstranění poznámkového bloku.

Pokud už nejsou potřeba, odstraňte skupinu prostředků a všechny související prostředky. Uděláte to tak, že na webu Azure Portal vyberete skupinu prostředků pro účet úložiště a pracovní prostor a vyberete Odstranit.

Další kroky