Udostępnij za pośrednictwem


Przekształcanie danych przy użyciu dbt

Uwaga

Zadanie Apache Airflow jest obsługiwane przez Apache Airflow.

dbt(Data Build Tool) to interfejs wiersza polecenia typu open source, który upraszcza przekształcanie i modelowanie danych w magazynach danych, zarządzając złożonym kodem SQL w ustrukturyzowany, konserwowalny sposób. Umożliwia zespołom danych tworzenie niezawodnych, testowalnych przekształceń w rdzeniu potoków analitycznych.

W połączeniu z platformą Apache Airflow funkcje przekształcania dbt są ulepszane przez funkcje planowania, orkiestracji i zarządzania zadaniami firmy Airflow. To połączone podejście, wykorzystując wiedzę na temat transformacji dbt wraz z zarządzaniem przepływem pracy firmy Airflow, zapewnia wydajne i niezawodne potoki danych, co ostatecznie prowadzi do szybszych i bardziej szczegółowych decyzji opartych na danych.

W tym samouczku pokazano, jak utworzyć DAG Apache Airflow, który używa dbt do przekształcania danych przechowywanych w magazynie danych Microsoft Fabric.

Wymagania wstępne

Aby rozpocząć pracę, należy spełnić następujące wymagania wstępne:

Przekształć dane przechowywane w magazynie Fabric przy użyciu dbt

W tej sekcji przedstawiono następujące kroki:

  1. Określ wymagania.
  2. Utwórz projekt dbt w pamięci masowej zarządzanej przez Fabric, dostarczonej przez zadanie Apache Airflow.
  3. Utwórz DAG systemu Apache Airflow, aby koordynować zadania dbt

Określanie wymagań

Utwórz plik requirements.txt w folderze dags . Dodaj następujące pakiety jako wymagania dotyczące platformy Apache Airflow.

  • astronom-cosmos: Ten pakiet służy do uruchamiania podstawowych projektów dbt jako dags apache Airflow i grup zadań.

  • dbt-fabric: ten pakiet służy do tworzenia projektu dbt, który następnie można wdrożyć w Fabric Data Warehouse.

    astronomer-cosmos==1.10.1
    dbt-fabric==1.9.5   
    

Utwórz projekt dbt w zarządzanym magazynie danych dostarczonym przez zadanie Apache Airflow.

  1. W tej sekcji utworzymy przykładowy projekt dbt w zadaniu platformy Apache Airflow dla zestawu danych nyc_taxi_green z następującą strukturą katalogu.

       dags
       |-- my_cosmos_dag.py
       |-- nyc_taxi_green
       |  |-- profiles.yml
       |  |-- dbt_project.yml
       |  |-- models
       |  |   |-- nyc_trip_count.sql
       |  |-- target
    
  2. Utwórz folder o nazwie nyc_taxi_green w folderze dags z plikiem profiles.yml . Ten folder zawiera wszystkie pliki wymagane dla projektu dbt. Zrzut ekranu przedstawiający tworzenie plików dla projektu dbt.

  3. Skopiuj następującą zawartość do pliku profiles.yml. Ten plik konfiguracji zawiera szczegóły połączenia bazy danych i profile używane przez bazę danych dbt. Zaktualizuj wartości symboli zastępczych i zapisz plik.

    config:
      partial_parse: true
    nyc_taxi_green:
      target: fabric-dev
      outputs:
        fabric-dev:
          type: fabric
          driver: "ODBC Driver 18 for SQL Server"
          server: <sql connection string of your data warehouse>
          port: 1433
          database: "<name of the database>"
          schema: dbo
          threads: 4
          authentication: ServicePrincipal
          tenant_id: <Tenant ID of your service principal>
          client_id: <Client ID of your service principal>
          client_secret: <Client Secret of your service principal>
    
  4. dbt_project.yml Utwórz plik i skopiuj następującą zawartość. Ten plik określa konfigurację na poziomie projektu.

    name: "nyc_taxi_green"
    
    config-version: 2
    version: "0.1"
    
    profile: "nyc_taxi_green"
    
    model-paths: ["models"]
    seed-paths: ["seeds"]
    test-paths: ["tests"]
    analysis-paths: ["analysis"]
    macro-paths: ["macros"]
    
    target-path: "target"
    clean-targets:
      - "target"
      - "dbt_modules"
      - "logs"
    
    require-dbt-version: [">=1.0.0", "<2.0.0"]
    
    models:
      nyc_taxi_green:
        materialized: table
    
  5. Utwórz folder models w folderze nyc_taxi_green. Na potrzeby tego samouczka utworzymy przykładowy model w pliku o nazwie nyc_trip_count.sql , który tworzy tabelę zawierającą liczbę podróży dziennie na dostawcę. Skopiuj następującą zawartość w pliku.

       with new_york_taxis as (
           select * from nyctlc
       ),
       final as (
         SELECT
           vendorID,
           CAST(lpepPickupDatetime AS DATE) AS trip_date,
           COUNT(*) AS trip_count
         FROM
             [contoso-data-warehouse].[dbo].[nyctlc]
         GROUP BY
             vendorID,
             CAST(lpepPickupDatetime AS DATE)
         ORDER BY
             vendorID,
             trip_date;
       )
       select * from final
    

    Zrzut ekranu przedstawia modele projektu dbt.

Utwórz DAG Apache Airflow do orkiestracji zadań dbt

  • Utwórz plik o nazwie my_cosmos_dag.py w dags folderze i wklej w nim następującą zawartość.

    import os
    from pathlib import Path
    from datetime import datetime
    from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
    from airflow import DAG
    
    DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green"
    DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
    profile_config = ProfileConfig(
         profile_name="nyc_taxi_green",
         target_name="fabric-dev",
         profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml",
    )
    
    dbt_fabric_dag = DbtDag(
         project_config=ProjectConfig(DBT_ROOT_PATH,),
         operator_args={"install_deps": True},
         profile_config=profile_config,
         schedule_interval="@daily",
         start_date=datetime(2023, 9, 10),
         catchup=False,
         dag_id="dbt_fabric_dag",
    )
    

Uruchom swój DAG

  1. Uruchom DAG w ramach zadania Apache Airflow. Zrzut ekranu przedstawiający uruchamianie narzędzia dag.

  2. Aby wyświetlić DAG załadowany w interfejsie użytkownika Apache Airflow, kliknij na Monitor in Apache Airflow.Zrzut ekranu pokazuje, jak monitorować DAG dbt.Zrzut ekranu przedstawia pomyślne wykonanie DAG.

Weryfikowanie danych

  • Po pomyślnym uruchomieniu, aby zweryfikować dane, możesz zobaczyć nową tabelę o nazwie "nyc_trip_count.sql" utworzoną w magazynie danych usługi Fabric. Zrzut ekranu przedstawiający pomyślny DAG dbt.

Szybki start: tworzenie zadania platformy Apache Airflow