Tworzenie kompleksowego potoku danych w usłudze Databricks

W tym artykule przedstawiono sposób tworzenia i wdrażania kompleksowego potoku przetwarzania danych, w tym sposobu pozyskiwania danych pierwotnych, przekształcania danych i uruchamiania analiz na przetworzonych danych.

Uwaga

Chociaż w tym artykule pokazano, jak utworzyć kompletny potok danych przy użyciu notesów usługi Databricks i zadania usługi Azure Databricks do organizowania przepływu pracy, usługa Databricks zaleca używanie tabel delta Live Tables, interfejsu deklaratywnego do tworzenia niezawodnych, konserwacyjnych i testowalnych potoków przetwarzania danych.

Co to jest potok danych?

Potok danych implementuje kroki wymagane do przenoszenia danych z systemów źródłowych, przekształcania tych danych na podstawie wymagań i przechowywania danych w systemie docelowym. Potok danych obejmuje wszystkie procesy niezbędne do przekształcenia danych pierwotnych w przygotowane dane, które użytkownicy mogą wykorzystywać. Na przykład potok danych może przygotować dane, aby analitycy danych i analitycy danych mogli wyodrębnić wartość z danych za pośrednictwem analizy i raportowania.

Przepływ pracy wyodrębniania, przekształcania i ładowania (ETL) jest typowym przykładem potoku danych. W przypadku przetwarzania ETL dane są pozyskiwane z systemów źródłowych i zapisywane w obszarze przejściowym, przekształcane na podstawie wymagań (zapewnienie jakości danych, deduplikowanie rekordów itd.), a następnie zapisywane w systemie docelowym, takim jak magazyn danych lub data lake.

Kroki potoku danych

Aby ułatwić rozpoczęcie tworzenia potoków danych w usłudze Azure Databricks, przykład zawarty w tym artykule zawiera instrukcje tworzenia przepływu pracy przetwarzania danych:

  • Używanie funkcji usługi Azure Databricks do eksplorowania nieprzetworzonego zestawu danych.
  • Utwórz notes usługi Databricks, aby pozyskiwać nieprzetworzone dane źródłowe i zapisywać nieprzetworzone dane w tabeli docelowej.
  • Utwórz notes usługi Databricks, aby przekształcić nieprzetworzone dane źródłowe i zapisać przekształcone dane w tabeli docelowej.
  • Utwórz notes usługi Databricks w celu wykonywania zapytań dotyczących przekształconych danych.
  • Automatyzowanie potoku danych za pomocą zadania usługi Azure Databricks.

Wymagania

Przykład: zestaw danych z milionem piosenek

Zestaw danych używany w tym przykładzie jest podzbiorem zestawu danych Million Song Dataset, kolekcją funkcji i metadanych utworów muzyki współczesnej. Ten zestaw danych jest dostępny w przykładowych zestawach danych zawartych w obszarze roboczym usługi Azure Databricks.

Krok 1. Tworzenie klastra

Aby wykonać przetwarzanie i analizę danych w tym przykładzie, utwórz klaster w celu udostępnienia zasobów obliczeniowych potrzebnych do uruchamiania poleceń.

Uwaga

Ponieważ w tym przykładzie jest używany przykładowy zestaw danych przechowywany w systemie plików DBFS i zaleca utrwalanie tabel w wykazie aparatu Unity, należy utworzyć klaster skonfigurowany z trybem dostępu pojedynczego użytkownika. Tryb dostępu pojedynczego użytkownika zapewnia pełny dostęp do systemu plików DBFS, a jednocześnie umożliwia dostęp do katalogu aparatu Unity. Zobacz Najlepsze rozwiązania dotyczące systemu plików DBFS i wykazu aparatu Unity.

  1. Kliknij pozycję Obliczenia na pasku bocznym.
  2. Na stronie Obliczenia kliknij pozycję Utwórz klaster.
  3. Na stronie Nowy klaster wprowadź unikatową nazwę klastra.
  4. W obszarze Tryb dostępu wybierz pozycję Pojedynczy użytkownik.
  5. W obszarze Dostęp do pojedynczego użytkownika lub jednostki usługi wybierz nazwę użytkownika.
  6. Pozostaw pozostałe wartości w stanie domyślnym, a następnie kliknij pozycję Utwórz klaster.

Aby dowiedzieć się więcej o klastrach usługi Databricks, zobacz Obliczenia.

Krok 2. Eksplorowanie danych źródłowych

Aby dowiedzieć się, jak używać interfejsu usługi Azure Databricks do eksplorowania nieprzetworzonych danych źródłowych, zobacz Eksplorowanie danych źródłowych dla potoku danych. Jeśli chcesz przejść bezpośrednio do pozyskiwania i przygotowywania danych, przejdź do kroku 3. Pozyskiwanie danych pierwotnych.

Krok 3. Pozyskiwanie danych pierwotnych

W tym kroku załadujesz nieprzetworzone dane do tabeli, aby udostępnić je do dalszego przetwarzania. Aby zarządzać zasobami danych na platformie databricks, takiej jak tabele, usługa Databricks zaleca katalog aparatu Unity. Jeśli jednak nie masz uprawnień do tworzenia wymaganego wykazu i schematu do publikowania tabel w wykazie aparatu Unity, nadal możesz wykonać następujące kroki, publikując tabele w magazynie metadanych Hive.

Aby pozyskiwać dane, usługa Databricks zaleca korzystanie z modułu automatycznego ładowania. Automatycznie moduł ładujący automatycznie wykrywa i przetwarza nowe pliki w miarę ich przybycia do magazynu obiektów w chmurze.

Automatyczne ładowanie można skonfigurować tak, aby automatycznie wykrywał schemat załadowanych danych, umożliwiając inicjowanie tabel bez jawnego deklarowania schematu danych i rozwijania schematu tabeli w miarę wprowadzania nowych kolumn. Eliminuje to konieczność ręcznego śledzenia i stosowania zmian schematu w czasie. Usługa Databricks zaleca wnioskowanie schematu podczas korzystania z automatycznego modułu ładującego. Jednak jak pokazano w kroku eksploracji danych, dane utworów nie zawierają informacji nagłówka. Ponieważ nagłówek nie jest przechowywany z danymi, należy jawnie zdefiniować schemat, jak pokazano w następnym przykładzie.

  1. Na pasku bocznym kliknij pozycję New IconNowy i wybierz pozycję Notes z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .

  2. Wprowadź nazwę notesu, na przykład Ingest songs data. Domyślnie:

    • Język Python jest wybranym językiem.
    • Notes jest dołączony do ostatniego użytego klastra. W takim przypadku klaster utworzony w kroku 1: Tworzenie klastra.
  3. Wprowadź następujące informacje w pierwszej komórce notesu:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Jeśli używasz wykazu aparatu Unity, zastąp <table-name> ciąg katalogiem, schematem i nazwą tabeli, aby zawierać pozyskane rekordy (na przykład data_pipelines.songs_data.raw_song_data). W przeciwnym razie zastąp <table-name> ciąg nazwą tabeli, aby zawierała pozyskane rekordy, na przykład raw_song_data.

    Zastąp <checkpoint-path> ciąg ścieżką do katalogu w systemie plików DBFS, aby zachować pliki punktów kontrolnych, na przykład /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Kliknij pozycję Run Menu, a następnie wybierz pozycję Uruchom komórkę. W tym przykładzie zdefiniowano schemat danych przy użyciu informacji z READMEelementu , pozyskuje dane utworów ze wszystkich plików zawartych w file_pathpliku i zapisuje dane w tabeli określonej przez table_nameelement .

Krok 4. Przygotowywanie danych pierwotnych

Aby przygotować nieprzetworzone dane do analizy, poniższe kroki przekształcają nieprzetworzone dane utworów przez odfiltrowanie niepotrzebnych kolumn i dodanie nowego pola zawierającego znacznik czasu tworzenia nowego rekordu.

  1. Na pasku bocznym kliknij pozycję New IconNowy i wybierz pozycję Notes z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .

  2. Wprowadź nazwę notesu. Na przykład Prepare songs data. Zmień język domyślny na SQL.

  3. Wprowadź następujące informacje w pierwszej komórce notesu:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Jeśli używasz wykazu aparatu Unity, zastąp <table-name> ciąg katalogiem, schematem i nazwą tabeli, aby zawierać przefiltrowane i przekształcone rekordy (na przykład data_pipelines.songs_data.prepared_song_data). W przeciwnym razie zastąp <table-name> ciąg nazwą tabeli zawierającą przefiltrowane i przekształcone rekordy (na przykład prepared_song_data).

    Zastąp <raw-songs-table-name> ciąg nazwą tabeli zawierającej nieprzetworzone rekordy utworów pozyskane w poprzednim kroku.

  4. Kliknij pozycję Run Menu, a następnie wybierz pozycję Uruchom komórkę.

Krok 5. Wykonywanie zapytań dotyczących przekształconych danych

W tym kroku rozszerzysz potok przetwarzania, dodając zapytania do analizowania danych utworów. Te zapytania używają przygotowanych rekordów utworzonych w poprzednim kroku.

  1. Na pasku bocznym kliknij pozycję New IconNowy i wybierz pozycję Notes z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .

  2. Wprowadź nazwę notesu. Na przykład Analyze songs data. Zmień język domyślny na SQL.

  3. Wprowadź następujące informacje w pierwszej komórce notesu:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Zastąp <prepared-songs-table-name> ciąg nazwą tabeli zawierającej przygotowane dane. Na przykład data_pipelines.songs_data.prepared_song_data.

  4. Kliknij Down Caret menu akcji komórki, wybierz pozycję Dodaj komórkę poniżej i wprowadź następujące polecenie w nowej komórce:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Zastąp <prepared-songs-table-name> ciąg nazwą przygotowanej tabeli utworzonej w poprzednim kroku. Na przykład data_pipelines.songs_data.prepared_song_data.

  5. Aby uruchomić zapytania i wyświetlić dane wyjściowe, kliknij pozycję Uruchom wszystko.

Krok 6. Tworzenie zadania usługi Azure Databricks w celu uruchomienia potoku

Możesz utworzyć przepływ pracy, aby zautomatyzować uruchamianie kroków pozyskiwania, przetwarzania i analizy danych przy użyciu zadania usługi Azure Databricks.

  1. W obszarze roboczym Nauka o danych i inżynierii wykonaj jedną z następujących czynności:
    • Kliknij pozycję Jobs IconPrzepływy pracy na pasku bocznym i kliknij pozycję .Create Job Button
    • Na pasku bocznym kliknij pozycję New IconNowy i wybierz pozycję Zadanie.
  2. W oknie dialogowym zadania na karcie Zadania zastąp ciąg Dodaj nazwę zadania... nazwą zadania. Na przykład "Przepływ pracy utworów".
  3. W polu Nazwa zadania wprowadź nazwę pierwszego zadania, na przykład Ingest_songs_data.
  4. W polu Typ wybierz typ zadania Notes .
  5. W obszarze Źródło wybierz pozycję Obszar roboczy.
  6. Użyj przeglądarki plików, aby znaleźć notes pozyskiwania danych, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.
  7. W obszarze Klaster wybierz pozycję Shared_job_cluster lub klaster utworzony w Create a cluster kroku.
  8. Kliknij pozycję Utwórz.
  9. Kliknij Add Task Button poniżej właśnie utworzonego zadania i wybierz pozycję Notes.
  10. W polu Nazwa zadania wprowadź nazwę zadania, na przykład Prepare_songs_data.
  11. W polu Typ wybierz typ zadania Notes .
  12. W obszarze Źródło wybierz pozycję Obszar roboczy.
  13. Użyj przeglądarki plików, aby znaleźć notes przygotowywania danych, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.
  14. W obszarze Klaster wybierz pozycję Shared_job_cluster lub klaster utworzony w Create a cluster kroku.
  15. Kliknij pozycję Utwórz.
  16. Kliknij Add Task Button poniżej właśnie utworzonego zadania i wybierz pozycję Notes.
  17. W polu Nazwa zadania wprowadź nazwę zadania, na przykład Analyze_songs_data.
  18. W polu Typ wybierz typ zadania Notes .
  19. W obszarze Źródło wybierz pozycję Obszar roboczy.
  20. Użyj przeglądarki plików, aby znaleźć notes analizy danych, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.
  21. W obszarze Klaster wybierz pozycję Shared_job_cluster lub klaster utworzony w Create a cluster kroku.
  22. Kliknij pozycję Utwórz.
  23. Aby uruchomić przepływ pracy, kliknij pozycję Run Now Button. Aby wyświetlić szczegóły przebiegu, kliknij link w kolumnie Godzina rozpoczęcia przebiegu w widoku przebiegów zadania. Kliknij każde zadanie, aby wyświetlić szczegóły przebiegu zadania.
  24. Aby wyświetlić wyniki po zakończeniu przepływu pracy, kliknij ostateczne zadanie analizy danych. Zostanie wyświetlona strona Dane wyjściowe i wyświetli wyniki zapytania.

Krok 7. Planowanie zadania potoku danych

Uwaga

Aby zademonstrować użycie zadania usługi Azure Databricks do organizowania zaplanowanego przepływu pracy, ten przykład wprowadzający oddziela etapy pozyskiwania, przygotowywania i analizy w oddzielnych notesach, a każdy notes jest następnie używany do tworzenia zadania w zadaniu. Jeśli wszystkie operacje przetwarzania znajdują się w jednym notesie, możesz łatwo zaplanować notes bezpośrednio z poziomu interfejsu użytkownika notesu usługi Azure Databricks. Zobacz Tworzenie zaplanowanych zadań notesu i zarządzanie nimi.

Typowym wymaganiem jest uruchomienie potoku danych zgodnie z harmonogramem. Aby zdefiniować harmonogram zadania, w ramach którego jest uruchamiany potok:

  1. Kliknij pozycję Jobs IconPrzepływy pracy na pasku bocznym.
  2. W kolumnie Nazwa kliknij nazwę zadania. Na panelu bocznym są wyświetlane szczegóły zadania.
  3. Kliknij pozycję Dodaj wyzwalacz na panelu Szczegóły zadania i wybierz pozycję Zaplanowane w polu Typ wyzwalacza.
  4. Określ okres, czas rozpoczęcia i strefę czasową. Opcjonalnie zaznacz pole wyboru Pokaż składnię Cron, aby wyświetlić i edytować harmonogram w składni Kron kwarcu.
  5. Kliknij przycisk Zapisz.

Dowiedz się więcej