다음을 통해 공유


Lakeflow Spark 선언적 파이프라인의 흐름 예제

예: 여러 Kafka 토픽의 스트리밍 테이블에 쓰기

다음 예제들에서는 이름이 kafka_target인 스트리밍 테이블을 만들고 두 개의 Kafka 토픽에서 스트리밍 테이블로 기록합니다.

파이썬

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

SQL 쿼리에서 read_kafka() 사용되는 테이블 반환 함수에 대한 자세한 내용은 SQL 언어 참조의 read_kafka 참조하세요.

Python에서는 프로그래밍 방식으로 단일 테이블을 대상으로 하는 여러 흐름을 만들 수 있습니다. 다음 예제에서는 Kafka 항목 목록에 대해 이 패턴을 보여 줍니다.

비고

이 패턴에는 루프를 사용하여 for 테이블을 만드는 것과 동일한 요구 사항이 있습니다. 흐름을 정의하는 함수에 Python 값을 명시적으로 전달해야 합니다. 루프에서 테이블 만들기를 for 참조하세요.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

예: 일회성 데이터 백필 실행

쿼리를 실행하여 기존 스트리밍 테이블에 데이터를 추가하려면 .를 사용합니다 append_flow.

기존 데이터 집합을 추가한 후 다음과 같은 여러 옵션이 있습니다.

  • 쿼리가 백필 디렉터리에 도착하는 경우 새 데이터를 추가하려면 쿼리를 그대로 둡니다.
  • 이를 한 번 백필하고 다시 실행하지 않으려면 파이프라인을 한 번 실행한 후 쿼리를 제거합니다.
  • 쿼리를 한 번 실행하고 데이터가 완전히 새로 고쳐지는 경우에만 다시 실행하려면 추가 흐름에서 매개 변수를 once 설정합니다True. SQL에서 INSERT INTO ONCE을(를) 사용하십시오.

다음 예제에서는 쿼리를 실행하여 기록 데이터를 스트리밍 테이블에 추가합니다.

파이썬

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

자세한 예제는 파이프라인을 사용하여 기록 데이터 백필을 참조하세요.

예: 대신 추가 흐름 처리 사용 UNION

다중 원본을 결합하고 단일 스트리밍 테이블에 쓸 수 있도록 UNION 절 쿼리를 사용하는 대신 어펜드 플로우 쿼리를 사용할 수 있습니다. 대신 추가 흐름 쿼리를 UNION 사용하면 전체 새로 고침을 실행하지 않고 여러 원본에서 스트리밍 테이블에 추가할 수 있습니다.

다음 Python 예제에는 여러 데이터 원본을 구문과 결합하는 쿼리에 UNION이(가) 포함되어 있습니다.

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

다음 예제에서는 UNION 쿼리를 추가 흐름 쿼리로 대체합니다.

파이썬

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );

예시: 센서 하트비트를 모니터링할 때 transformWithState를 사용하세요

다음 예제에서는 Kafka로부터 데이터를 읽어오고 센서가 주기적으로 하트비트를 내보내고 있는지 확인하는 상태유지 프로세서를 보여 줍니다. 하트비트가 5분 이내에 수신되지 않으면 프로세서는 분석을 위해 대상 델타 테이블에 대한 항목을 내보낸다.

사용자 지정 상태 저장 애플리케이션을 빌드하는 방법에 대한 자세한 내용은 사용자 지정 상태 저장 애플리케이션 빌드를 참조하세요.

비고

RocksDB는 Databricks Runtime 17.2부터 시작하는 기본 상태 공급자입니다. 지원되지 않는 공급자 예외로 인해 쿼리가 실패하는 경우 다음 파이프라인 구성을 추가하고 전체 새로 고침 또는 검사점 재설정을 수행한 다음 파이프라인을 다시 실행합니다.

"configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator

import pandas as pd

from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

KAFKA_TOPIC = "<your-kafka-topic>"

output_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("last_heartbeat_time", TimestampType(), False)])

class SensorHeartbeatProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define state schema to store sensor information (sensor_id is the grouping key)
        state_schema = StructType([
            StructField("sensor_type", StringType(), False),
            StructField("last_heartbeat_time", TimestampType(), False)])
        self.sensor_state = handle.getValueState("sensorState", state_schema)
        # State variable to track the previously registered timer
        timer_schema = StructType([StructField("timer_ts", LongType(), False)])
        self.timer_state = handle.getValueState("timerState", timer_schema)
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Process one row from input and update state
        pdf = next(rows)
        row = pdf.iloc[0]
        # Store or update the sensor information in state using current timestamp
        current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
        self.sensor_state.update((
            row["sensor_type"],
            current_time
        ))

        # Delete old timer if already registered
        if self.timer_state.exists():
            old_timer = self.timer_state.get()[0]
            self.handle.deleteTimer(old_timer)

        # Register a timer for 5 minutes from current processing time
        expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
        self.handle.registerTimer(expiry_time)
        # Store the new timer timestamp in state
        self.timer_state.update((expiry_time,))

        # No output on input processing, output only on timer expiry
        return iter([])

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        # Emit output row based on state store
        if self.sensor_state.exists():
            state = self.sensor_state.get()
            output = pd.DataFrame({
                "sensor_id": [key[0]],  # Use grouping key as sensor_id
                "sensor_type": [state[0]],
                "last_heartbeat_time": [state[1]]
            })
            # Remove the entry for the sensor from the state store
            self.sensor_state.clear()
            # Remove the timer state entry
            self.timer_state.clear()
            yield output

    def close(self) -> None:
        pass

dp.create_streaming_table("sensorAlerts")

# Define the schema for the Kafka message value
sensor_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("sensor_value", LongType(), False)])

@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
    return (
      spark.readStream
        .format("kafka")
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "earliest")
        .load()
        .select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
        .select("data.*", "timestamp")
        .withWatermark('timestamp', '1 hour')
        .groupBy(col("sensor_id"))
        .transformWithStateInPandas(
          statefulProcessor = SensorHeartbeatProcessor(),
          outputStructType = output_schema,
          outputMode = 'update',
          timeMode = 'ProcessingTime'))