Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Örnek: Birden çok Kafka konu başlığından akış tablosuna yazma
Aşağıdaki örnek, "kafka_target" adlı bir akış tablosu oluşturur ve iki Kafka kanalından bu akış tablosuna veri yazar.
Piton
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
SQL sorgularında kullanılan tablo değerli fonksiyon hakkında read_kafka() daha fazla bilgi edinmek için bkz. SQL dil referansında read_kafka.
Python'da program aracılığıyla tek bir tabloyu hedefleyen birden çok akış oluşturabilirsiniz. Aşağıdaki örnekte Kafka konularının listesi için bu desen gösterilmektedir.
Uyarı
Bu desen, tablo oluşturmak için döngü for kullanmakla aynı gereksinimlere sahiptir. Akışı tanımlayan işleve açıkça bir Python değeri geçirmeniz gerekir. Bkz. Döngüde for tablo oluşturma.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Örnek: Tek seferlik veri geri doldurma işlemi yürütme
Var olan bir akış tablosuna veri eklemek için bir sorgu çalıştırmak istiyorsanız kullanın append_flow.
Var olan bir veri kümesini ekledikten sonra birden çok seçeneğiniz vardır:
- Sorgunun, backfill dizinine ulaşırsa yeni verileri eklemesini istiyorsanız sorguyu yerinde bırakın.
- Bunun tek seferlik bir dolum olmasını ve bir daha asla çalıştırılmamasını istiyorsanız, işlem hattını bir kez çalıştırdıktan sonra sorguyu kaldırın.
- Sorgunun bir kez çalıştırılmasını ve yalnızca verilerin tamamen güncellendiği durumlarda yeniden çalıştırılmasını istiyorsanız, ekleme sürecinde parametresini
onceolarakTrueayarlayın. SQL'de kullanınINSERT INTO ONCE.
Aşağıdaki örneklerde geçmiş verileri akış tablosuna eklemek için bir sorgu çalıştırılır:
Piton
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Daha ayrıntılı bir örnek için bkz. Geçmiş verileri işlem hatlarıyla doldurma.
Örnek: Ekleme akışı işlemesini UNION yerine kullanın
Yan tümcesi olan UNION bir sorgu kullanmak yerine, birden çok kaynağı birleştirmek ve tek bir akış tablosuna yazmak için ekleme akışı sorgularını kullanabilirsiniz. Kullanıcının akış tablosuna birden çok kaynaktan veri eklemesine olanak tanıyan ekleme akış sorgularını, UNION çalıştırmadan yerine kullanabilirsiniz.
Aşağıdaki Python örneği, birden çok veri kaynağını yan tümcesiyle birleştiren bir UNION sorgu içerir:
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Aşağıdaki örnekler sorguyu UNION ekleme akışı sorgularıyla değiştirir:
Piton
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);
Örnek: Algılayıcı kalp atışlarını izlemek için transformWithState kullanın
Aşağıdaki örnekte, Kafka'dan okuyan ve sensörlerin düzenli aralıklarla sinyal yaydığını doğrulayan durum bilgisine sahip bir işlemci gösterilmektedir. 5 dakika içinde sinyal alınmazsa, işlemci analiz için hedef Delta tablosuna bir giriş oluşturur.
Özel durum bilgisi olan uygulamalarla ilgili daha fazla bilgi için bkz. Özel durum bilgisi olan bir uygulama oluşturma.
Uyarı
RocksDB, Databricks Runtime 17.2 ile başlayan varsayılan durum sağlayıcısıdır. Sorgu desteklenmeyen bir sağlayıcı özel durumu nedeniyle başarısız olursa, aşağıdaki işlem hattı yapılandırmalarını ekleyin, tam yenileme veya denetim noktası sıfırlaması gerçekleştirin ve işlem hattınızı yeniden çalıştırın:
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator
import pandas as pd
from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
KAFKA_TOPIC = "<your-kafka-topic>"
output_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
class SensorHeartbeatProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define state schema to store sensor information (sensor_id is the grouping key)
state_schema = StructType([
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
self.sensor_state = handle.getValueState("sensorState", state_schema)
# State variable to track the previously registered timer
timer_schema = StructType([StructField("timer_ts", LongType(), False)])
self.timer_state = handle.getValueState("timerState", timer_schema)
self.handle = handle
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Process one row from input and update state
pdf = next(rows)
row = pdf.iloc[0]
# Store or update the sensor information in state using current timestamp
current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
self.sensor_state.update((
row["sensor_type"],
current_time
))
# Delete old timer if already registered
if self.timer_state.exists():
old_timer = self.timer_state.get()[0]
self.handle.deleteTimer(old_timer)
# Register a timer for 5 minutes from current processing time
expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
self.handle.registerTimer(expiry_time)
# Store the new timer timestamp in state
self.timer_state.update((expiry_time,))
# No output on input processing, output only on timer expiry
return iter([])
def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
# Emit output row based on state store
if self.sensor_state.exists():
state = self.sensor_state.get()
output = pd.DataFrame({
"sensor_id": [key[0]], # Use grouping key as sensor_id
"sensor_type": [state[0]],
"last_heartbeat_time": [state[1]]
})
# Remove the entry for the sensor from the state store
self.sensor_state.clear()
# Remove the timer state entry
self.timer_state.clear()
yield output
def close(self) -> None:
pass
dp.create_streaming_table("sensorAlerts")
# Define the schema for the Kafka message value
sensor_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("sensor_value", LongType(), False)])
@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
return (
spark.readStream
.format("kafka")
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
.select("data.*", "timestamp")
.withWatermark('timestamp', '1 hour')
.groupBy(col("sensor_id"))
.transformWithStateInPandas(
statefulProcessor = SensorHeartbeatProcessor(),
outputStructType = output_schema,
outputMode = 'update',
timeMode = 'ProcessingTime'))