Aracılığıyla paylaş


Lakeflow Spark Bildirimli İşlem Hatlarındaki akış örnekleri

Örnek: Birden çok Kafka konu başlığından akış tablosuna yazma

Aşağıdaki örnek, "kafka_target" adlı bir akış tablosu oluşturur ve iki Kafka kanalından bu akış tablosuna veri yazar.

Piton

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

SQL sorgularında kullanılan tablo değerli fonksiyon hakkında read_kafka() daha fazla bilgi edinmek için bkz. SQL dil referansında read_kafka.

Python'da program aracılığıyla tek bir tabloyu hedefleyen birden çok akış oluşturabilirsiniz. Aşağıdaki örnekte Kafka konularının listesi için bu desen gösterilmektedir.

Uyarı

Bu desen, tablo oluşturmak için döngü for kullanmakla aynı gereksinimlere sahiptir. Akışı tanımlayan işleve açıkça bir Python değeri geçirmeniz gerekir. Bkz. Döngüde for tablo oluşturma.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Örnek: Tek seferlik veri geri doldurma işlemi yürütme

Var olan bir akış tablosuna veri eklemek için bir sorgu çalıştırmak istiyorsanız kullanın append_flow.

Var olan bir veri kümesini ekledikten sonra birden çok seçeneğiniz vardır:

  • Sorgunun, backfill dizinine ulaşırsa yeni verileri eklemesini istiyorsanız sorguyu yerinde bırakın.
  • Bunun tek seferlik bir dolum olmasını ve bir daha asla çalıştırılmamasını istiyorsanız, işlem hattını bir kez çalıştırdıktan sonra sorguyu kaldırın.
  • Sorgunun bir kez çalıştırılmasını ve yalnızca verilerin tamamen güncellendiği durumlarda yeniden çalıştırılmasını istiyorsanız, ekleme sürecinde parametresini once olarak True ayarlayın. SQL'de kullanın INSERT INTO ONCE.

Aşağıdaki örneklerde geçmiş verileri akış tablosuna eklemek için bir sorgu çalıştırılır:

Piton

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Daha ayrıntılı bir örnek için bkz. Geçmiş verileri işlem hatlarıyla doldurma.

Örnek: Ekleme akışı işlemesini UNION yerine kullanın

Yan tümcesi olan UNION bir sorgu kullanmak yerine, birden çok kaynağı birleştirmek ve tek bir akış tablosuna yazmak için ekleme akışı sorgularını kullanabilirsiniz. Kullanıcının akış tablosuna birden çok kaynaktan veri eklemesine olanak tanıyan ekleme akış sorgularını, UNION çalıştırmadan yerine kullanabilirsiniz.

Aşağıdaki Python örneği, birden çok veri kaynağını yan tümcesiyle birleştiren bir UNION sorgu içerir:

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

Aşağıdaki örnekler sorguyu UNION ekleme akışı sorgularıyla değiştirir:

Piton

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );