Udostępnij za pośrednictwem


Wzorce przesyłania strumieniowego ze strukturą w usłudze Azure Databricks

Zawiera to notesy i przykłady kodu dla typowych wzorców pracy ze strukturą przesyłania strumieniowego w usłudze Azure Databricks.

Wprowadzenie do przesyłania strumieniowego ze strukturą

Jeśli dopiero zaczynasz korzystać z przesyłania strumieniowego ze strukturą, zobacz Uruchamianie pierwszego obciążenia przesyłania strumieniowego ze strukturą.

Zapisywanie w systemie Cassandra jako ujście dla przesyłania strumieniowego ze strukturą w języku Python

Apache Cassandra to rozproszona, mała opóźnienia, skalowalna, wysoce dostępna baza danych OLTP.

Przesyłanie strumieniowe ze strukturą współpracuje z rozwiązaniem Cassandra za pośrednictwem Połączenie or platformy Spark. Ten łącznik obsługuje zarówno interfejsy API RDD, jak i DataFrame oraz natywną obsługę zapisywania danych przesyłanych strumieniowo. Ważne Należy użyć odpowiedniej wersji zestawu spark-cassandra-connector-assembly.

Poniższy przykład łączy się z co najmniej jednym hostem w klastrze bazy danych Cassandra. Określa również konfiguracje połączeń, takie jak lokalizacja punktu kontrolnego i określone nazwy przestrzeni kluczy i tabel:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Zapisywanie w usłudze Azure Synapse Analytics przy użyciu języka foreachBatch() Python

streamingDF.writeStream.foreachBatch() umożliwia ponowne użycie istniejących składników zapisywania danych wsadowych w celu zapisania danych wyjściowych zapytania przesyłania strumieniowego do usługi Azure Synapse Analytics. Aby uzyskać szczegółowe informacje, zobacz dokumentację foreachBatch.

Aby uruchomić ten przykład, potrzebny jest łącznik usługi Azure Synapse Analytics. Aby uzyskać szczegółowe informacje na temat łącznika usługi Azure Synapse Analytics, zobacz Query data in Azure Synapse Analytics (Wykonywanie zapytań o dane w usłudze Azure Synapse Analytics).

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Sprzężenia strumień-strumień

Te dwa notesy pokazują, jak używać sprzężeń strumienia w językach Python i Scala.

Usługa Stream-Stream dołącza do notesu języka Python

Pobierz notes

Usługa Stream-Stream dołącza do notesu Scala

Pobierz notes