أنماط الدفق المنظم على Azure Databricks
يحتوي هذا على دفاتر الملاحظات ونماذج التعليمات البرمجية للأنماط الشائعة للعمل مع Structured Streaming على Azure Databricks.
البدء في الدفق المنظم
إذا كنت جديدا تماما على Structured Streaming، فشاهد تشغيل أول حمل عمل Structured Streaming.
الكتابة إلى Cassandra كمتلقي للتدفق المنظم في Python
Apache Cassandra هي قاعدة بيانات OLTP موزعة وذات زمن انتقال منخفض وقابلة للتطوير ومتاحة بشكل كبير.
يعمل الدفق المنظم مع Cassandra من خلال Spark Cassandra Connector. يدعم هذا الموصل واجهات برمجة تطبيقات RDD وDataFrame، ولديه دعم أصلي لكتابة بيانات الدفق. مهم يجب استخدام الإصدار المقابل من spark-cassandra-connector-assembly.
يتصل المثال التالي بمضيف واحد أو أكثر في نظام مجموعة قاعدة بيانات Cassandra. كما يحدد تكوينات الاتصال مثل موقع نقطة التحقق ومساحة المفاتيح المحددة وأسماء الجداول:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
الكتابة إلى Azure Synapse Analytics باستخدام foreachBatch()
في Python
streamingDF.writeStream.foreachBatch()
يسمح لك بإعادة استخدام كتاب بيانات الدفعات الحاليين لكتابة إخراج استعلام دفق إلى Azure Synapse Analytics. راجع وثائق foreachBatch للحصول على التفاصيل.
لتشغيل هذا المثال، تحتاج إلى موصل Azure Synapse Analytics. للحصول على تفاصيل حول موصل Azure Synapse Analytics، راجع بيانات الاستعلام في Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
عمليات ربط Stream-Stream
يوضح هذان الدفتران كيفية استخدام عمليات ربط دفق الدفق في Python وSc scala.