أنماط الدفق المنظم على Azure Databricks

يحتوي هذا على دفاتر الملاحظات ونماذج التعليمات البرمجية للأنماط الشائعة للعمل مع Structured Streaming على Azure Databricks.

البدء في الدفق المنظم

إذا كنت جديدا تماما على Structured Streaming، فشاهد تشغيل أول حمل عمل Structured Streaming.

الكتابة إلى Cassandra كمتلقي للتدفق المنظم في Python

Apache Cassandra هي قاعدة بيانات OLTP موزعة وذات زمن انتقال منخفض وقابلة للتطوير ومتاحة بشكل كبير.

يعمل الدفق المنظم مع Cassandra من خلال Spark Cassandra Connector. يدعم هذا الموصل واجهات برمجة تطبيقات RDD وDataFrame، ولديه دعم أصلي لكتابة بيانات الدفق. مهم يجب استخدام الإصدار المقابل من spark-cassandra-connector-assembly.

يتصل المثال التالي بمضيف واحد أو أكثر في نظام مجموعة قاعدة بيانات Cassandra. كما يحدد تكوينات الاتصال مثل موقع نقطة التحقق ومساحة المفاتيح المحددة وأسماء الجداول:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

الكتابة إلى Azure Synapse Analytics باستخدام foreachBatch() في Python

streamingDF.writeStream.foreachBatch() يسمح لك بإعادة استخدام كتاب بيانات الدفعات الحاليين لكتابة إخراج استعلام دفق إلى Azure Synapse Analytics. راجع وثائق foreachBatch للحصول على التفاصيل.

لتشغيل هذا المثال، تحتاج إلى موصل Azure Synapse Analytics. للحصول على تفاصيل حول موصل Azure Synapse Analytics، راجع بيانات الاستعلام في Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

عمليات ربط Stream-Stream

يوضح هذان الدفتران كيفية استخدام عمليات ربط دفق الدفق في Python وSc scala.

ينضم Stream-Stream إلى دفتر ملاحظات Python

الحصول على دفتر الملاحظات

ينضم Stream-Stream إلى دفتر ملاحظات Scala

الحصول على دفتر الملاحظات