For an overview of flows, see Load and process data incrementally with Lakeflow Spark Declarative Pipelines flows.
Example: Create a default flow
When you create a pipeline, you typically define a table or view along with the query that supports it. For example, this query creates a streaming table named customers_silver by reading from customers_bronze. The streaming table and its default flow are created together in a single step.
SQL
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Python
from pyspark import pipelines as dp

@dp.table()
def customers_silver():
    return spark.readStream.table("customers_bronze")
The default flow for a streaming table is an append flow that adds new rows with each update, and it has the same name as the target. This is the most common way to use pipelines—creating a flow and its target in a single step—and you can use it to ingest or transform data. For more about flow concepts, see Load and process data incrementally with Lakeflow Spark Declarative Pipelines flows.
Example: Define a flow separately from its target
You can also create a flow for a table that you defined separately. The result is identical to creating a default flow, including using the same name for the streaming table and the flow:
Python
from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow; the function name is used as the flow name, so it matches the target
@dp.append_flow(
    target = "customers_silver")
def customers_silver():
    return spark.readStream.table("customers_bronze")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
Defining a flow separately from its target lets you create multiple flows that append data to the same target. Use the @dp.append_flow decorator in the Python interface or the CREATE FLOW...INSERT INTO clause in the SQL interface to add flows for tasks such as the following:
- Add streaming sources that append data to an existing streaming table without requiring a full refresh. For example, you might have a table combining regional data from every region you operate in. As new regions are rolled out, you can add the new region data to the table without performing a full refresh. See Example: Write to a streaming table from multiple Kafka topics.
- Update a streaming table by appending missing historical data (backfilling). You can use the INSERT INTO ONCE syntax to create a historical backfill that runs one time. See Example: Run a one-time data backfill and Backfilling historical data with pipelines.
- Combine data from multiple sources and write to a single streaming table instead of using the UNION clause in a query. Using append flow processing instead of UNION allows you to update the target table incrementally without running a full refresh update. See Example: Use append flow processing instead of UNION.
For Python queries, use the create_streaming_table() function to create a target table.
Important
- If you need to define data quality constraints with expectations, define the expectations on the target table as part of the create_streaming_table() function or on an existing table definition. You cannot define expectations in the @append_flow definition.
- Flows are identified by a flow name, and this name is used to identify streaming checkpoints. The use of the flow name to identify the checkpoint means the following:
  - If an existing flow in a pipeline is renamed, the checkpoint does not carry over, and the renamed flow is effectively an entirely new flow.
  - You cannot reuse a flow name in a pipeline, because the existing checkpoint won't match the new flow definition.
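The effect of keying checkpoints by flow name can be illustrated with a toy model in plain Python. This is only a sketch for intuition, not how the pipeline runtime is implemented: the ToyPipeline class, its run_flow method, and the flow names are hypothetical, and a checkpoint is modeled as a simple count of source rows already consumed.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPipeline:
    """Toy model: one checkpoint (rows already consumed) per flow name."""
    checkpoints: dict = field(default_factory=dict)
    target: list = field(default_factory=list)

    def run_flow(self, flow_name, source_rows):
        # Resume from the checkpoint stored under this flow name, if any.
        start = self.checkpoints.get(flow_name, 0)
        new_rows = source_rows[start:]
        self.target.extend(new_rows)
        self.checkpoints[flow_name] = len(source_rows)
        return len(new_rows)


source = ["r1", "r2", "r3"]
pipeline = ToyPipeline()

first = pipeline.run_flow("customers_flow", source)       # no checkpoint yet: reads 3 rows
second = pipeline.run_flow("customers_flow", source)      # checkpoint found: reads 0 rows
renamed = pipeline.run_flow("customers_flow_v2", source)  # new name, no checkpoint: reads 3 rows again

print(first, second, renamed)  # 3 0 3
```

In this model the renamed flow starts from scratch because nothing is stored under its new name, which mirrors the statement that a renamed flow is effectively an entirely new flow.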
Example: Write to a streaming table from multiple Kafka topics
The following examples create a streaming table named kafka_target and write to that streaming table from two Kafka topics:
Python
from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", "topic1")
        .load()
    )

@dp.append_flow(target = "kafka_target")
def topic2():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", "topic2")
        .load()
    )
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
To learn more about the read_kafka() table-valued function used in the SQL queries, see read_kafka in the SQL language reference.
In Python, you can programmatically create multiple flows that target a single table. The following example shows this pattern for a list of Kafka topics.
Note
This pattern has the same requirements as using a for loop to create tables. You must explicitly pass a Python value to the function defining the flow. See Create tables in a for loop.
from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

    @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
    def topic_flow(topic=topic_name):
        return (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "host1:port1,...")
            .option("subscribe", topic)
            .load()
        )
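The requirement to pass the loop value explicitly follows from how Python closures work: a function body looks up a loop variable when the function is called, not when it is defined. The following plain-Python sketch (no Spark required) shows the difference, assuming the functions are invoked only after the loop has finished. The helper names make_late_bound and make_bound_by_default are hypothetical and exist only for this illustration.

```python
topic_list = ["topic1", "topic2", "topic3"]


def make_late_bound(topics):
    """Define one function per topic without binding the loop value."""
    funcs = []
    for topic_name in topics:
        def subscribe():
            # topic_name is looked up when subscribe() runs, after the loop ended
            return topic_name
        funcs.append(subscribe)
    return funcs


def make_bound_by_default(topics):
    """Define one function per topic, binding the value as a default argument."""
    funcs = []
    for topic_name in topics:
        def subscribe(topic=topic_name):
            # the default value was evaluated when this function was defined
            return topic
        funcs.append(subscribe)
    return funcs


late = [f() for f in make_late_bound(topic_list)]
bound = [f() for f in make_bound_by_default(topic_list)]

print(late)   # ['topic3', 'topic3', 'topic3']
print(bound)  # ['topic1', 'topic2', 'topic3']
```

Without the default argument, every function sees the last value of the loop variable, so all flows would read the same topic. Binding the value with topic=topic_name, as the pipeline example does, gives each flow its own topic.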
Example: Run a one-time data backfill
If you want to run a query to append data to an existing streaming table, use append_flow.
After appending a set of existing data, you have multiple options:
- If you want the query to append new data if it arrives in the backfill directory, leave the query in place.
- If you want this to be a one-time backfill that never runs again, remove the query after running the pipeline once.
- If you want the query to run once, and run again only when the data is fully refreshed, set the once parameter to True on the append flow. In SQL, use INSERT INTO ONCE.
The following examples run a query to append historical data to a streaming table:
Python
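from pyspark import pipelines as dp

@dp.table()
def csv_target():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("path/to/sourceDir")
    )

# once = True runs this flow one time, and again only on a full refresh
@dp.append_flow(
    target = "csv_target",
    once = True)
def backfill():
    # A one-time flow returns a batch DataFrame. Auto Loader (cloudFiles) is a
    # streaming source, so this batch read uses the csv reader directly.
    return (
        spark.read
        .format("csv")
        .load("path/to/backfill/data/dir")
    )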
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  STREAM read_files(
    "path/to/sourceDir",
    format => "csv"
  );

-- ONCE makes this a batch read, so the STREAM keyword is not used here
CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    format => "csv"
  );
For a more in-depth example, see Backfilling historical data with pipelines.
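The run-once behavior described above can be sketched as a toy model in plain Python. This is an illustration of the stated semantics only (run on the first update, skip on later updates, run again after a full refresh), not the actual pipeline implementation. The ToyTarget class and its update method are hypothetical names.

```python
class ToyTarget:
    """Toy model of a streaming table that tracks which one-time flows have run."""

    def __init__(self):
        self.rows = []
        self.completed_once_flows = set()

    def update(self, once_flows, full_refresh=False):
        """Run one pipeline update; return the names of the one-time flows that ran."""
        if full_refresh:
            # A full refresh clears the table, so one-time flows must run again.
            self.rows.clear()
            self.completed_once_flows.clear()
        ran = []
        for name, batch in once_flows.items():
            if name not in self.completed_once_flows:
                self.rows.extend(batch)
                self.completed_once_flows.add(name)
                ran.append(name)
        return ran


flows = {"backfill": ["2019-row", "2020-row"]}
target = ToyTarget()

first_update = target.update(flows)                     # backfill runs
second_update = target.update(flows)                    # backfill is skipped
after_refresh = target.update(flows, full_refresh=True) # backfill runs again

print(first_update, second_update, after_refresh)  # ['backfill'] [] ['backfill']
```

In this model the historical rows are appended exactly once per full refresh, so repeated updates do not duplicate the backfilled data.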
Example: Use append flow processing instead of UNION
Instead of using a query with a UNION clause, you can use append flow queries to combine multiple sources and write to a single streaming table. Using append flow queries instead of UNION allows you to append to a streaming table from multiple sources without running a full refresh.
The following Python example includes a query that combines multiple data sources with a UNION clause:
from pyspark import pipelines as dp

@dp.table(name="raw_orders")
def unioned_raw_orders():
    raw_orders_us = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/path/to/orders/us")
    )
    raw_orders_eu = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/path/to/orders/eu")
    )
    # Both sources are combined in one query that writes to raw_orders
    return raw_orders_us.union(raw_orders_eu)
The following examples replace the UNION query with append flow queries:
Python
from pyspark import pipelines as dp

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/path/to/orders/us")
    )

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/path/to/orders/eu")
    )

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/path/to/orders/apac")
    )
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);
Example: Use transformWithState to monitor sensor heartbeats
The following example shows a stateful processor that reads from Kafka and verifies that sensors are emitting heartbeats periodically. If a heartbeat isn't received within 5 minutes, the processor emits an entry to the target Delta table for analysis.
For more information about building custom stateful applications, see Build a custom stateful application.
Note
RocksDB is the default state provider starting with Databricks Runtime 17.2. If the query fails due to an unsupported provider exception, add the following pipeline configurations, perform a full refresh or checkpoint reset, and then rerun your pipeline:
"configuration": {
  "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator

import pandas as pd

from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

KAFKA_TOPIC = "<your-kafka-topic>"

output_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("last_heartbeat_time", TimestampType(), False)])
class SensorHeartbeatProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define state schema to store sensor information (sensor_id is the grouping key)
        state_schema = StructType([
            StructField("sensor_type", StringType(), False),
            StructField("last_heartbeat_time", TimestampType(), False)])
        self.sensor_state = handle.getValueState("sensorState", state_schema)
        # State variable to track the previously registered timer
        timer_schema = StructType([StructField("timer_ts", LongType(), False)])
        self.timer_state = handle.getValueState("timerState", timer_schema)
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Process one row from input and update state
        pdf = next(rows)
        row = pdf.iloc[0]
        # Store or update the sensor information in state using current timestamp
        current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
        self.sensor_state.update((
            row["sensor_type"],
            current_time
        ))
        # Delete old timer if already registered
        if self.timer_state.exists():
            old_timer = self.timer_state.get()[0]
            self.handle.deleteTimer(old_timer)
        # Register a timer for 5 minutes from current processing time
        expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
        self.handle.registerTimer(expiry_time)
        # Store the new timer timestamp in state
        self.timer_state.update((expiry_time,))
        # No output on input processing, output only on timer expiry
        return iter([])

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        # Emit output row based on state store
        if self.sensor_state.exists():
            state = self.sensor_state.get()
            output = pd.DataFrame({
                "sensor_id": [key[0]],  # Use grouping key as sensor_id
                "sensor_type": [state[0]],
                "last_heartbeat_time": [state[1]]
            })
            # Remove the entry for the sensor from the state store
            self.sensor_state.clear()
            # Remove the timer state entry
            self.timer_state.clear()
            yield output

    def close(self) -> None:
        pass
dp.create_streaming_table("sensorAlerts")

# Define the schema for the Kafka message value
sensor_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("sensor_value", LongType(), False)])

@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
    return (
        spark.readStream
        .format("kafka")
        # The Kafka source requires bootstrap servers; replace this placeholder with your own
        .option("kafka.bootstrap.servers", "<your-bootstrap-servers>")
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "earliest")
        .load()
        # Parse the JSON payload and keep the Kafka record timestamp
        .select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
        .select("data.*", "timestamp")
        .withWatermark('timestamp', '1 hour')
        # One state entry and one timer per sensor
        .groupBy(col("sensor_id"))
        .transformWithStateInPandas(
            statefulProcessor = SensorHeartbeatProcessor(),
            outputStructType = output_schema,
            outputMode = 'update',
            timeMode = 'ProcessingTime'))
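The timer logic in the processor can be easier to follow without Spark. The following plain-Python sketch simulates the same idea under simplifying assumptions: state and timers are ordinary dictionaries, time is advanced manually, and a timer fires when its expiry time is less than or equal to the current time. The HeartbeatMonitor class and its methods are hypothetical stand-ins and are not part of the transformWithState API.

```python
TIMEOUT_MS = 5 * 60 * 1000  # same 5-minute window as the processor above


class HeartbeatMonitor:
    """Plain-Python stand-in for the per-sensor state and timers."""

    def __init__(self):
        self.sensor_state = {}  # sensor_id -> (sensor_type, last_heartbeat_ms)
        self.timers = {}        # sensor_id -> expiry_ms

    def on_heartbeat(self, sensor_id, sensor_type, now_ms):
        # Update state and replace any earlier timer for this sensor.
        self.sensor_state[sensor_id] = (sensor_type, now_ms)
        self.timers[sensor_id] = now_ms + TIMEOUT_MS

    def advance_to(self, now_ms):
        """Fire every expired timer and return one alert per silent sensor."""
        alerts = []
        # sorted() copies the items, so entries can be deleted inside the loop
        for sensor_id, expiry_ms in sorted(self.timers.items()):
            if expiry_ms <= now_ms:
                sensor_type, last_ms = self.sensor_state.pop(sensor_id)
                del self.timers[sensor_id]
                alerts.append((sensor_id, sensor_type, last_ms))
        return alerts


monitor = HeartbeatMonitor()
monitor.on_heartbeat(1, "temperature", now_ms=0)
monitor.on_heartbeat(2, "humidity", now_ms=0)
monitor.on_heartbeat(1, "temperature", now_ms=240_000)  # resets sensor 1's timer

alerts_at_5_min = monitor.advance_to(300_000)
alerts_at_9_min = monitor.advance_to(540_000)
alerts_at_10_min = monitor.advance_to(600_000)

print(alerts_at_5_min)   # [(2, 'humidity', 0)]
print(alerts_at_9_min)   # [(1, 'temperature', 240000)]
print(alerts_at_10_min)  # []
```

Sensor 1 sends a second heartbeat at 4 minutes, so its timer moves to 9 minutes and only sensor 2 is reported at the 5-minute mark. Once an alert is emitted, the state for that sensor is cleared, which matches the clear() calls in handleExpiredTimer.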