Kılavuz: Lakeflow Spark Tanımlayıcı İşlem Hatları ile ETL işlem hattı oluşturma

Bu öğreticide, Lakeflow Spark Bildirimli İşlem Hatları ve Otomatik Yükleyici kullanarak veri düzenleme için ETL (ayıklama, dönüştürme ve yükleme) işlem hattı oluşturma ve dağıtma işlemleri açıklanmaktadır. ETL işlem hattı, kaynak sistemlerden verileri okuma, veri kalitesi denetimleri ve yinelenenleri kaldırma gibi gereksinimlere göre bu verileri dönüştürme ve verileri veri ambarı veya veri gölü gibi bir hedef sisteme yazma adımlarını uygular.

Bu öğreticide işlem hatlarını ve Otomatik Yükleyici'yi kullanarak şunları yapacaksınız:

  • Ham kaynak verilerini hedef tabloya alma.
  • Ham kaynak verileri dönüştürün ve dönüştürülen verileri iki hedef materialize edilmiş görünüme yazın.
  • Dönüştürülen verileri sorgulama.
  • Databricks işiyle ETL işlem hattını otomatikleştirin.

İşlem hatları ve Otomatik Yükleyici hakkında daha fazla bilgi için bkz. Lakeflow Spark Bildirimli İşlem Hatları ve Otomatik Yükleyici nedir?

Gereksinimler

Bu öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:

Veri kümesi hakkında

Bu örnekte kullanılan veri kümesi, çağdaş müzik parçaları için bir özellik ve meta veri koleksiyonu olan Milyon Şarkı Veri Kümesinin bir alt kümesidir. Bu veri kümesi, Azure Databricks çalışma alanınıza dahil edilen örnek veri kümelerinde kullanılabilir.

1. Adım: İşlem hattı oluşturma

İlk olarak, işlem hattı söz dizimini kullanarak dosyalardaki veri kümelerini ( kaynak kodu olarak adlandırılır) tanımlayarak bir işlem hattı oluşturun. Her kaynak kod dosyası yalnızca bir dil içerebilir, ancak işlem hattına dile özgü birden çok dosya ekleyebilirsiniz. Daha fazla bilgi edinmek için bkz. Lakeflow Spark Bildirimli İşlem Hatları

Bu öğreticide sunucusuz işlem ve Unity Kataloğu kullanılır. Belirtilmeyen tüm yapılandırma seçenekleri için varsayılan ayarları kullanın. Sunucusuz işlem çalışma alanınızda etkinleştirilmediyse veya desteklenmiyorsa, öğreticiyi varsayılan işlem ayarlarını kullanarak yazıldı olarak tamamlayabilirsiniz.

Yeni işlem hattı oluşturmak için şu adımları izleyin:

  1. Çalışma alanınızda Artı simgesine tıklayın.Kenar çubuğunda yeni, ardından ETL İşlem Hattı'nı seçin.
  2. İşlem hattınıza benzersiz bir ad verin.
  3. Adın hemen altında, oluşturduğunuz veriler için varsayılan kataloğu ve şemayı seçin. Dönüştürmelerinizdeki diğer hedefleri belirtebilirsiniz, ancak bu öğreticide bu varsayılanlar kullanılır. Oluşturduğunuz katalog ve şema izinlerine sahip olmanız gerekir. Bkz . Gereksinimler.
  4. Bu öğretici için Boş bir dosyayla başla'yı seçin.
  5. Klasör yolu'nda, kaynak dosyalarınız için bir konum belirtin veya varsayılanı (kullanıcı klasörünüz) kabul edin.
  6. İlk kaynak dosyanızın dili olarak Python veya SQL'i seçin (işlem hattı dilleri karıştırabilir ve eşleştirebilir, ancak her dosyanın tek bir dilde olması gerekir).
  7. Seç'e tıklayın.

yeni işlem hattı için işlem hattı düzenleyicisi görüntülenir. Diliniz için boş bir kaynak dosyası oluşturulur ve ilk dönüştürmeniz için hazır olur.

2. Adım: İşlem hattı mantığınızı geliştirme

Bu adımda, işlem hattı için kaynak kodunu etkileşimli olarak geliştirmek ve doğrulamak için Lakeflow Pipelines Düzenleyicisi'ni kullanacaksınız.

Kod, artımlı veri alımı için Otomatik Yükleyici'yi kullanır. Otomatik Yükleyici, bulut nesne depolama alanına ulaşan yeni dosyaları otomatik olarak algılar ve işler. Daha fazla bilgi edinmek için bkz. Otomatik Yükleyici nedir?

İşlem hattı için otomatik olarak boş bir kaynak kod dosyası oluşturulur ve yapılandırılır. Dosya, işlem hattınızın dönüşümler klasöründe oluşturulur. Varsayılan olarak, dönüştürmeler klasöründeki tüm *.py ve *.sql dosyaları işlem hattınızın kaynağının bir parçasıdır.

  1. Aşağıdaki kodu kopyalayıp kaynak dosyanıza yapıştırın. 1. Adımda dosya için seçtiğiniz dili kullandığınızdan emin olun.

    Piton

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    Bu kaynak üç sorgu için kod içerir. Ayrıca bu sorguları ayrı dosyalara koyarak dosyaları istediğiniz gibi düzenleyebilir ve kodlayabilirsiniz.

  2. Yürüt simgesine tıklayın. Bağlı işlem hattı için bir güncelleştirme başlatmak için dosyayı çalıştırın veya işlem hattını çalıştırın. İşlem hattınızda yalnızca bir kaynak dosyası varsa bunlar işlevsel olarak eşdeğerdir.

Güncelleştirme tamamlandığında, editör işlem hattınızla ilgili bilgilerle güncellenir.

  • İşlem hattı grafiği (DAG), kodunuzun sağ tarafındaki kenar çubuğunda üç tablo gösterir: songs_raw, songs_preparedve top_artists_by_year.
  • İşlem hattı varlıkları tarayıcısının üst kısmında güncelleştirmenin özeti gösterilir.
  • Oluşturulan tabloların ayrıntıları alt bölmede gösterilir ve tablolardan birini seçerek verilere göz atabilirsiniz.

Buna ham ve temizlenmiş verilerin yanı sıra yıla göre en iyi sanatçıları bulmak için bazı basit analizler dahildir. Sonraki adımda, işlem hattınızdaki ayrı bir dosyada daha fazla analiz için geçici sorgular oluşturacaksınız.

3. Adım: İşlem hattınız tarafından oluşturulan veri kümelerini keşfetme

Bu adımda, Databricks SQL Düzenleyicisi'nde şarkı verilerini analiz etmek için ETL işlem hattında işlenen veriler üzerinde geçici sorgular gerçekleştirirsiniz. Bu sorgular, önceki adımda oluşturulan hazırlanmış kayıtları kullanır.

İlk olarak, 1990'dan bu yana her yıl en çok şarkı yayınlayan sanatçıları bulan bir sorgu çalıştırın.

  1. İşlem hattı varlıkları tarayıcı kenar çubuğunda Artı simgesine tıklayın.Ardından Keşif'i ekleyin.

  2. Bir Ad girin ve araştırma dosyası için SQL'i seçin. Yeni explorations bir klasörde bir SQL not defteri oluşturulur. Klasördeki explorations dosyalar varsayılan olarak bir işlem hattı güncelleştirmesinin parçası olarak çalıştırılmaz. SQL not defteri, birlikte veya ayrı ayrı çalıştırabileceğiniz hücrelere sahiptir.

  3. 1990 yılından sonra her yıl en çok şarkı yayınlayan sanatçılardan oluşan bir tablo oluşturmak için, yeni SQL dosyasına aşağıdaki kodu girin (dosyada örnek kod varsa, değiştirin). Bu not defteri işlem hattının bir parçası olmadığından, varsayılan kataloğu ve şemayı kullanmaz. öğesini <catalog>.<schema> , işlem hattı için varsayılan olarak kullandığınız katalog ve şemayla değiştirin:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. Yürüt simgesine tıklayın veya bu sorguyu çalıştırmak için basınShift + Enter.

Şimdi 4/4 tempolu ve dans edilebilir tempolu şarkıları bulan başka bir sorgu çalıştırın.

  1. Aşağıdaki kodu aynı dosyadaki bir sonraki hücreye ekleyin. Tekrar, <catalog>.<schema> öğesini, işlem hattı için varsayılan olarak kullandığınız katalog ve şemayla değiştirin.

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. Yürüt simgesine tıklayın veya bu sorguyu çalıştırmak için basınShift + Enter.

4. Adım: İşlem hattını çalıştırmak için iş oluşturma

Ardından, bir zamanlamaya göre çalışan bir Databricks işi kullanarak veri alımı, işleme ve analiz adımlarını otomatikleştirmek için bir iş akışı oluşturun.

  1. Düzenleyicinin üst kısmında Zamanla düğmesini seçin.
  2. Zamanlamalar iletişim kutusu görüntülenirse Zamanlama ekle'yi seçin.
  3. Bu, Yeni zamanlama iletişim kutusunu açar, burada işlem hattınızı belirli bir zaman çizelgesine göre çalıştırmak için bir iş oluşturabilirsiniz.
  4. İsteğe bağlı olarak, işe bir ad verin.
  5. Varsayılan olarak, zamanlama günde bir kez çalışacak şekilde ayarlanır. Varsayılanı kabul edebilir veya kendi programınızı ayarlayabilirsiniz. Gelişmiş'i seçtiğinizde işin çalıştırılacağı belirli bir zamanı ayarlayabilirsiniz. Diğer seçenekler'i seçtiğinizde iş çalıştırıldığında bildirim oluşturabilirsiniz.
  6. Değişiklikleri uygulamak ve işi oluşturmak için Oluştur'u seçin.

Artık iş, işlem hattınızı güncel tutmak için günlük olarak çalışır. Zamanlama listesini görüntülemek için Yeniden Zamanla'yı seçebilirsiniz. İşlem hattınız için takvimleri ekleme, düzenleme veya kaldırma gibi zamanlamaları bu iletişim kutusundan yönetebilirsiniz.

Zamanlamanın (veya işin) adına tıklanması, sizi İşler ve işlem hatları listesindeki işin sayfasına götürür. Buradan çalıştırmaların geçmişi de dahil olmak üzere iş çalıştırmalarıyla ilgili ayrıntıları görüntüleyebilir veya şimdi çalıştır düğmesiyle işi hemen çalıştırabilirsiniz.

İş yürütmeleri hakkında daha fazla bilgi için bkz. Lakeflow İşleri için izleme ve gözlemleme.

Daha fazla bilgi edinin