Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu öğreticide, Apache Spark ile veri düzenleme için ilk ETL (ayıklama, dönüştürme ve yükleme) işlem hattınızı nasıl geliştirip dağıtabileceğiniz gösterilmektedir. Bu öğreticide Databricks tüm amaçlı işlem kullanılıyor olsa da, çalışma alanınız için etkinleştirildiyse sunucusuz işlem de kullanabilirsiniz.
ETL işlem hatları oluşturmak için Lakeflow Spark Bildirimli İşlem Hatlarını da kullanabilirsiniz. Databricks Lakeflow Spark Bildirimli İşlem Hatları, üretim ETL işlem hatlarını oluşturma, dağıtma ve bakımının karmaşıklığını azaltır. Bkz. Öğretici: Lakeflow Spark Bildirimli İşlem Hatları ile ETL işlem hattı oluşturma.
Bu makalenin sonunda şunları nasıl yapacağınızı öğreneceksiniz:
- Databricks çok amaçlı işlem kaynağını başlatın.
- Databricks not defteri oluşturun.
- Otomatik Yükleyici ile Delta Lake'e artımlı veri alımını yapılandırın.
- Verileri işleme ve verilerle etkileşim kurma.
- Bir not defterini Databricks işi olarak zamanlayın.
Bu öğreticide, Python veya Scala'daki yaygın ETL görevlerini tamamlamak için etkileşimli not defterleri kullanılır.
Bu makalenin kaynaklarını oluşturmak için Databricks Terraform sağlayıcısını da kullanabilirsiniz. Bkz. Terraform ile kümeler, not defterleri ve işler oluşturma.
Gereksinimler
- bir Azure Databricks çalışma alanında oturum açtınız.
- İşlem kaynağı oluşturma izniniz var.
Not
İşlem denetimi ayrıcalıklarınız yoksa, işlem kaynağına erişiminiz olduğu sürece aşağıdaki adımların çoğunu yine de tamamlayabilirsiniz.
1. Adım: İşlem kaynağı oluşturma
Keşif veri analizi ve veri mühendisliği yapmak için komutları yürütmek üzere bir işlem kaynağı oluşturun.
- Kenar çubuğunda
tıklayın İşlem. - Hesaplama sayfasında Hesaplama oluştur'a tıklayın.
- İşlem kaynağı için benzersiz bir ad belirtin, kalan değerleri varsayılan durumunda bırakın ve İşlem oluştur'a tıklayın.
Databricks hesaplama hakkında daha fazla bilgi edinmek için bkz. Hesaplama.
2. Adım: Databricks not defteri oluşturma
Çalışma alanınızda not defteri oluşturmak için kenar
çubuğunda Yeni'ye ve ardından Not Defteri'ne tıklayın. Çalışma alanında boş bir not defteri açılır.
Not defterlerini oluşturma ve yönetme hakkında daha fazla bilgi edinmek için bkz . Not defterlerini yönetme.
3. Adım: Verileri Delta Lake'e almak için Otomatik Yükleyici'yi yapılandırma
Databricks, artımlı veri alımı için Otomatik Yükleyici'nin kullanılmasını önerir. Otomatik Yükleyici, bulut nesne depolama alanına ulaşan yeni dosyaları otomatik olarak algılar ve işler.
Databricks, Delta Lake ile veri depolamayı önerir. Delta Lake, ACID işlemleri sağlayan ve data lakehouse'a olanak tanıyan open source bir depolama katmanıdır. Delta Lake, Databricks'te oluşturulan tablolar için varsayılan biçimdir.
Verileri Delta Lake tablosuna almak üzere Otomatik Yükleyici'yi yapılandırmak için aşağıdaki kodu kopyalayıp not defterinizdeki boş hücreye yapıştırın:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala programlama dili
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Not
Bu kodda tanımlanan değişkenler, mevcut çalışma alanı varlıklarıyla veya diğer kullanıcılarla çakışma riski olmadan bunu güvenli bir şekilde yürütmenize olanak sağlamalıdır. Kısıtlı ağ veya depolama izinleri bu kodu yürütürken hatalara neden olur; bu kısıtlamaları gidermek için çalışma alanı yöneticinize başvurun.
Otomatik Yükleyici hakkında daha fazla bilgi edinmek için bkz . Otomatik Yükleyici nedir?.
4. Adım: Verileri işleme ve verilerle etkileşim kurma
Not defterleri mantığı hücre hücre yürütür. Hücrenizdeki mantığı yürütmek için:
Önceki adımda tamamladığınız hücreyi çalıştırmak için hücreyi seçin ve SHIFT+ENTER
basın. Yeni oluşturduğunuz tabloyu sorgulamak için, aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından SHIFT+ENTER tuşlarına basarak hücreyi çalıştırın.
Python
df = spark.read.table(table_name)Scala programlama dili
val df = spark.read.table(table_name)DataFrame'inizdeki verilerin önizlemesini görüntülemek için aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından SHIFT+ENTER tuşlarına basarak hücreyi çalıştırın.
Python
display(df)Scala programlama dili
display(df)
Verileri görselleştirmeye yönelik etkileşimli seçenekler hakkında daha fazla bilgi edinmek için bkz. Databricks not defterlerindeki görselleştirmeler ve SQL düzenleyicisi.
5. Adım: İş zamanlama
Databricks not defterlerini bir Databricks işine görev olarak ekleyerek üretim betikleri olarak çalıştırabilirsiniz. Bu adımda, el ile tetikleyebileceğiniz yeni bir iş oluşturacaksınız.
Not defterinizi görev olarak zamanlamak için:
- Üst bilgi çubuğunun sağ tarafındaki Zamanla'ya tıklayın.
- İş adı için benzersiz bir ad girin.
- El ile'ye tıklayın.
- İşlem açılan listesinde, 1. adımda oluşturduğunuz işlem kaynağını seçin.
- Oluştur’a tıklayın.
- Görüntülenen pencerede, Şimdi çalıştıröğesine tıklayın.
- İş çalıştırma sonuçlarına bakmak için
External LinkSon çalıştırma zaman damgasının yanındaki simgesine tıklayın.
İşler hakkında daha fazla bilgi için bkz. İşler nedir?.
Ek tümleştirmeler
Azure Databricks ile veri mühendisliğine yönelik tümleştirmeler ve araçlar hakkında daha fazla bilgi edinin: