Megosztás a következőn keresztül:


Strukturált streamelési minták az Azure Databricksben

Ez jegyzetfüzeteket és kódmintákat tartalmaz a strukturált streamelés Azure Databricksen való használatához szükséges gyakori mintákhoz.

A strukturált streamelés használatának első lépései

Ha teljesen új a strukturált streamelésben, olvassa el az első strukturált streamelési számítási feladat futtatása című témakört.

Írás a Cassandra-ba fogadóként a Python strukturált streameléséhez

Az Apache Cassandra egy elosztott, alacsony késésű, méretezhető, magas rendelkezésre állású OLTP-adatbázis.

A strukturált streamelés a Cassandra-val a Spark Cassandra-összekötőn keresztül működik. Ez az összekötő rdd és DataFrame API-kat is támogat, és natív támogatást nyújt a streamelési adatok írásához. Fontos: A spark-cassandra-connector-assembly megfelelő verzióját kell használnia.

Az alábbi példa egy Cassandra-adatbázisfürt egy vagy több gazdagépéhez csatlakozik. Emellett megadja a kapcsolatkonfigurációkat, például az ellenőrzőpont helyét, valamint az adott kulcsteret és táblaneveket:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Írás az Azure Synapse Analyticsbe a Python használatával foreachBatch()

streamingDF.writeStream.foreachBatch() Lehetővé teszi a meglévő kötegelt adatírók újrafelhasználását egy streamelési lekérdezés kimenetének az Azure Synapse Analyticsbe való írásához. A részletekért tekintse meg a foreachBatch dokumentációját .

A példa futtatásához szüksége van az Azure Synapse Analytics-összekötőre. Az Azure Synapse Analytics-összekötővel kapcsolatos részletekért tekintse meg az Azure Synapse Analytics lekérdezési adatait.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Streamek csatlakoztatása

Ez a két jegyzetfüzet bemutatja, hogyan használhat stream-stream illesztéseket a Pythonban és a Scalában.

A Stream-Stream összekapcsolja a Python-jegyzetfüzetet

Jegyzetfüzet beszerzése

A Stream-Stream a Scala-jegyzetfüzethez csatlakozik

Jegyzetfüzet beszerzése