Bagikan melalui


Pola Streaming Terstruktur di Azure Databricks

Ini berisi buku catatan dan sampel kode untuk pola umum untuk bekerja dengan Streaming Terstruktur di Azure Databricks.

Mulai menggunakan Streaming Terstruktur

Jika Anda baru menggunakan Streaming Terstruktur, lihat Menjalankan beban kerja Streaming Terstruktur pertama Anda.

Menulis ke Cassandra sebagai sink untuk Streaming Terstruktur di Python

Apache Cassandra adalah database OLTP terdistribusi, latensi rendah, dapat diskalakan, dan sangat tersedia.

Streaming Terstruktur berfungsi dengan Cassandra melalui Koneksi or Spark Cassandra. Konektor ini mendukung API RDD dan DataFrame, dan memiliki dukungan asli untuk menulis data streaming. Penting Anda harus menggunakan versi yang sesuai dari spark-cassandra-connector-assembly.

Contoh berikut terhubung ke satu atau beberapa host di kluster database Cassandra. Ini juga menentukan konfigurasi koneksi seperti lokasi titik pemeriksaan dan keyspace dan nama tabel tertentu:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Menulis ke Azure Synapse Analytics menggunakan foreachBatch() di Python

streamingDF.writeStream.foreachBatch() memungkinkan Anda menggunakan kembali penulis data batch yang ada untuk mengalirkan output kueri streaming ke Azure Synapse Analytics. Lihat dokumentasi foreachBatch untuk detailnya.

Untuk menjalankan contoh ini, Anda memerlukan konektor Azure Synapse Analytics. Untuk detail tentang konektor Azure Synapse Analytics, lihat Data kueri di Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Aliran-Aliran gabungan

Kedua buku catatan tersebut menunjukkan cara menggunakan aliran-aliran gabungan di Python dan Scala.

Aliran-Aliran gabungan buku catatan Python notebook

Dapatkan buku catatan

Aliran-Aliran gabungan buku catatan Scala

Dapatkan buku catatan