Udostępnij za pośrednictwem


Samouczek: Azure Data Lake Storage, Azure Databricks i Spark

W tym samouczku pokazano, jak połączyć klaster usługi Azure Databricks z danymi przechowywanymi na koncie usługi Azure Storage z włączoną usługą Azure Data Lake Storage. Takie połączenie umożliwia natywne wykonywanie w klastrze zapytań i analiz dotyczących tych danych.

Ten samouczek obejmuje następujące kroki:

  • Pozyskiwanie danych bez struktury na koncie magazynu
  • Przeprowadzanie analiz na danych w pamięci masowej Blob

Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.

Wymagania wstępne

Utwórz obszar roboczy i notebook usługi Azure Databricks

  1. Tworzenie obszaru roboczego usługi Azure Databricks. Zobacz Tworzenie obszaru roboczego usługi Azure Databricks.

  2. Utwórz notes. Zobacz Tworzenie notesu. Wybierz język Python jako domyślny język notesu.

Pozostaw otwarty notatnik. Należy go użyć w poniższych sekcjach.

Pobieranie danych lotów

Samouczek ten wykorzystuje dane dotyczące punktualności lotów za styczeń 2016 r. z Biura Statystyki Transportu w celu zademonstrowania, jak wykonać operację ETL. Aby ukończyć samouczek, musisz pobrać te dane.

  1. Pobierz plik On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Ten plik zawiera dane lotu.

  2. Rozpakuj zawartość pliku zip i zanotuj nazwę pliku oraz jego ścieżkę. Te informacje będą potrzebne w późniejszym kroku.

Jeśli chcesz dowiedzieć się więcej o danych dotyczących terminowości raportowania, możesz zobaczyć opisy pól w witrynie internetowej Biura Statystyki Transportu.

Pozyskiwanie danych

W tej sekcji przekażesz dane lotów .csv do konta usługi Azure Data Lake Storage, a następnie zainstalujesz konto magazynu w klastrze usługi Databricks. Na koniec użyjesz usługi Databricks do odczytywania .csv danych lotu i zapisywania ich z powrotem do magazynu w formacie Apache parquet.

Przekaż dane lotu do swojego konta pamięci masowej.

Użyj narzędzia AzCopy, aby skopiować plik .csv na konto usługi Azure Data Lake Storage. Polecenie służy azcopy make do tworzenia kontenera na koncie magazynu. Następnie użyjesz polecenia , azcopy copy aby skopiować właśnie pobrane dane csv do katalogu w tym kontenerze.

W kolejnych krokach należy wprowadzić nazwę kontenera, który chcesz utworzyć, oraz nazwę katalogu i obiektu blob, do którego chcesz przekazać dane lotu w kontenerze. Możesz użyć sugerowanych nazw w każdym kroku lub określić własne, przestrzegając konwencji nazewnictwa kontenerów, katalogów i obiektów blob.

  1. Otwórz okno wiersza polecenia i wprowadź następujące polecenie, aby zalogować się do usługi Azure Active Directory w celu uzyskania dostępu do konta przechowywania.

    azcopy login
    

    Postępuj zgodnie z instrukcjami wyświetlanymi w oknie wiersza polecenia, aby uwierzytelnić konto użytkownika.

  2. Aby utworzyć kontener na koncie magazynu do przechowywania danych lotu, wprowadź następujące polecenie:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Zastąp wartość symbolu zastępczego <storage-account-name> nazwą konta magazynowego.

    • <container-name> Zastąp symbol zastępczy nazwą kontenera, który chcesz utworzyć, aby przechowywać dane csv, na przykład flight-data-container.

  3. Aby przekazać (skopiować) dane csv na konto magazynu, wprowadź następujące polecenie.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Zastąp wartość zastępnika <csv-folder-path> ścieżką do pliku .csv.

    • Zastąp wartość symbolu zastępczego <storage-account-name> nazwą konta magazynowego.

    • Zastąp symbol zastępczy <container-name> nazwą kontenera na Twoim koncie magazynowym.

    • <directory-name> Zastąp symbol zastępczy nazwą katalogu do przechowywania danych w kontenerze, na przykład jan2016.

Instalowanie konta magazynu w klastrze usługi Databricks

W tej sekcji podłączysz magazyn obiektów w chmurze Azure Data Lake Storage do systemu plików Databricks (DBFS). Używasz wcześniej utworzonej zasady usługi Azure AD do uwierzytelniania z użyciem konta magazynu. Aby uzyskać więcej informacji, zobacz Instalowanie magazynu obiektów w chmurze w usłudze Azure Databricks.

  1. Dołącz notebooka do klastra.

    1. W utworzonym wcześniej notesie wybierz przycisk Połącz w prawym górnym rogu paska narzędzi notesu. Ten przycisk otwiera selektor obliczeniowy. (Jeśli notes został już połączony z klastrem, nazwa tego klastra jest wyświetlana w tekście przycisku zamiast Połącz).

    2. W menu rozwijanym klastra wybierz dowolny utworzony wcześniej klaster.

    3. Zwróć uwagę, że tekst w selektorze klastra zmienia się na rozpoczęcie. Przed kontynuowaniem poczekaj na zakończenie uruchamiania klastra i zaczekaj na wyświetlenie nazwy klastra w przycisku.

  2. Skopiuj i wklej następujący blok kodu do pierwszej komórki, ale jeszcze nie uruchamiaj kodu.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. W tym bloku kodu:

    • W configs zastąp wartości zastępcze <appId>, <clientSecret> i <tenantId> identyfikatorem aplikacji, kluczem tajnym klienta i identyfikatorem dzierżawy, które skopiowałeś podczas tworzenia jednostki usługi w ramach przygotowań.

    • W identyfikatorze source URI zastąp wartości symboli zastępczych <storage-account-name>, <container-name> i <directory-name> nazwą konta magazynowania w usłudze Azure Data Lake Storage oraz nazwą kontenera i katalogu, które określiłeś podczas przesyłania danych lotu do konta magazynowania.

      Uwaga

      Identyfikator schematu w identyfikatorze URI abfss informuje usługę Databricks o użyciu sterownika systemu plików Blob platformy Azure z protokołem Transport Layer Security (TLS). Aby dowiedzieć się więcej na temat identyfikatora URI, zobacz Use the Azure Data Lake Storage URI (Korzystanie z identyfikatora URI usługi Azure Data Lake Storage).

  4. Przed kontynuowaniem upewnij się, że klaster zakończył uruchamianie.

  5. Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.

Kontener i katalog, do którego przesłałeś dane lotu na koncie magazynu, jest teraz dostępny w notesie za pośrednictwem punktu montowania /mnt/flightdata.

Użyj notesu usługi Databricks do konwertowania formatu CSV na format Parquet

Teraz, gdy dane lotu csv są dostępne za pośrednictwem punktu montowania DBFS, możesz użyć ramki danych Apache Spark, aby załadować je do obszaru roboczego i zapisać je z powrotem w formacie Apache Parquet do magazynu obiektów usługi Azure Data Lake Storage.

  • Ramka danych platformy Spark to dwuwymiarowa struktura danych z kolumnami potencjalnie różnych typów. Za pomocą ramki danych można łatwo odczytywać i zapisywać dane w różnych obsługiwanych formatach. Za pomocą ramki danych można ładować dane z magazynu obiektów w chmurze i wykonywać na nim analizy i przekształcenia wewnątrz klastra obliczeniowego bez wpływu na dane bazowe w magazynie obiektów w chmurze. Aby dowiedzieć się więcej, zobacz Praca z ramkami danych PySpark w usłudze Azure Databricks.

  • Apache parquet to format pliku kolumnowego z optymalizacjami, które przyspieszają zapytania. Jest to bardziej wydajny format pliku niż CSV lub JSON. Aby dowiedzieć się więcej, zobacz Parquet Files.

W notesie dodaj nową komórkę i wklej do niego następujący kod.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.

Przed przejściem do następnej sekcji upewnij się, że wszystkie dane parquet zostały zapisane, a komunikat "Done" (Gotowe) pojawia się w danych wyjściowych.

Eksplorowanie danych

W tej sekcji użyjesz narzędzia systemu plików Databricks do eksplorowania magazynu obiektów usługi Azure Data Lake Storage przy użyciu punktu montowania systemu plików DBFS utworzonego w poprzedniej sekcji.

W nowej komórce wklej następujący kod, aby uzyskać listę plików w punkcie instalacji. Pierwsze polecenie zwraca listę plików i katalogów. Drugie polecenie wyświetla dane wyjściowe w formacie tabelarycznym, aby ułatwić odczytywanie.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.

Zwróć uwagę, że katalog parquet pojawia się na liście. Dane lotów .csv zostały zapisane w formacie parquet w katalogu parquet/flights w poprzedniej sekcji. Aby wyświetlić listę plików w katalogu parquet/flights , wklej następujący kod do nowej komórki i uruchom go:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Aby utworzyć nowy plik i wyświetlić go, wklej następujący kod do nowej komórki i uruchom go:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Ponieważ w tym samouczku nie potrzebujesz pliku 1.txt, możesz wkleić następujący kod do komórki i uruchomić go, aby rekursywnie usunąć katalog mydirectory. Parametr True wskazuje cykliczne usuwanie.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Dla wygody możesz użyć polecenia pomocy, aby dowiedzieć się więcej o innych poleceniach.

dbutils.fs.help("rm")

Korzystając z tych przykładów kodu, zapoznałeś się z hierarchiczną strukturą HDFS, używając danych przechowywanych na koncie magazynowym z włączoną funkcją Azure Data Lake Storage.

Wykonywanie zapytań na danych

Następnie możesz rozpocząć wykonywanie zapytań dotyczących danych przekazanych na swoje konto magazynowe. Wprowadź każdy z poniższych bloków kodu w nowej komórce i naciśnij SHIFT + ENTER , aby uruchomić skrypt języka Python.

Ramki danych zapewniają bogaty zestaw funkcji (wybieranie kolumn, filtrowanie, sprzężenie, agregowanie), które umożliwiają efektywne rozwiązywanie typowych problemów z analizą danych.

Aby załadować DataFrame z danych lotu zapisanych w formacie parquet i zbadać obsługiwane funkcje, wpisz ten skrypt w nowej komórce i uruchom go.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected columns for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Wprowadź ten skrypt w nowej komórce, aby uruchomić kilka podstawowych zapytań analizy względem danych. Możesz uruchomić cały skrypt (SHIFT + ENTER), wyróżnić każde zapytanie i uruchomić je oddzielnie przy użyciu CTRL + SHIFT + ENTER lub wprowadzić każde zapytanie w osobnej komórce i uruchomić je tam.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Podsumowanie

W tym samouczku, Ty:

  • Utworzono zasoby platformy Azure, w tym konto magazynu usługi Azure Data Lake Storage i jednostkę usługi Azure AD oraz przypisano uprawnienia dostępu do konta magazynu.

  • Utworzono obszar roboczy i notes usługi Azure Databricks.

  • Użyto narzędzia AzCopy do przesyłania nieustrukturyzowanych plików .csv z danymi lotów do konta magazynującego Azure Data Lake Storage.

  • Użyj funkcji narzędzi systemu plików usługi Databricks, aby zamontować swoje konto usługi Azure Data Lake Storage i eksplorować jego hierarchiczny system plików.

  • Użyto ramek danych Apache Spark do przekształcania danych .csv lotów do formatu Apache Parquet i ponownego zapisania ich na koncie magazynowym usługi Azure Data Lake Storage.

  • Używane ramki danych do eksplorowania danych lotu i wykonywania prostego zapytania.

  • Usługa Apache Spark SQL umożliwia wykonywanie zapytań dotyczących danych lotów dotyczących łącznej liczby lotów dla każdej linii lotniczej w styczniu 2016 r., lotnisk w Teksasie, linii lotniczych, które latają z Teksasu, średniego opóźnienia przylotu w minutach dla każdej linii lotniczej na szczeblu krajowym oraz procent lotów każdej linii lotniczej, które opóźniły loty lub przyloty.

Czyszczenie zasobów

Jeśli chcesz zachować notatnik i wrócić do niego później, warto zamknąć (zakończyć) klaster, aby uniknąć naliczania opłat. Aby zakończyć działanie klastra, wybierz go w selektorze obliczeniowym znajdującym się w prawym górnym rogu paska narzędzi notesu, wybierz pozycję Zakończ z menu i potwierdź wybór. (Domyślnie klaster zostanie automatycznie zakończony po 120 minutach braku aktywności).

Jeśli chcesz usunąć poszczególne zasoby obszaru roboczego, takie jak notesy i klastry, możesz to zrobić na lewym pasku bocznym obszaru roboczego. Aby uzyskać szczegółowe instrukcje, zapoznaj się z sekcją Usuwanie klastra lub Usuwanie notesu.

Gdy grupa zasobów i wszystkie pokrewne zasoby nie będą już potrzebne, usuń je. Aby to zrobić w portalu Azure, wybierz grupę zasobów dla konta magazynu i obszaru roboczego, a następnie wybierz pozycję Usuń.

Następne kroki