Dela via


Självstudie: Azure Data Lake Storage Gen2, Azure Databricks och Spark

Den här självstudien visar hur du ansluter ditt Azure Databricks-kluster till data som lagras i ett Azure-lagringskonto som har Azure Data Lake Storage Gen2 aktiverat. Med den här anslutningen kan du internt köra frågor och analyser från klustret på dina data.

I den här självstudien kommer vi att:

  • Mata in ostrukturerade data i ett lagringskonto
  • Köra analyser på data i Blob Storage

Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.

Förutsättningar

Skapa en Azure Databricks-arbetsyta, ett kluster och en notebook-fil

  1. Skapa en Azure Databricks-arbetsyta. Se Skapa en Azure Databricks-arbetsyta.

  2. Skapa ett kluster. Se Skapa ett kluster.

  3. Skapa en anteckningsbok. Se Skapa en notebook-fil. Välj Python som standardspråk för notebook-filen.

Håll anteckningsboken öppen. Du använder den i följande avsnitt.

Ladda ned flygdata

I den här självstudien används prestandadata i tid för januari 2016 från Bureau of Transportation Statistics för att visa hur du utför en ETL-åtgärd. Du måste hämta dessa data för att kunna gå självstudien.

  1. Ladda ned filen On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Den här filen innehåller flygdata.

  2. Packa upp innehållet i den komprimerade filen och anteckna filnamnet och sökvägen. Du behöver den här informationen i ett senare steg.

Om du vill lära dig mer om den information som samlas in i prestandadata för rapportering i tid kan du se fältbeskrivningarna på webbplatsen för Bureau of Transportation Statistics.

Mata in data

I det här avsnittet laddar du upp .csv-flygdata till ditt Azure Data Lake Storage Gen2-konto och monterar sedan lagringskontot till ditt Databricks-kluster. Slutligen använder du Databricks för att läsa .csv-flygdata och skriva tillbaka dem till lagringen i Apache parquet-format.

Ladda upp flygdata till ditt lagringskonto

Använd AzCopy för att kopiera .csv-filen till ditt Azure Data Lake Storage Gen2-konto. Du använder azcopy make kommandot för att skapa en container i ditt lagringskonto. Sedan använder azcopy copy du kommandot för att kopiera csv-data som du nyss laddade ned till en katalog i containern.

I följande steg måste du ange namn för den container som du vill skapa och den katalog och blob som du vill ladda upp flygdata till i containern. Du kan använda de föreslagna namnen i varje steg eller ange egna namngivningskonventioner för containrar, kataloger och blobar.

  1. Öppna ett kommandotolksfönster och ange följande kommando för att logga in på Azure Active Directory för att få åtkomst till ditt lagringskonto.

    azcopy login
    

    Följ anvisningarna som visas i kommandotolken för att autentisera ditt användarkonto.

  2. Om du vill skapa en container i ditt lagringskonto för att lagra flygdata anger du följande kommando:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Ersätt platshållarvärdet <storage-account-name> med namnet på ditt lagringskonto.

    • <container-name> Ersätt platshållaren med ett namn på containern som du vill skapa för att lagra csv-data, till exempel flight-data-container.

  3. Om du vill ladda upp (kopiera) csv-data till ditt lagringskonto anger du följande kommando.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • <csv-folder-path> Ersätt platshållarvärdet med sökvägen till .csv-filen.

    • Ersätt platshållarvärdet <storage-account-name> med namnet på ditt lagringskonto.

    • <container-name> Ersätt platshållaren med namnet på containern i ditt lagringskonto.

    • <directory-name> Ersätt platshållaren med namnet på en katalog för att lagra dina data i containern, till exempel jan2016.

Montera ditt lagringskonto i databricks-klustret

I det här avsnittet monterar du azure Data Lake Storage Gen2-molnobjektlagringen till Databricks-filsystemet (DBFS). Du använder azure AD-tjänstprincipen som du skapade tidigare för autentisering med lagringskontot. Mer information finns i Montera molnobjektlagring på Azure Databricks.

  1. Koppla anteckningsboken till klustret.

    1. I anteckningsboken som du skapade tidigare väljer du knappen Anslut i det övre högra hörnet i notebook-verktygsfältet. Den här knappen öppnar beräkningsväljaren. (Om du redan har anslutit anteckningsboken till ett kluster visas namnet på klustret i knapptexten i stället förAnslut).

    2. I den nedrullningsbara menyn för kluster väljer du det kluster som du skapade tidigare.

    3. Observera att texten i klusterväljaren ändras till att starta. Vänta tills klustret har startats och för att namnet på klustret ska visas i knappen innan du fortsätter.

  2. Kopiera och klistra in följande kodblock i den första cellen, men kör inte den här koden än.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. I det här kodblocket:

    • I configsersätter <appId>du platshållarvärdena , <clientSecret>och <tenantId> med program-ID, klienthemlighet och klient-ID som du kopierade när du skapade tjänstens huvudnamn i förutsättningarna.

    • source I URI:n ersätter <storage-account-name>du platshållarvärdena , <container-name>och <directory-name> med namnet på ditt Azure Data Lake Storage Gen2-lagringskonto och namnet på den container och katalog som du angav när du laddade upp flygdata till lagringskontot.

      Kommentar

      Schemaidentifieraren i URI:n säger abfsstill Databricks att använda Drivrutinen för Azure Blob File System med Transport Layer Security (TLS). Mer information om URI finns i Använda Azure Data Lake Storage Gen2 URI.

  4. Kontrollera att klustret har startats innan du fortsätter.

  5. Tryck på SKIFT + RETUR för att köra koden i det här blocket.

Containern och katalogen där du laddade upp flygdata i ditt lagringskonto är nu tillgänglig i notebook-filen via monteringspunkten / mnt/flightdata.

Använda Databricks-anteckningsboken för att konvertera CSV till Parquet

Nu när csv-flygdata är tillgängliga via en DBFS-monteringspunkt kan du använda en Apache Spark DataFrame för att läsa in den på din arbetsyta och skriva tillbaka den i Apache parquet-format till din Azure Data Lake Storage Gen2-objektlagring.

  • En Spark DataFrame är en tvådimensionell etiketterad datastruktur med kolumner av potentiellt olika typer. Du kan använda en DataFrame för att enkelt läsa och skriva data i olika format som stöds. Med en DataFrame kan du läsa in data från molnobjektlagring och utföra analyser och transformeringar på dem i beräkningsklustret utan att påverka underliggande data i molnobjektlagringen. Mer information finns i Arbeta med PySpark DataFrames på Azure Databricks.

  • Apache parquet är ett kolumnformat med optimeringar som påskyndar frågor. Det är ett effektivare filformat än CSV eller JSON. Mer information finns i Parquet Files.

Lägg till en ny cell i anteckningsboken och klistra in följande kod i den.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Tryck på SKIFT + RETUR för att köra koden i det här blocket.

Innan du fortsätter till nästa avsnitt kontrollerar du att alla parquet-data har skrivits och att "Klar" visas i utdata.

Utforska data

I det här avsnittet använder du databricks-filsystemverktyget för att utforska ditt Azure Data Lake Storage Gen2-objektlagring med hjälp av DBFS-monteringspunkten som du skapade i föregående avsnitt.

I en ny cell klistrar du in följande kod för att hämta en lista över filerna vid monteringspunkten. Det första kommandot matar ut en lista över filer och kataloger. Det andra kommandot visar utdata i tabellformat för enklare läsning.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Tryck på SKIFT + RETUR för att köra koden i det här blocket.

Observera att parquet-katalogen visas i listan. Du sparade .csv-flygdata i parquet-format till katalogen parquet/flights i föregående avsnitt. Om du vill visa filer i katalogen parquet/flights klistrar du in följande kod i en ny cell och kör den:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Om du vill skapa en ny fil och lista den klistrar du in följande kod i en ny cell och kör den:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Eftersom du inte behöver filen 1.txt i den här självstudien kan du klistra in följande kod i en cell och köra den för att rekursivt ta bort mydirectory. Parametern True anger en rekursiv borttagning.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Som en bekvämlighet kan du använda hjälpkommandot för att lära dig mer om andra kommandon.

dbutils.fs.help("rm")

Med dessa kodexempel har du utforskat hdfs hierarkiska karaktär med hjälp av data som lagras i ett lagringskonto med Azure Data Lake Storage Gen2 aktiverat.

Fråga efter data

Därefter kan du börja fråga efter de data du har laddat upp till ditt lagringskonto. Ange vart och ett av följande kodblock i en ny cell och tryck på SKIFT + RETUR för att köra Python-skriptet.

DataFrames tillhandahåller en omfattande uppsättning funktioner (välj kolumner, filtrera, koppla, aggregera) som gör att du kan lösa vanliga dataanalysproblem effektivt.

Om du vill läsa in en DataFrame från dina tidigare sparade parquet-flygdata och utforska några av de funktioner som stöds anger du det här skriptet i en ny cell och kör den.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Ange det här skriptet i en ny cell för att köra några grundläggande analysfrågor mot data. Du kan välja att köra hela skriptet (SKIFT + RETUR), markera varje fråga och köra den separat med CTRL + SKIFT + RETUR, eller ange varje fråga i en separat cell och köra den där.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Sammanfattning

I den här kursen får du:

  • Skapade Azure-resurser, inklusive ett Azure Data Lake Storage Gen2-lagringskonto och Azure AD-tjänstens huvudnamn, och tilldelade behörigheter för åtkomst till lagringskontot.

  • Skapade en Azure Databricks-arbetsyta, notebook-fil och beräkningskluster.

  • Använde AzCopy för att ladda upp ostrukturerade .csv-flygdata till Azure Data Lake Storage Gen2-lagringskontot.

  • Databricks-filsystemets verktygsfunktioner användes för att montera ditt Azure Data Lake Storage Gen2-lagringskonto och utforska dess hierarkiska filsystem.

  • Använde Apache Spark DataFrames för att omvandla dina .csv-flygdata till Apache parquet-format och lagra dem tillbaka till ditt Azure Data Lake Storage Gen2-lagringskonto.

  • Använde DataFrames för att utforska flygdata och utföra en enkel fråga.

  • Använde Apache Spark SQL för att fråga flygdata för det totala antalet flygningar för varje flygbolag i januari 2016, flygplatserna i Texas, flygbolagen som flyger från Texas, den genomsnittliga ankomstfördröjningen i minuter för varje flygbolag nationellt och procentandelen av varje flygbolags flygningar som har försenat avgångar eller ankomster.

Rensa resurser

Om du vill bevara anteckningsboken och återkomma till den senare är det en bra idé att stänga av (avsluta) klustret för att undvika avgifter. Om du vill avsluta klustret väljer du det i beräkningsväljaren längst upp till höger i notebook-verktygsfältet, väljer Avsluta från menyn och bekräftar ditt val. (Som standard avslutas klustret automatiskt efter 120 minuters inaktivitet.)

Om du vill ta bort enskilda arbetsyteresurser som notebook-filer och kluster kan du göra det från arbetsytans vänstra sidopanel. Detaljerade anvisningar finns i Ta bort ett kluster eller Ta bort en notebook-fil.

Ta bort resursgruppen och alla relaterade resurser när de inte längre behövs. Om du vill göra det i Azure-portalen väljer du resursgruppen för lagringskontot och arbetsytan och väljer Ta bort.

Nästa steg