Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Пример. Запись в потоковую таблицу из нескольких разделов Kafka
В следующих примерах создается таблица потоковой передачи с именем kafka_target, и выполняется запись в нее из двух топиков Kafka.
Питон
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Дополнительные сведения о табличной функции, используемой в SQL-запросах, см. в справочнике по языку SQL в разделе read_kafka().
В Python можно программно создать несколько потоков, предназначенных для одной таблицы. В следующем примере показан этот шаблон для списка разделов Kafka.
Замечание
Паттерн имеет те же требования, что и цикл for, используемый для создания таблиц. Необходимо явно передать значение Python в функцию, определяющую поток. См. статью "Создание таблиц в циклеfor".
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Пример: Запуск одноразового заполнения данных
Если вы хотите запустить запрос, чтобы добавить данные в существующую потоковую таблицу, используйте append_flow.
После добавления набора существующих данных у вас есть несколько параметров:
- Если вы хотите, чтобы запрос добавлял новые данные, поступающие в директорию для догрузки данных, оставьте запрос в его текущем виде.
- Если вы хотите, чтобы это было однократное резервное заполнение, и никогда не выполняйте его снова, удалите запрос после запуска конвейера один раз.
- Если вы хотите, чтобы запрос выполнился один раз и повторно выполнялся только в тех случаях, когда данные полностью обновляются, установите параметр
onceв положениеTrueв процессе добавления. В SQL используйтеINSERT INTO ONCE.
В следующих примерах выполняется запрос для добавления исторических данных в таблицу потоковой передачи:
Питон
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Более подробный пример см. в статье "Обратное заполнение исторических данных с помощью пайплайнов".
Пример: Используйте обработку потока добавления вместо UNION
Вместо использования запроса с UNION предложением, вы можете использовать запросы на добавление потоков для объединения нескольких источников и записи в одну потоковую таблицу. Использование запросов на добавление вместо UNION позволяет добавлять данные в потоковую таблицу из разных источников без полного обновления.
В следующем примере Python содержится запрос, который объединяет несколько источников данных с предложением UNION :
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Следующие примеры показывают замену запроса UNION на запросы потока добавления.
Питон
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);
Пример. Использование transformWithState для мониторинга пульса датчика
В следующем примере показан обработчик с сохранением состояния, который считывает данные из Kafka и проверяет, что датчики периодически посылают сигналы heartbeat. Если сигнал не поступает в течение 5 минут, процессор вносит запись в целевую таблицу Delta для анализа.
Дополнительные сведения о создании пользовательских приложений с отслеживанием состояния см. в статье "Создание пользовательского приложения с отслеживанием состояния".
Замечание
RocksDB — это поставщик состояний по умолчанию, начиная с Databricks Runtime 17.2. Если запрос завершается ошибкой из-за исключения провайдера, не поддерживаемого, добавьте следующие конфигурации конвейера, выполните полное обновление или сброс этих чекпоинтов, а затем повторно запустите конвейер.
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator
import pandas as pd
from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
KAFKA_TOPIC = "<your-kafka-topic>"
output_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
class SensorHeartbeatProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define state schema to store sensor information (sensor_id is the grouping key)
state_schema = StructType([
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
self.sensor_state = handle.getValueState("sensorState", state_schema)
# State variable to track the previously registered timer
timer_schema = StructType([StructField("timer_ts", LongType(), False)])
self.timer_state = handle.getValueState("timerState", timer_schema)
self.handle = handle
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Process one row from input and update state
pdf = next(rows)
row = pdf.iloc[0]
# Store or update the sensor information in state using current timestamp
current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
self.sensor_state.update((
row["sensor_type"],
current_time
))
# Delete old timer if already registered
if self.timer_state.exists():
old_timer = self.timer_state.get()[0]
self.handle.deleteTimer(old_timer)
# Register a timer for 5 minutes from current processing time
expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
self.handle.registerTimer(expiry_time)
# Store the new timer timestamp in state
self.timer_state.update((expiry_time,))
# No output on input processing, output only on timer expiry
return iter([])
def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
# Emit output row based on state store
if self.sensor_state.exists():
state = self.sensor_state.get()
output = pd.DataFrame({
"sensor_id": [key[0]], # Use grouping key as sensor_id
"sensor_type": [state[0]],
"last_heartbeat_time": [state[1]]
})
# Remove the entry for the sensor from the state store
self.sensor_state.clear()
# Remove the timer state entry
self.timer_state.clear()
yield output
def close(self) -> None:
pass
dp.create_streaming_table("sensorAlerts")
# Define the schema for the Kafka message value
sensor_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("sensor_value", LongType(), False)])
@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
return (
spark.readStream
.format("kafka")
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
.select("data.*", "timestamp")
.withWatermark('timestamp', '1 hour')
.groupBy(col("sensor_id"))
.transformWithStateInPandas(
statefulProcessor = SensorHeartbeatProcessor(),
outputStructType = output_schema,
outputMode = 'update',
timeMode = 'ProcessingTime'))