Aracılığıyla paylaş


SQL ile işlem hattı kodu geliştirme

Lakeflow Bildirimli İşlem Hatları, işlem hatlarında gerçekleştirilmiş görünümleri ve akış tablolarını tanımlamaya yönelik birkaç yeni SQL anahtar sözcüğü ve işlevi sunar. İşlem hattı geliştirmeye yönelik SQL desteği, Spark SQL'in temellerini temel alır ve Yapılandırılmış Akış işlevselliği için destek ekler.

PySpark DataFrames hakkında bilgi sahibi olan kullanıcılar Python ile işlem hattı kodu geliştirmeyi tercih edebilir. Python, meta programlama işlemleri gibi SQL ile uygulanması zor olan daha kapsamlı test ve işlemleri destekler. Bkz. Pythonile işlem hattı kodu geliştirme.

Lakeflow Bildirimli İşlem Hatları SQL söz diziminin tam başvurusu için bkz. Lakeflow Bildirimli İşlem Hatları SQL dil başvurusu.

İşlem hattı geliştirme için SQL'in temelleri

Lakeflow Deklaratif İşlem Hatları veri kümelerini oluşturan SQL kodu, sorgu sonuçlarına karşılaştırarak gerçekleştirilmiş görünümleri ve akış tablolarını tanımlamak için CREATE OR REFRESH söz dizimini kullanır.

STREAM anahtar sözcüğü, bir SELECT yan tümcesinde başvuruda bulunılan veri kaynağının akış semantiğiyle okunması gerekip gerekmediğini belirtir.

İşlem hattı yapılandırması sırasında belirtilen kataloğa ve şemaya varsayılan olarak okur ve yazar. Bakınız Hedef kataloğu ve şemayı ayarla.

Lakeflow Bildirimli İşlem Hatları kaynak kodu sql betiklerinden kritik ölçüde farklıdır: Lakeflow Bildirimli İşlem Hatları, bir işlem hattında yapılandırılan tüm kaynak kodu dosyalarındaki tüm veri kümesi tanımlarını değerlendirir ve sorgular çalıştırilmeden önce bir veri akışı grafiği oluşturur. Not defterinde veya betikte görünen sorguların sırası, kod değerlendirme sırasını tanımlar, ancak sorgu yürütme sırasını tanımlamaz.

SQL ile maddileştirilmiş görünüm oluşturma

Aşağıdaki kod örneği, SQL ile gerçekleştirilmiş görünüm oluşturmaya yönelik temel söz dizimini gösterir:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

SQL ile akış tablosu oluşturma

Aşağıdaki kod örneği, SQL ile akış tablosu oluşturmaya yönelik temel söz dizimini gösterir. Akış tablosu için bir kaynağı okurken anahtar sözcüğü kaynak STREAM için akış semantiğinin kullanılacağını belirtir. Gerçekleştirilmiş görünüm oluştururken anahtar sözcüğünü STREAM kullanmayın:

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Not

Kaynaktan okumak üzere akış semantiğini kullanmak için STREAM anahtar sözcüğünü kullanın. Okuma işlemi var olan bir kayıtta bir değişiklik veya silme işlemiyle karşılaşırsa bir hata oluşur. Statik veya yalnızca ekleme kaynaklarından okumak en güvenlidir. Değişiklik taahhütleri içeren verileri içeri çekmek için Python'ı ve SkipChangeCommits seçeneğini hataları işlemek için kullanabilirsiniz.

Nesne depolamadan veri yükleme

Lakeflow Bildirimli İşlem Hatları, Azure Databricks tarafından desteklenen tüm biçimlerden veri yüklemeyi destekler. bkz. Veri biçimi seçenekleri.

Not

Bu örneklerde, çalışma alanınıza otomatik olarak bağlanan /databricks-datasets'da yer alan veriler kullanılır. Databricks, bulut nesne depolama alanında depolanan verilere başvurmak için birim yollarının veya bulut URI'lerinin kullanılmasını önerir. Bkz. Unity Kataloğu birimleri nelerdir?.

Databricks, artımlı alım iş yüklerini bulut nesne depolamasında depolanan verilere göre yapılandırırken Otomatik Yükleyici ve akış tablolarının kullanılmasını önerir. Bkz. Otomatik Yükleyici nedir?.

SQL, Otomatik Yükleyici işlevselliğini çağırmak için read_files işlevini kullanır. STREAMile bir akış okuması yapılandırmak için read_files anahtar sözcüğünü de kullanmanız gerekir.

Aşağıda, SQL'de için read_files söz dizimi açıklanmaktadır:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM STREAM read_files(
    "<file-path>",
    [<option-key> => <option_value>, ...]
  )

Otomatik Yükleyici için seçenekler anahtar-değer çiftleridir. Desteklenen biçimler ve seçenekler hakkında ayrıntılı bilgi için bkz. Seçenekler.

Aşağıdaki örnek, Otomatik Yükleyici kullanarak JSON dosyalarından bir akış tablosu oluşturur:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

read_files işlevi, materyalize edilmiş görünümler oluşturmak için toplu semantiği de destekler. Aşağıdaki örnek, JSON dizinini okumak ve gerçekleştirilmiş bir görünüm oluşturmak için toplu semantiği kullanır:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

Verileri beklentilerle doğrulama

Veri kalitesi kısıtlamalarını ayarlamak ve uygulamak için beklentileri kullanabilirsiniz. bkz. İşlem hattı beklentileriyle veri kalitesini yönetme.

Aşağıdaki kod, veri alımı sırasında null olan kayıtları düşüren valid_data adlı bir beklenti tanımlar:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

İşlem hattınızda tanımlanan gerçekleştirilmiş görünümleri ve akış tablolarını sorgulayın

Aşağıdaki örnek dört veri kümesini tanımlar:

  • JSON verilerini yükleyen orders adlı bir akış tablosu.
  • customers adlı, CSV verilerini yükleyen maddi görünüm.
  • customer_orders ve orders veri kümelerindeki kayıtları birleştiren, sipariş zaman damgasını bir tarih formatına dönüştüren ve customers, customer_id, order_numberve state alanlarını seçen order_date adlı oluşturulmuş bir görünüm.
  • Her eyalet için günlük sipariş sayısını toplayan daily_orders_by_state adlı bir materialize edilmiş görünüm.

Not

İşlem hattınızdaki görünümleri veya tabloları sorgularken, kataloğu ve şemayı doğrudan belirtebilir veya işlem hattınızda yapılandırılan varsayılanları kullanabilirsiniz. Bu örnekte, orders, customersve customer_orders tabloları, işlem hattınız için yapılandırılan varsayılan katalogdan ve şemadan yazılır ve okunur.

Eski yayımlama modu, işlem hattınızda tanımlanan diğer gerçekleştirilmiş görünümleri ve akış tablolarını sorgulamak için LIVE şemasını kullanır. Yeni işlem hatlarında, LIVE şema söz dizimi görmezden gelinir. Bkz. LIVE şeması (eski).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Özel tablo tanımlama

Gerçekleştirilmiş görünüm veya akış tablosu oluştururken yan tümcesini PRIVATE kullanabilirsiniz. Özel tablo oluşturduğunuzda, tabloyu oluşturursunuz, ancak tablonun meta verilerini oluşturmazsınız. PRIVATE yan tümcesi, Lakeflow Bildirimli İşlem Hatlarına işlem hattı tarafından kullanılabilen ancak işlem hattı dışında erişilmemesi gereken bir tablo oluşturmasını bildirir. İşleme süresini kısaltmak için, özel bir tablo yalnızca bir güncelleştirme değil, bunu oluşturan işlem hattının ömrü boyunca kalır.

Özel tablolar, katalogdaki tablolarla aynı ada sahip olabilir. İşlem hattı içindeki bir tablo için nitelenmemiş bir ad belirtirseniz, hem özel tablo hem de bu ada sahip bir katalog tablosu varsa, özel tablo kullanılır.

Özel tablolar daha önce geçici tablolar olarak adlandırıldı.

Gerçekleştirilmiş görünümden veya akış tablosundan kayıtları kalıcı olarak silme

GDPR uyumluluğu gibi silme vektörlerinin etkinleştirildiği bir akış tablosundan kayıtları kalıcı olarak silmek için nesnenin temel delta tablolarında ek işlemler gerçekleştirilmelidir. Akış tablosundan kayıtların silinmesini sağlamak için bkz. Akış tablosundan kayıtları kalıcı olarak silme.

Gerçekleştirilmiş görünümler, yenilendiklerinde her zaman temel tablolardaki verileri yansıtır. Gerçekleştirilmiş görünümdeki verileri silmek için verileri kaynaktan silmeniz ve gerçekleştirilmiş görünümü yenilemeniz gerekir.

SQL ile tabloları veya görünümleri bildirirken kullanılan değerleri parametreleştirme

Spark yapılandırmaları dahil olmak üzere bir tablo veya görünüm bildiren bir sorguda yapılandırma değeri belirtmek için SET kullanın. SET deyiminden sonra not defterinde tanımladığınız herhangi bir tablo veya görünüm, tanımlı değere erişebilir. SET deyimi kullanılarak belirtilen tüm Spark yapılandırmaları, SET deyimini izleyen herhangi bir tablo veya görünüm için Spark sorgusu yürütülürken kullanılır. Sorgudaki yapılandırma değerini okumak için ${}dize ilişkilendirme söz dizimini kullanın. Aşağıdaki örnek, startDate adlı bir Spark yapılandırma değeri ayarlar ve bu değeri sorguda kullanır:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Birden çok yapılandırma değeri belirtmek için her değer için ayrı bir SET deyimi kullanın.

Sınırlamalar

PIVOT yan tümcesi desteklenmiyor. Spark'taki pivot işlemi, çıkış şemasını hesaplamak için giriş verilerinin hevesle yüklenmesini gerektirir. Bu özellik Lakeflow Bildirimli İşlem Hatlarında desteklenmez.

Not

Gerçekleştirilmiş görünüm oluşturmak için CREATE OR REFRESH LIVE TABLE söz dizimi kullanım dışıdır. Bunun yerine CREATE OR REFRESH MATERIALIZED VIEWkullanın.