Aracılığıyla paylaş


Delta Live Tables akışlarıyla verileri artımlı olarak yükleme ve işleme

Bu makalede, bir kaynaktan hedef akış tablosuna verileri artımlı olarak işlemek için Delta Live Tables işlem hatlarındaki akışları nasıl kullanabileceğiniz açıklanır. Delta Live Tablolarında akışlar iki şekilde tanımlanır:

  1. Akış tablosunu güncelleştiren bir sorgu oluşturduğunuzda akış otomatik olarak tanımlanır.
  2. Delta Live Tables ayrıca birden çok akış kaynağından akış tablosuna ekleme gibi daha karmaşık işlemler için akışları açıkça tanımlama işlevi de sağlar.

Bu makalede, akış tablosunu güncelleştirmek için bir sorgu tanımladığınızda oluşturulan örtük akışlar açıklanır ve daha karmaşık akışlar tanımlamak için söz dizimi hakkında ayrıntılar sağlanır.

Akış nedir?

Delta Live Tablolarında akış, hedef akış tablosunu güncelleştirmek için kaynak verileri artımlı olarak işleyen bir akış sorgusudur. İşlem hattında oluşturduğunuz Delta Live Tables veri kümelerinin çoğu, akışı sorgunun bir parçası olarak tanımlar ve akışın açıkça tanımlanmasını gerektirmez. Örneğin, delta live tablolarında akış tablosunu oluşturmak için ayrı tablo ve akış deyimleri kullanmak yerine tek bir DDL komutunda akış tablosu oluşturursunuz:

Not

Bu CREATE FLOW örnek yalnızca açıklayıcı amaçlarla sağlanır ve geçerli Delta Live Tables söz dizimi olmayan anahtar sözcükler içerir.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Sorgu tarafından tanımlanan varsayılan akışa ek olarak, Delta Live Tables Python ve SQL arabirimleri ekleme akışı işlevselliği sağlar. Ekleme akışı, tek bir akış tablosunu güncelleştirmek için birden çok akış kaynağından veri okumayı gerektiren işlemeyi destekler. Örneğin, var olan bir akış tablonuz ve akışınız varsa ve bu mevcut akış tablosuna yazan yeni bir akış kaynağı eklemek istediğinizde ekleme akışı işlevini kullanabilirsiniz.

Birden çok kaynak akıştan akış tablosuna yazmak için ekleme akışı kullanma

Not

Ekleme akışı işlemeyi kullanmak için işlem hattınızın önizleme kanalını kullanacak şekilde yapılandırılması gerekir.

@append_flow Birden çok akış kaynağından bir akış tablosuna yazmak için Python arabiriminde dekoratörünü veya CREATE FLOW SQL arabirimindeki yan tümcesini kullanın. Aşağıdaki gibi görevleri işlemek için ekleme akışını kullanın:

  • Tam yenileme gerektirmeden mevcut bir akış tablosuna veri ekleyen akış kaynakları ekleyin. Örneğin, çalıştığınız her bölgeden bölgesel verileri birleştiren bir tablonuz olabilir. Yeni bölgeler kullanıma sunulduktan sonra, tam yenileme gerçekleştirmeden yeni bölge verilerini tabloya ekleyebilirsiniz. Bkz . Örnek: Birden çok Kafka konu başlığından akış tablosuna yazma.
  • Eksik geçmiş verileri (geri doldurma) ekleyerek bir akış tablosunu güncelleştirin. Örneğin, Apache Kafka konusu tarafından yazılan mevcut bir akış tablonuz var. Ayrıca, akış tablosuna tam olarak bir kez eklenmesi gereken bir tabloda depolanan geçmiş verileriniz vardır ve işlemeniz verileri eklemeden önce karmaşık bir toplama gerçekleştirmeyi içerdiğinden verileri akışla aktaramazsınız. Bkz . Örnek: Tek seferlik veri doldurmayı çalıştırma.
  • Birden çok kaynaktaki verileri birleştirin ve sorguda yan tümcesini kullanmak yerine tek bir akış tablosuna UNION yazın. Yerine ekleme akışı işlemeyi UNION kullanmak, tam yenileme güncelleştirmesi çalıştırmadan hedef tabloyu artımlı olarak güncelleştirmenizi sağlar. Bkz . Örnek: UNION yerine ekleme akışı işlemeyi kullanma.

Ekleme akışı işleme tarafından kayıt çıkışının hedefi var olan bir tablo veya yeni bir tablo olabilir. Python sorguları için create_streaming_table() işlevini kullanarak bir hedef tablo oluşturun.

Önemli

  • Veri kalitesi kısıtlamalarını beklentilerle tanımlamanız gerekiyorsa, hedef tablodaki beklentileri işlevin bir parçası create_streaming_table() olarak veya mevcut bir tablo tanımında tanımlayın. Tanımda beklentileri tanımlayamazsınız @append_flow .
  • Akışlar bir akış adıyla tanımlanır ve bu ad akış denetim noktalarını tanımlamak için kullanılır. Denetim noktasını tanımlamak için akış adının kullanılması aşağıdaki anlamına gelir:
    • İşlem hattındaki mevcut bir akış yeniden adlandırılırsa denetim noktası taşınmaz ve yeniden adlandırılan akış etkili bir şekilde tamamen yeni bir akıştır.
    • Mevcut denetim noktası yeni akış tanımıyla eşleşmediğinden, bir akış adını işlem hattında yeniden kullanamazsınız.

için söz dizimi @append_flowaşağıdadır:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Örnek: Birden çok Kafka konu başlığından akış tablosuna yazma

Aşağıdaki örnekler adlı kafka_target bir akış tablosu oluşturur ve bu akış tablosuna iki Kafka konusundan yazar:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

SQL sorgularında kullanılan tablo değerli işlev hakkında read_kafka() daha fazla bilgi edinmek için bkz . SQL dil başvurusunda read_kafka .

Örnek: Tek seferlik veri doldurma çalıştırma

Aşağıdaki örneklerde geçmiş verileri akış tablosuna eklemek için bir sorgu çalıştırılır:

Not

Yedekleme sorgusu zamanlanmış olarak veya sürekli olarak çalışan bir işlem hattının parçası olduğunda gerçek bir kerelik bir yedekleme sağlamak için, işlem hattını bir kez çalıştırdıktan sonra sorguyu kaldırın. Yeni veriler backfill dizinine ulaşırsa eklemek için sorguyu yerinde bırakın.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Örnek: Ekleme akışı işleme yerine kullanma UNION

Yan tümcesi olan UNION bir sorgu kullanmak yerine, birden çok kaynağı birleştirmek ve tek bir akış tablosuna yazmak için ekleme akışı sorgularını kullanabilirsiniz. Yerine ekleme akışı sorgularını UNION kullanmak, tam yenileme çalıştırmadan birden çok kaynaktan bir akış tablosuna eklemenizi sağlar.

Aşağıdaki Python örneği, birden çok veri kaynağını yan tümcesiyle birleştiren bir UNION sorgu içerir:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Aşağıdaki örnekler sorguyu UNION ekleme akışı sorgularıyla değiştirir:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );