다음을 통해 공유


Azure Databricks의 구조적 스트리밍 패턴

여기에는 Azure Databricks에서 구조적 스트리밍을 사용하기 위한 일반적인 패턴에 대한 Notebook 및 코드 샘플이 포함되어 있습니다.

구조적 스트리밍 시작

구조적 스트리밍을 처음 접하는 경우 첫 번째 구조적 스트리밍 워크로드 실행을 참조하세요.

Python에서 구조적 스트리밍을 위한 싱크로 Cassandra에 쓰기

Apache Cassandra 는 분산되고 대기 시간이 짧으며 확장성이 뛰어난 고가용성 OLTP 데이터베이스입니다.

구조적 스트리밍은 Spark Cassandra 커넥터를 통해 Cassandra와 함께 작동합니다. 이 커넥터는 RDD 및 DataFrame API를 모두 지원하며 스트리밍 데이터 쓰기를 기본적으로 지원합니다. 중요 해당 버전의 spark-cassandra-connector-assembly를 사용해야 합니다.

다음 예제에서는 Cassandra 데이터베이스 클러스터에서 하나 이상의 호스트에 연결합니다. 또한 검사점 위치, 특정 키스페이스 및 테이블 이름과 같은 연결 구성을 지정합니다.

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Python에서 사용하여 foreachBatch() Azure Synapse Analytics에 쓰기

streamingDF.writeStream.foreachBatch()를 통해 기존 일괄 처리 데이터 기록기를 다시 사용하여 스트리밍 쿼리의 출력을 Azure Synapse Analytics에 쓸 수 있습니다. 자세한 내용은 foreachBatch 설명서를 참조하세요.

이 예제를 실행하려면 Azure Synapse Analytics 커넥터가 필요합니다. Azure Synapse Analytics 커넥터에 대한 자세한 내용은 Azure Synapse Analytics에서 데이터 쿼리를 참조하세요.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

스트림-스트림 조인

이러한 두 Notebook은 Python 및 Scala에서 스트림-스트림 조인을 사용하는 방법을 보여 줍니다.

스트림-스트림 조인 Python Notebook

전자 필기장 가져오기

스트림-스트림 조인 Scala Notebook

전자 필기장 가져오기