SQL Server 大数据群集 Spark 流式处理指南

适用于:SQL Server 2019 (15.x)

重要

Microsoft SQL Server 2019 大数据群集附加产品将停用。 对 SQL Server 2019 大数据群集的支持将于 2025 年 2 月 28 日结束。 具有软件保障的 SQL Server 2019 的所有现有用户都将在平台上获得完全支持,在此之前,该软件将继续通过 SQL Server 累积更新进行维护。 有关详细信息,请参阅公告博客文章Microsoft SQL Server 平台上的大数据选项

本指南介绍了流式处理用例,还介绍如何使用 SQL Server 大数据群集 Spark 来实现这些用例。

本指南介绍如何:

  • 加载要用于 PySpark 和 Scala Spark 的流式处理库。
  • 通过使用 SQL Server 大数据群集实现 3 种常见的流式处理模式。

先决条件

  • SQL Server 大数据群集部署
  • 具有以下选项之一:
    • Apache Kafka 群集 2.0 或更高版本
    • Azure 事件中心命名空间和事件中心

本指南假设你已充分了解有关流式处理技术概念和体系结构的内容。 以下文章提供了卓越的概念基线:

Apache Kafka 和 Azure 事件中心概念映射

Apache Kafka 概念 事件中心概念
集群 Namespace
主题 事件中心
分区 分区
使用者组 使用者组
抵消 抵消

可再现性

本指南使用快速入门:使用 Kafka 协议通过事件中心进行数据流式处理中提供的创建器应用程序。 你可以在 GitHub 上的适用于 Apache Kafka 的 Azure 事件中心找到使用多种编程语言的示例应用程序。 使用这些应用程序可以快速开始流式处理方案。

注释

通过快速入门完成的步骤之一是在创建 Azure 事件中心时启用了 Kafka 流选项。 确认 Azure 事件中心命名空间的 Kafka 终结点已启用。

以下经过修改的 producer.py 代码通过使用与 Kafka 兼容的客户端将模拟的传感器 JSON 数据流式传输到流式处理引擎。 请注意,Azure 事件中心与 Kafka 协议是兼容的。 按照 GitHub 中的设置说明操作以获取适合你的示例。

所有连接信息均位于 conf 字典中。 你的设置可能因环境而异。 至少替换 bootstrap.serverssasl.password。 这些设置在以下代码示例中是最相关的。

#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, version 2.0.
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka ecosystems.

from confluent_kafka import Producer
import sys
import random
import time
import json

sensors = ["Sensor 1", "Sensor 2", "Sensor 3"]

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
        sys.exit(1)
    topic = sys.argv[1]

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    # See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
    conf = {
        'bootstrap.servers': '',                     #replace!
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<password>',               #replace!
        'client.id': 'python-sample-producer'
    }

    # Create Producer instance
    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %o\n' % (msg.topic(), msg.partition(), msg.offset()))

    # Simulate stream
    for i in range(0, 10000):
        try:
            payload = {
                'sensor': random.choice(sensors),
                'measure1': random.gauss(37, 7),
                'measure2': random.random(),
            }
            p.produce(topic, json.dumps(payload).encode('utf-8'), callback=delivery_callback)
            #p.produce(topic, str(i), callback=delivery_callback)
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        p.poll(0)
        time.sleep(2)

    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

通过使用以下命令,运行示例创建器应用程序。 将 <my-sample-topic> 替换为环境信息。

python producer.py <my-sample-topic>

流式处理方案

流式处理模式 方案说明和实现
从 Kafka 或事件中心拉取 创建一个 Spark 流式处理作业来从流式处理引擎中持续拉取数据,从而执行可选转换和分析逻辑。
将流数据接收到 Apache Hadoop 分布式文件系统 (HDFS) 中 一般来说,这种模式与之前的模式相关。 在流式处理拉取和转换逻辑后,数据可被写入多个位置以实现所需的数据持久性要求。
从 Spark 推送到 Kafka 或事件中心 由 Spark 处理后,数据可能会被推送回到外部流式处理引擎。 这种模式在很多场景下都是可取的,比如实时产品建议和微批处理欺诈和异常情况检测。

示例 Spark 流式处理应用程序

此示例应用程序实现在上一部分中描述的 3 种流式处理模式。 该应用程序:

  1. 设置用于连接到流式处理服务的配置变量。
  2. 创建用于拉取数据的 Spark 流式处理数据帧。
  3. 将聚合数据本地写入到 HDFS。
  4. 将聚合数据写入到流式处理服务中的其他主题。

完整的 sample-spark-streaming-python.py 代码如下所示:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Sets up batch size to 15 seconds
duration_ms = 15000
# Changes Spark session into a structured streaming context
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, duration_ms)
spark = SparkSession(sc)

# Connection information
bootstrap_servers = "" # Replace!
sasl = "" # Replace!
# Topic we will consume from
topic = "sample-topic"
# Topic we will write to
topic_to = "sample-topic-processed"

# Define the schema to speed up processing
jsonSchema = StructType([StructField("sensor", StringType(), True), StructField("measure1", DoubleType(), True), StructField("measure2", DoubleType(), True)])

streaming_input_df = (
    spark.readStream \
    .format("kafka") \
    .option("subscribe", topic) \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", sasl) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "30000") \
    .option("failOnDataLoss", "true") \
    .option("startingOffsets", "latest") \
    .load()
)

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    if df.count() <= 0:
        None
    else:
        # Create a data frame to be written to HDFS
        sensor_df = df.selectExpr('CAST(value AS STRING)').select(from_json("value", jsonSchema).alias("value")).select("value.*")
        # root
        #  |-- sensor: string (nullable = true)
        #  |-- measure1: double (nullable = true)
        #  |-- measure2: double (nullable = true)
        sensor_df.persist()
        # Write to HDFS
        sensor_df.write.format('parquet').mode('append').saveAsTable('sensor_data')
        # Create a summarization data frame
        sensor_stats_df = (sensor_df.groupBy('sensor').agg({'measure1':'avg', 'measure2':'avg', 'sensor':'count'}).withColumn('ts', current_timestamp()).withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('avg(measure2)', 'measure2_avg').withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('count(sensor)', 'count_sensor'))
        # root
        # |-- sensor: string (nullable = true)
        # |-- measure2_avg: double (nullable = true)
        # |-- measure1_avg: double (nullable = true)
        # |-- count_sensor: long (nullable = false)
        # |-- ts: timestamp (nullable = false)
        sensor_stats_df.write.format('parquet').mode('append').saveAsTable('sensor_data_stats')
        # Group by and send metrics to an output Kafka topic
        sensor_stats_df.writeStream \
            .format("kafka") \
            .option("topic", topic_to) \ 
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("kafka.sasl.mechanism", "PLAIN") \
            .option("kafka.security.protocol", "SASL_SSL") \
            .option("kafka.sasl.jaas.config", sasl) \
            .save()
        # For example, you could write to SQL Server
        # df.write.format('com.microsoft.sqlserver.jdbc.spark').mode('append').option('url', url).option('dbtable', datapool_table).save()
        sensor_df.unpersist()


writer = streaming_input_df.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()

使用 Spark SQL 创建以下表。 Azure Data Studio 笔记本中的 PySpark 内核是交互运行 Spark SQL 的一种方式。 在 Azure Data Studio 的新笔记本中,连接到大数据群集的 Spark 池。 选择 PySpark 内核,并执行以下操作:

%%sql
CREATE TABLE IF NOT EXISTS sensor_data (sensor string, measure1 double, measure2 double)
USING PARQUET;

CREATE TABLE IF NOT EXISTS sensor_data_stats (sensor string, measure2_avg double, measure1_avg double, count_sensor long, ts timestamp)
USING PARQUET;

将应用程序复制到 HDFS

azdata bdc hdfs cp --from-path sample-spark-streaming-python.py --to-path "hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py"

配置 Kafka 库

在提交作业之前,请在应用程序中设置 Kafka 客户端库。 需要两个库:

  • kafka-clients - 此核心库可实现 Kafka 协议支持和连接性。
  • spark-sql-kafka - 此库在 Kafka 流上启用 Spark SQL 数据帧功能。

这两个库均必须:

  • 面向 Scala 2.12 和 Spark 3.1.2。 此 SQL Server 大数据群集要求适用于累积更新 13 (CU13) 或更高版本。
  • 与流式传输服务器兼容。

谨慎

一般规则是使用最新的兼容库。 本指南中的代码已通过适用于 Azure 事件中心的 Apache Kafka 的测试。 该代码按原样提供,不作为可支持性声明。

Apache Kafka 按设计提供双向客户端兼容性。 但不同的编程语言对库的实现是不同的。 请始终参考 Kafka 平台文档来正确地映射兼容性。

共享 HDFS 上作业的库位置

如果多个应用程序连接到同一个 Kafka 群集,或者如果你的组织有单个受到版本控制的 Kafka 群集,请将相应的库 JAR 文件复制到 HDFS 上的共享位置。 然后,所有作业都应引用相同的库文件。

将库复制到常见位置:

azdata bdc hdfs cp --from-path kafka-clients-2.7.0.jar --to-path "hdfs:/apps/jars/kafka-clients-3.0.0.jar"
azdata bdc hdfs cp --from-path spark-sql-kafka-0-10_2.11-2.4.7.jar --to-path "hdfs:/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"

动态安装库

通过使用 SQL Server 大数据群集的包管理功能,可以在提交作业时动态安装包。 由于每次提交作业时反复下载库文件,因此作业启动时间会变长。

通过使用 azdata 提交 Spark 流式处理作业

以下示例使用 HDFS 上的共享库 JAR 文件:

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
-j '["/apps/jars/kafka-clients-3.0.0.jar","/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"]' \
--config '{"spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

本示例使用动态包管理来安装依赖项:

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
--config '{"spark.jars.packages": "org.apache.kafka:kafka-clients:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2","spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

后续步骤

若要使用 azdata 或 Livy 终结点将 Spark 作业提交到 SQL Server 大数据群集,请参阅使用命令行工具提交 Spark 作业

若要详细了解 SQL Server 大数据群集和相关方案,请参阅 SQL Server 大数据群集