Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Lakeflow Bildirimli İşlem Hatları, işlem hatlarında gerçekleştirilmiş görünümleri ve akış tablolarını tanımlamaya yönelik birkaç yeni SQL anahtar sözcüğü ve işlevi sunar. İşlem hattı geliştirmeye yönelik SQL desteği, Spark SQL'in temellerini temel alır ve Yapılandırılmış Akış işlevselliği için destek ekler.
PySpark DataFrames hakkında bilgi sahibi olan kullanıcılar Python ile işlem hattı kodu geliştirmeyi tercih edebilir. Python, meta programlama işlemleri gibi SQL ile uygulanması zor olan daha kapsamlı test ve işlemleri destekler. Bkz. Pythonile işlem hattı kodu geliştirme.
Lakeflow Bildirimli İşlem Hatları SQL söz diziminin tam başvurusu için bkz. Lakeflow Bildirimli İşlem Hatları SQL dil başvurusu.
İşlem hattı geliştirme için SQL'in temelleri
Lakeflow Deklaratif İşlem Hatları veri kümelerini oluşturan SQL kodu, sorgu sonuçlarına karşılaştırarak gerçekleştirilmiş görünümleri ve akış tablolarını tanımlamak için CREATE OR REFRESH
söz dizimini kullanır.
STREAM
anahtar sözcüğü, bir SELECT
yan tümcesinde başvuruda bulunılan veri kaynağının akış semantiğiyle okunması gerekip gerekmediğini belirtir.
İşlem hattı yapılandırması sırasında belirtilen kataloğa ve şemaya varsayılan olarak okur ve yazar. Bakınız Hedef kataloğu ve şemayı ayarla.
Lakeflow Bildirimli İşlem Hatları kaynak kodu sql betiklerinden kritik ölçüde farklıdır: Lakeflow Bildirimli İşlem Hatları, bir işlem hattında yapılandırılan tüm kaynak kodu dosyalarındaki tüm veri kümesi tanımlarını değerlendirir ve sorgular çalıştırilmeden önce bir veri akışı grafiği oluşturur. Not defterinde veya betikte görünen sorguların sırası, kod değerlendirme sırasını tanımlar, ancak sorgu yürütme sırasını tanımlamaz.
SQL ile maddileştirilmiş görünüm oluşturma
Aşağıdaki kod örneği, SQL ile gerçekleştirilmiş görünüm oluşturmaya yönelik temel söz dizimini gösterir:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
SQL ile akış tablosu oluşturma
Aşağıdaki kod örneği, SQL ile akış tablosu oluşturmaya yönelik temel söz dizimini gösterir. Akış tablosu için bir kaynağı okurken anahtar sözcüğü kaynak STREAM
için akış semantiğinin kullanılacağını belirtir. Gerçekleştirilmiş görünüm oluştururken anahtar sözcüğünü STREAM
kullanmayın:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Not
Kaynaktan okumak üzere akış semantiğini kullanmak için STREAM anahtar sözcüğünü kullanın. Okuma işlemi var olan bir kayıtta bir değişiklik veya silme işlemiyle karşılaşırsa bir hata oluşur. Statik veya yalnızca ekleme kaynaklarından okumak en güvenlidir. Değişiklik taahhütleri içeren verileri içeri çekmek için Python'ı ve SkipChangeCommits
seçeneğini hataları işlemek için kullanabilirsiniz.
Nesne depolamadan veri yükleme
Lakeflow Bildirimli İşlem Hatları, Azure Databricks tarafından desteklenen tüm biçimlerden veri yüklemeyi destekler. bkz. Veri biçimi seçenekleri.
Not
Bu örneklerde, çalışma alanınıza otomatik olarak bağlanan /databricks-datasets
'da yer alan veriler kullanılır. Databricks, bulut nesne depolama alanında depolanan verilere başvurmak için birim yollarının veya bulut URI'lerinin kullanılmasını önerir. Bkz. Unity Kataloğu birimleri nelerdir?.
Databricks, artımlı alım iş yüklerini bulut nesne depolamasında depolanan verilere göre yapılandırırken Otomatik Yükleyici ve akış tablolarının kullanılmasını önerir. Bkz. Otomatik Yükleyici nedir?.
SQL, Otomatik Yükleyici işlevselliğini çağırmak için read_files
işlevini kullanır.
STREAM
ile bir akış okuması yapılandırmak için read_files
anahtar sözcüğünü de kullanmanız gerekir.
Aşağıda, SQL'de için read_files
söz dizimi açıklanmaktadır:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Otomatik Yükleyici için seçenekler anahtar-değer çiftleridir. Desteklenen biçimler ve seçenekler hakkında ayrıntılı bilgi için bkz. Seçenekler.
Aşağıdaki örnek, Otomatik Yükleyici kullanarak JSON dosyalarından bir akış tablosu oluşturur:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
read_files
işlevi, materyalize edilmiş görünümler oluşturmak için toplu semantiği de destekler. Aşağıdaki örnek, JSON dizinini okumak ve gerçekleştirilmiş bir görünüm oluşturmak için toplu semantiği kullanır:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Verileri beklentilerle doğrulama
Veri kalitesi kısıtlamalarını ayarlamak ve uygulamak için beklentileri kullanabilirsiniz. bkz. İşlem hattı beklentileriyle veri kalitesini yönetme.
Aşağıdaki kod, veri alımı sırasında null olan kayıtları düşüren valid_data
adlı bir beklenti tanımlar:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
İşlem hattınızda tanımlanan gerçekleştirilmiş görünümleri ve akış tablolarını sorgulayın
Aşağıdaki örnek dört veri kümesini tanımlar:
- JSON verilerini yükleyen
orders
adlı bir akış tablosu. -
customers
adlı, CSV verilerini yükleyen maddi görünüm. -
customer_orders
veorders
veri kümelerindeki kayıtları birleştiren, sipariş zaman damgasını bir tarih formatına dönüştüren vecustomers
,customer_id
,order_number
vestate
alanlarını seçenorder_date
adlı oluşturulmuş bir görünüm. - Her eyalet için günlük sipariş sayısını toplayan
daily_orders_by_state
adlı bir materialize edilmiş görünüm.
Not
İşlem hattınızdaki görünümleri veya tabloları sorgularken, kataloğu ve şemayı doğrudan belirtebilir veya işlem hattınızda yapılandırılan varsayılanları kullanabilirsiniz. Bu örnekte, orders
, customers
ve customer_orders
tabloları, işlem hattınız için yapılandırılan varsayılan katalogdan ve şemadan yazılır ve okunur.
Eski yayımlama modu, işlem hattınızda tanımlanan diğer gerçekleştirilmiş görünümleri ve akış tablolarını sorgulamak için LIVE
şemasını kullanır. Yeni işlem hatlarında, LIVE
şema söz dizimi görmezden gelinir. Bkz. LIVE şeması (eski).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Özel tablo tanımlama
Gerçekleştirilmiş görünüm veya akış tablosu oluştururken yan tümcesini PRIVATE
kullanabilirsiniz. Özel tablo oluşturduğunuzda, tabloyu oluşturursunuz, ancak tablonun meta verilerini oluşturmazsınız.
PRIVATE
yan tümcesi, Lakeflow Bildirimli İşlem Hatlarına işlem hattı tarafından kullanılabilen ancak işlem hattı dışında erişilmemesi gereken bir tablo oluşturmasını bildirir. İşleme süresini kısaltmak için, özel bir tablo yalnızca bir güncelleştirme değil, bunu oluşturan işlem hattının ömrü boyunca kalır.
Özel tablolar, katalogdaki tablolarla aynı ada sahip olabilir. İşlem hattı içindeki bir tablo için nitelenmemiş bir ad belirtirseniz, hem özel tablo hem de bu ada sahip bir katalog tablosu varsa, özel tablo kullanılır.
Özel tablolar daha önce geçici tablolar olarak adlandırıldı.
Gerçekleştirilmiş görünümden veya akış tablosundan kayıtları kalıcı olarak silme
GDPR uyumluluğu gibi silme vektörlerinin etkinleştirildiği bir akış tablosundan kayıtları kalıcı olarak silmek için nesnenin temel delta tablolarında ek işlemler gerçekleştirilmelidir. Akış tablosundan kayıtların silinmesini sağlamak için bkz. Akış tablosundan kayıtları kalıcı olarak silme.
Gerçekleştirilmiş görünümler, yenilendiklerinde her zaman temel tablolardaki verileri yansıtır. Gerçekleştirilmiş görünümdeki verileri silmek için verileri kaynaktan silmeniz ve gerçekleştirilmiş görünümü yenilemeniz gerekir.
SQL ile tabloları veya görünümleri bildirirken kullanılan değerleri parametreleştirme
Spark yapılandırmaları dahil olmak üzere bir tablo veya görünüm bildiren bir sorguda yapılandırma değeri belirtmek için SET
kullanın.
SET
deyiminden sonra not defterinde tanımladığınız herhangi bir tablo veya görünüm, tanımlı değere erişebilir.
SET
deyimi kullanılarak belirtilen tüm Spark yapılandırmaları, SET deyimini izleyen herhangi bir tablo veya görünüm için Spark sorgusu yürütülürken kullanılır. Sorgudaki yapılandırma değerini okumak için ${}
dize ilişkilendirme söz dizimini kullanın. Aşağıdaki örnek, startDate
adlı bir Spark yapılandırma değeri ayarlar ve bu değeri sorguda kullanır:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Birden çok yapılandırma değeri belirtmek için her değer için ayrı bir SET
deyimi kullanın.
Sınırlamalar
PIVOT
yan tümcesi desteklenmiyor. Spark'taki pivot
işlemi, çıkış şemasını hesaplamak için giriş verilerinin hevesle yüklenmesini gerektirir. Bu özellik Lakeflow Bildirimli İşlem Hatlarında desteklenmez.
Not
Gerçekleştirilmiş görünüm oluşturmak için CREATE OR REFRESH LIVE TABLE
söz dizimi kullanım dışıdır. Bunun yerine CREATE OR REFRESH MATERIALIZED VIEW
kullanın.