Öğretici: Databricks platformunda Apache Spark ile ETL işlem hattı oluşturma

Bu öğreticide, Apache Spark ile veri düzenleme için ilk ETL (ayıklama, dönüştürme ve yükleme) işlem hattınızı nasıl geliştirip dağıtabileceğiniz gösterilmektedir. Bu öğreticide Databricks tüm amaçlı işlem kullanılıyor olsa da, çalışma alanınız için etkinleştirildiyse sunucusuz işlem de kullanabilirsiniz.

ETL işlem hatları oluşturmak için Lakeflow Spark Bildirimli İşlem Hatlarını da kullanabilirsiniz. Databricks Lakeflow Spark Bildirimli İşlem Hatları, üretim ETL işlem hatlarını oluşturma, dağıtma ve bakımının karmaşıklığını azaltır. Bkz. Öğretici: Lakeflow Spark Bildirimli İşlem Hatları ile ETL işlem hattı oluşturma.

Bu makalenin sonunda şunları nasıl yapacağınızı öğreneceksiniz:

  1. Databricks çok amaçlı işlem kaynağını başlatın.
  2. Databricks not defteri oluşturun.
  3. Otomatik Yükleyici ile Delta Lake'e artımlı veri alımını yapılandırın.
  4. Verileri işleme ve verilerle etkileşim kurma.
  5. Bir not defterini Databricks işi olarak zamanlayın.

Bu öğreticide, Python veya Scala'daki yaygın ETL görevlerini tamamlamak için etkileşimli not defterleri kullanılır.

Bu makalenin kaynaklarını oluşturmak için Databricks Terraform sağlayıcısını da kullanabilirsiniz. Bkz. Terraform ile kümeler, not defterleri ve işler oluşturma.

Gereksinimler

Not

İşlem denetimi ayrıcalıklarınız yoksa, işlem kaynağına erişiminiz olduğu sürece aşağıdaki adımların çoğunu yine de tamamlayabilirsiniz.

1. Adım: İşlem kaynağı oluşturma

Keşif veri analizi ve veri mühendisliği yapmak için komutları yürütmek üzere bir işlem kaynağı oluşturun.

  1. Kenar çubuğunda işlem simgesine tıklayın İşlem.
  2. Hesaplama sayfasında Hesaplama oluştur'a tıklayın.
  3. İşlem kaynağı için benzersiz bir ad belirtin, kalan değerleri varsayılan durumunda bırakın ve İşlem oluştur'a tıklayın.

Databricks hesaplama hakkında daha fazla bilgi edinmek için bkz. Hesaplama.

2. Adım: Databricks not defteri oluşturma

Çalışma alanınızda not defteri oluşturmak için kenar Yeni Simgeçubuğunda Yeni'ye ve ardından Not Defteri'ne tıklayın. Çalışma alanında boş bir not defteri açılır.

Not defterlerini oluşturma ve yönetme hakkında daha fazla bilgi edinmek için bkz . Not defterlerini yönetme.

3. Adım: Verileri Delta Lake'e almak için Otomatik Yükleyici'yi yapılandırma

Databricks, artımlı veri alımı için Otomatik Yükleyici'nin kullanılmasını önerir. Otomatik Yükleyici, bulut nesne depolama alanına ulaşan yeni dosyaları otomatik olarak algılar ve işler.

Databricks, Delta Lake ile veri depolamayı önerir. Delta Lake, ACID işlemleri sağlayan ve data lakehouse'a olanak tanıyan open source bir depolama katmanıdır. Delta Lake, Databricks'te oluşturulan tablolar için varsayılan biçimdir.

Verileri Delta Lake tablosuna almak üzere Otomatik Yükleyici'yi yapılandırmak için aşağıdaki kodu kopyalayıp not defterinizdeki boş hücreye yapıştırın:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala programlama dili

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Not

Bu kodda tanımlanan değişkenler, mevcut çalışma alanı varlıklarıyla veya diğer kullanıcılarla çakışma riski olmadan bunu güvenli bir şekilde yürütmenize olanak sağlamalıdır. Kısıtlı ağ veya depolama izinleri bu kodu yürütürken hatalara neden olur; bu kısıtlamaları gidermek için çalışma alanı yöneticinize başvurun.

Otomatik Yükleyici hakkında daha fazla bilgi edinmek için bkz . Otomatik Yükleyici nedir?.

4. Adım: Verileri işleme ve verilerle etkileşim kurma

Not defterleri mantığı hücre hücre yürütür. Hücrenizdeki mantığı yürütmek için:

  1. Önceki adımda tamamladığınız hücreyi çalıştırmak için hücreyi seçin ve SHIFT+ENTERbasın.

  2. Yeni oluşturduğunuz tabloyu sorgulamak için, aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından SHIFT+ENTER tuşlarına basarak hücreyi çalıştırın.

    Python

    df = spark.read.table(table_name)
    

    Scala programlama dili

    val df = spark.read.table(table_name)
    
  3. DataFrame'inizdeki verilerin önizlemesini görüntülemek için aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından SHIFT+ENTER tuşlarına basarak hücreyi çalıştırın.

    Python

    display(df)
    

    Scala programlama dili

    display(df)
    

Verileri görselleştirmeye yönelik etkileşimli seçenekler hakkında daha fazla bilgi edinmek için bkz. Databricks not defterlerindeki görselleştirmeler ve SQL düzenleyicisi.

5. Adım: İş zamanlama

Databricks not defterlerini bir Databricks işine görev olarak ekleyerek üretim betikleri olarak çalıştırabilirsiniz. Bu adımda, el ile tetikleyebileceğiniz yeni bir iş oluşturacaksınız.

Not defterinizi görev olarak zamanlamak için:

  1. Üst bilgi çubuğunun sağ tarafındaki Zamanla'ya tıklayın.
  2. İş adı için benzersiz bir ad girin.
  3. El ile'ye tıklayın.
  4. İşlem açılan listesinde, 1. adımda oluşturduğunuz işlem kaynağını seçin.
  5. Oluştur’a tıklayın.
  6. Görüntülenen pencerede, Şimdi çalıştıröğesine tıklayın.
  7. İş çalıştırma sonuçlarına bakmak için External LinkExternal LinkSon çalıştırma zaman damgasının yanındaki simgesine tıklayın.

İşler hakkında daha fazla bilgi için bkz. İşler nedir?.

Ek tümleştirmeler

Azure Databricks ile veri mühendisliğine yönelik tümleştirmeler ve araçlar hakkında daha fazla bilgi edinin: