Delta Live Tables akışlarıyla verileri artımlı olarak yükleme ve işleme
Bu makalede, bir kaynaktan hedef akış tablosuna verileri artımlı olarak işlemek için Delta Live Tables işlem hatlarındaki akışları nasıl kullanabileceğiniz açıklanır. Delta Live Tablolarında akışlar iki şekilde tanımlanır:
- Akış tablosunu güncelleştiren bir sorgu oluşturduğunuzda akış otomatik olarak tanımlanır.
- Delta Live Tables ayrıca birden çok akış kaynağından akış tablosuna ekleme gibi daha karmaşık işlemler için akışları açıkça tanımlama işlevi de sağlar.
Bu makalede, akış tablosunu güncelleştirmek için bir sorgu tanımladığınızda oluşturulan örtük akışlar açıklanır ve daha karmaşık akışlar tanımlamak için söz dizimi hakkında ayrıntılar sağlanır.
Akış nedir?
Delta Live Tablolarında akış, hedef akış tablosunu güncelleştirmek için kaynak verileri artımlı olarak işleyen bir akış sorgusudur. İşlem hattında oluşturduğunuz Delta Live Tables veri kümelerinin çoğu, akışı sorgunun bir parçası olarak tanımlar ve akışın açıkça tanımlanmasını gerektirmez. Örneğin, delta live tablolarında akış tablosunu oluşturmak için ayrı tablo ve akış deyimleri kullanmak yerine tek bir DDL komutunda akış tablosu oluşturursunuz:
Not
Bu CREATE FLOW
örnek yalnızca açıklayıcı amaçlarla sağlanır ve geçerli Delta Live Tables söz dizimi olmayan anahtar sözcükler içerir.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Sorgu tarafından tanımlanan varsayılan akışa ek olarak, Delta Live Tables Python ve SQL arabirimleri ekleme akışı işlevselliği sağlar. Ekleme akışı, tek bir akış tablosunu güncelleştirmek için birden çok akış kaynağından veri okumayı gerektiren işlemeyi destekler. Örneğin, var olan bir akış tablonuz ve akışınız varsa ve bu mevcut akış tablosuna yazan yeni bir akış kaynağı eklemek istediğinizde ekleme akışı işlevini kullanabilirsiniz.
Birden çok kaynak akıştan akış tablosuna yazmak için ekleme akışı kullanma
Not
Ekleme akışı işlemeyi kullanmak için işlem hattınızın önizleme kanalını kullanacak şekilde yapılandırılması gerekir.
@append_flow
Birden çok akış kaynağından bir akış tablosuna yazmak için Python arabiriminde dekoratörünü veya CREATE FLOW
SQL arabirimindeki yan tümcesini kullanın. Aşağıdaki gibi görevleri işlemek için ekleme akışını kullanın:
- Tam yenileme gerektirmeden mevcut bir akış tablosuna veri ekleyen akış kaynakları ekleyin. Örneğin, çalıştığınız her bölgeden bölgesel verileri birleştiren bir tablonuz olabilir. Yeni bölgeler kullanıma sunulduktan sonra, tam yenileme gerçekleştirmeden yeni bölge verilerini tabloya ekleyebilirsiniz. Bkz . Örnek: Birden çok Kafka konu başlığından akış tablosuna yazma.
- Eksik geçmiş verileri (geri doldurma) ekleyerek bir akış tablosunu güncelleştirin. Örneğin, Apache Kafka konusu tarafından yazılan mevcut bir akış tablonuz var. Ayrıca, akış tablosuna tam olarak bir kez eklenmesi gereken bir tabloda depolanan geçmiş verileriniz vardır ve işlemeniz verileri eklemeden önce karmaşık bir toplama gerçekleştirmeyi içerdiğinden verileri akışla aktaramazsınız. Bkz . Örnek: Tek seferlik veri doldurmayı çalıştırma.
- Birden çok kaynaktaki verileri birleştirin ve sorguda yan tümcesini kullanmak yerine tek bir akış tablosuna
UNION
yazın. Yerine ekleme akışı işlemeyiUNION
kullanmak, tam yenileme güncelleştirmesi çalıştırmadan hedef tabloyu artımlı olarak güncelleştirmenizi sağlar. Bkz . Örnek: UNION yerine ekleme akışı işlemeyi kullanma.
Ekleme akışı işleme tarafından kayıt çıkışının hedefi var olan bir tablo veya yeni bir tablo olabilir. Python sorguları için create_streaming_table() işlevini kullanarak bir hedef tablo oluşturun.
Önemli
- Veri kalitesi kısıtlamalarını beklentilerle tanımlamanız gerekiyorsa, hedef tablodaki beklentileri işlevin bir parçası
create_streaming_table()
olarak veya mevcut bir tablo tanımında tanımlayın. Tanımda beklentileri tanımlayamazsınız@append_flow
. - Akışlar bir akış adıyla tanımlanır ve bu ad akış denetim noktalarını tanımlamak için kullanılır. Denetim noktasını tanımlamak için akış adının kullanılması aşağıdaki anlamına gelir:
- İşlem hattındaki mevcut bir akış yeniden adlandırılırsa denetim noktası taşınmaz ve yeniden adlandırılan akış etkili bir şekilde tamamen yeni bir akıştır.
- Mevcut denetim noktası yeni akış tanımıyla eşleşmediğinden, bir akış adını işlem hattında yeniden kullanamazsınız.
için söz dizimi @append_flow
aşağıdadır:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Örnek: Birden çok Kafka konu başlığından akış tablosuna yazma
Aşağıdaki örnekler adlı kafka_target
bir akış tablosu oluşturur ve bu akış tablosuna iki Kafka konusundan yazar:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
SQL sorgularında kullanılan tablo değerli işlev hakkında read_kafka()
daha fazla bilgi edinmek için bkz . SQL dil başvurusunda read_kafka .
Örnek: Tek seferlik veri doldurma çalıştırma
Aşağıdaki örneklerde geçmiş verileri akış tablosuna eklemek için bir sorgu çalıştırılır:
Not
Yedekleme sorgusu zamanlanmış olarak veya sürekli olarak çalışan bir işlem hattının parçası olduğunda gerçek bir kerelik bir yedekleme sağlamak için, işlem hattını bir kez çalıştırdıktan sonra sorguyu kaldırın. Yeni veriler backfill dizinine ulaşırsa eklemek için sorguyu yerinde bırakın.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Örnek: Ekleme akışı işleme yerine kullanma UNION
Yan tümcesi olan UNION
bir sorgu kullanmak yerine, birden çok kaynağı birleştirmek ve tek bir akış tablosuna yazmak için ekleme akışı sorgularını kullanabilirsiniz. Yerine ekleme akışı sorgularını UNION
kullanmak, tam yenileme çalıştırmadan birden çok kaynaktan bir akış tablosuna eklemenizi sağlar.
Aşağıdaki Python örneği, birden çok veri kaynağını yan tümcesiyle birleştiren bir UNION
sorgu içerir:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Aşağıdaki örnekler sorguyu UNION
ekleme akışı sorgularıyla değiştirir:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);