Aracılığıyla paylaş


append_flow

Dekoratör, @dp.append_flow işlem hattı tablolarınız için ekleme akışları veya geri doldurmalar oluşturur. İşlev bir Apache Spark akış veri çerçevesi döndürmelidir. Bkz. Lakeflow Spark Bildirimli İşlem Hatları akışlarıyla verileri artımlı olarak yükleme ve işleme.

Ekleme akışları akış tablolarını veya havuzlarını hedefleyebilir.

Sözdizimi

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Parametreler

Parametre Türü Description
function function Gerekli. Kullanıcı tanımlı bir sorgudan Apache Spark akış DataFrame'i döndüren işlev.
target str Gerekli. Ekleme akışının hedefi olan tablo veya havuzun adı.
name str Akış adı. Sağlanmadıysa, varsayılan olarak işlev adını kullanır.
once bool İsteğe bağlı olarak, akışı yedek doldurma gibi tek seferlik bir akış olarak tanımlayın. Kullanımı once=True , akışı iki şekilde değiştirir:
  • Geri dönüş değeri. streaming-query. Bu durumda, bir akış DataFrame değil, bir yığın DataFrame olmalıdır.
  • Akış varsayılan olarak bir kez çalıştırılır. Eğer işlem hattı eksiksiz bir yenilemeyle güncellenirse, ONCE akış verileri yeniden oluşturmak için tekrar çalıştırılır.
comment str Akış açıklaması.
spark_conf dict Bu sorgunun yürütülmesi için Spark yapılandırmalarının listesi

Örnekler

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))