Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu, Azure Databricks'te Yapılandırılmış Akış ile çalışmaya yönelik yaygın desenlere yönelik not defterlerini ve kod örneklerini içerir.
Yapılandırılmış Akış'ı kullanmaya başlama
Yapılandırılmış Akış'a yepyeniyseniz, İlk Yapılandırılmış Akış iş yükünüzü çalıştırma bölümüne bakın.
Python'da Yapılandırılmış Akış için Cassandra'ya havuz olarak yazma
Apache Cassandra dağıtılmış, düşük gecikme süreli, ölçeklenebilir, yüksek oranda kullanılabilir bir OLTP veritabanıdır.
Yapılandırılmış Akış, Spark Cassandra Bağlayıcısı aracılığıyla Cassandra ile birlikte çalışır. Bu bağlayıcı hem RDD hem de DataFrame API'lerini destekler ve akış verileri yazmak için yerel desteğe sahiptir. Önemli Spark-cassandra-connector-assembly'in ilgili sürümünü kullanmanız gerekir.
Aşağıdaki örnek, Cassandra veritabanı kümesindeki bir veya daha fazla ana bilgisayara bağlanır. Ayrıca denetim noktası konumu, belirli anahtar alanı ve tablo adları gibi bağlantı yapılandırmalarını da belirtir:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Python'da kullanarak foreachBatch() Azure Synapse Analytics'e yazma
streamingDF.writeStream.foreachBatch() , bir akış sorgusunun çıkışını Azure Synapse Analytics'e yazmak için mevcut toplu veri yazıcılarını yeniden kullanmanıza olanak tanır.
Ayrıntılar için foreachBatch belgelerine bakın.
Bu örneği çalıştırmak için Azure Synapse Analytics bağlayıcısına ihtiyacınız vardır. Azure Synapse Analytics bağlayıcısı hakkında ayrıntılı bilgi için Azure Synapse Analytics'te verileri sorgulama'ya bakın.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Akışlar arası birleştirmeler
Bu iki defter, Python ve Scala'da akıştan akışa birleştirmelerin nasıl kullanılacağını gösterir.