次の方法で共有


SQL Server ビッグ データ クラスター Spark ストリーミングのガイド

適用対象: SQL Server 2019 (15.x)

重要

Microsoft SQL Server 2019 ビッグ データ クラスターのアドオンは廃止されます。 SQL Server 2019 ビッグ データ クラスターのサポートは、2025 年 2 月 28 日に終了します。 ソフトウェア アシュアランス付きの SQL Server 2019 を使用する既存の全ユーザーはプラットフォームで完全にサポートされ、ソフトウェアはその時点まで SQL Server の累積更新プログラムによって引き続きメンテナンスされます。 詳細については、お知らせのブログ記事と「Microsoft SQL Server プラットフォームのビッグ データ オプション」を参照してください。

このガイドでは、ストリーミングのユース ケースと、SQL Server ビッグ データ クラスター Spark を使用してそれらを実装する方法について説明します。

このガイドでは、次の方法を学習します。

  • ストリーミング ライブラリを読み込んで PySpark および Scala Spark を使用する。
  • SQL Server ビッグ データ クラスターを使用して、3 つの一般的なストリーミング パターンを実装する。

[前提条件]

  • SQL Server ビッグ データ クラスターのデプロイ
  • 次のいずれかのオプション:
    • Apache Kafka クラスター2.0 以降
    • Azure Event Hubs 名前空間とイベント ハブ

このガイドでは、ストリーミング テクノロジの概念とアーキテクチャについて十分に理解していることを前提とします。 次の記事では、優れた概念ベースラインについて説明します。

Apache Kafka と Azure Event Hubs の概念のマッピング

Apache Kafka の概念 Event Hubs での概念
クラスター Namespace
トピック イベント ハブ
分割 分割
コンシューマー グループ コンシューマー グループ
オフセット オフセット

再現性

このガイドでは、「クイック スタート: Kafka プロトコルを使用した Event Hubs によるデータ ストリーミング」で提供されるプロデューサー アプリケーションを使用します。 GitHub 上の Apache Kafka 用の Azure Event Hubs に関するページに、さまざまなプログラミング言語のサンプル アプリケーションがあります。 これらのアプリケーションを使用して、ストリーミング シナリオをすぐに開始できます。

このクイック スタートで説明する手順の 1 つに、Azure Event Hub の作成時に Kafka ストリーミング オプションを有効にすることがあります。 Azure Event Hub 名前空間の Kafka エンドポイントが有効になっていることを確認します。

次の変更された producer.py コードを実行すると、Kafka 互換のクライアントを使用して、シミュレートされたセンサー JSON データがストリーミング エンジンにストリーム配信されます。 Azure Event Hubs は Kafka プロトコルと互換性があることに注意してください。 GitHub のセットアップ手順に従って、サンプルを使用できるようにします。

接続情報はすべて conf ディクショナリに含まれています。 セットアップは、使用する環境によって異なる場合があります。 少なくとも bootstrap.serverssasl.password を置き換えます。 これらの設定は、次のコード サンプルで最も関連性があります。

#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, version 2.0.
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka ecosystems.

from confluent_kafka import Producer
import sys
import random
import time
import json

sensors = ["Sensor 1", "Sensor 2", "Sensor 3"]

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
        sys.exit(1)
    topic = sys.argv[1]

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    # See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
    conf = {
        'bootstrap.servers': '',                     #replace!
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<password>',               #replace!
        'client.id': 'python-sample-producer'
    }

    # Create Producer instance
    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %o\n' % (msg.topic(), msg.partition(), msg.offset()))

    # Simulate stream
    for i in range(0, 10000):
        try:
            payload = {
                'sensor': random.choice(sensors),
                'measure1': random.gauss(37, 7),
                'measure2': random.random(),
            }
            p.produce(topic, json.dumps(payload).encode('utf-8'), callback=delivery_callback)
            #p.produce(topic, str(i), callback=delivery_callback)
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        p.poll(0)
        time.sleep(2)

    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

次のコマンドを使用して、サンプル プロデューサー アプリケーションを実行します。 <my-sample-topic> を自分の環境情報に置き換えます。

python producer.py <my-sample-topic>

ストリーミング シナリオ

ストリーミング パターン シナリオの説明と実装
Kafka または Event Hubs からプルする ストリーミング エンジンから継続的にデータをプルする Spark Streaming ジョブを作成し、省略可能な変換および分析ロジックを実行します。
ストリーミング データを Apache Hadoop 分散ファイル システム (HDFS) にストリーミングする 一般に、これは前のパターンと関連しています。 ストリーミング プルおよび変換ロジックの後、必要なデータ永続化要件を実現するために、データがさまざまな場所に書き込まれることがあります。
Spark から Kafka または Event Hubs にプッシュする Spark によって処理された後、データが外部のストリーミング エンジンにプッシュバックされる可能性があります。 このパターンが必要になるシナリオは多数あります。たとえば、リアルタイムの製品推奨、マイクロバッチの不正行為および異常検出などです。

Spark Streaming アプリケーションのサンプル

このサンプル アプリケーションでは、前のセクションで説明した 3 つのストリーミング パターンを実装します。 アプリケーションは

  1. ストリーミング サービスに接続するための構成変数を設定する。
  2. データをプルする Spark Streaming データ フレームを作成する。
  3. HDFS にローカルで集計されたデータを書き込む。
  4. 集計データをストリーミング サービスの別のトピックに書き込む。

完全な sample-spark-streaming-python.py コードを次に示します。

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Sets up batch size to 15 seconds
duration_ms = 15000
# Changes Spark session into a structured streaming context
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, duration_ms)
spark = SparkSession(sc)

# Connection information
bootstrap_servers = "" # Replace!
sasl = "" # Replace!
# Topic we will consume from
topic = "sample-topic"
# Topic we will write to
topic_to = "sample-topic-processed"

# Define the schema to speed up processing
jsonSchema = StructType([StructField("sensor", StringType(), True), StructField("measure1", DoubleType(), True), StructField("measure2", DoubleType(), True)])

streaming_input_df = (
    spark.readStream \
    .format("kafka") \
    .option("subscribe", topic) \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", sasl) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "30000") \
    .option("failOnDataLoss", "true") \
    .option("startingOffsets", "latest") \
    .load()
)

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    if df.count() <= 0:
        None
    else:
        # Create a data frame to be written to HDFS
        sensor_df = df.selectExpr('CAST(value AS STRING)').select(from_json("value", jsonSchema).alias("value")).select("value.*")
        # root
        #  |-- sensor: string (nullable = true)
        #  |-- measure1: double (nullable = true)
        #  |-- measure2: double (nullable = true)
        sensor_df.persist()
        # Write to HDFS
        sensor_df.write.format('parquet').mode('append').saveAsTable('sensor_data')
        # Create a summarization data frame
        sensor_stats_df = (sensor_df.groupBy('sensor').agg({'measure1':'avg', 'measure2':'avg', 'sensor':'count'}).withColumn('ts', current_timestamp()).withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('avg(measure2)', 'measure2_avg').withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('count(sensor)', 'count_sensor'))
        # root
        # |-- sensor: string (nullable = true)
        # |-- measure2_avg: double (nullable = true)
        # |-- measure1_avg: double (nullable = true)
        # |-- count_sensor: long (nullable = false)
        # |-- ts: timestamp (nullable = false)
        sensor_stats_df.write.format('parquet').mode('append').saveAsTable('sensor_data_stats')
        # Group by and send metrics to an output Kafka topic
        sensor_stats_df.writeStream \
            .format("kafka") \
            .option("topic", topic_to) \ 
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("kafka.sasl.mechanism", "PLAIN") \
            .option("kafka.security.protocol", "SASL_SSL") \
            .option("kafka.sasl.jaas.config", sasl) \
            .save()
        # For example, you could write to SQL Server
        # df.write.format('com.microsoft.sqlserver.jdbc.spark').mode('append').option('url', url).option('dbtable', datapool_table).save()
        sensor_df.unpersist()


writer = streaming_input_df.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()

Spark SQL を使用して、次のテーブルを作成します。 Spark SQL を対話形式で実行するには、Azure Data Studio ノートブックの PySpark カーネルを使用するのも 1 つの方法です。 Azure Data Studio の新しいノートブックで、お使いのビッグ データ クラスターの Spark プールに接続します。 PySpark カーネルを選択し、次を実行します。

%%sql
CREATE TABLE IF NOT EXISTS sensor_data (sensor string, measure1 double, measure2 double)
USING PARQUET;

CREATE TABLE IF NOT EXISTS sensor_data_stats (sensor string, measure2_avg double, measure1_avg double, count_sensor long, ts timestamp)
USING PARQUET;

アプリケーションを HDFS にコピーする

azdata bdc hdfs cp --from-path sample-spark-streaming-python.py --to-path "hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py"

Kafka ライブラリを構成する

ジョブを送信するには、事前にアプリケーションで Kafka クライアント ライブラリを設定します。 次の 2 つのライブラリが必要です。

  • kafka-clients - このコア ライブラリにより、Kafka プロトコルのサポートおよび接続が可能になります。
  • spark-sql-kafka - このライブラリにより、Kafka ストリームに対して Spark SQL データ フレーム機能が使用できるようになります。

ライブラリは両方とも、次のようになっている必要があります。

  • ターゲット Target Scala 2.12 および Spark 3.1.2。 この SQL Server ビッグ データ クラスターの要件は、累積的な更新プログラム 13 (CU13) 以降に関するものです。
  • ストリーミング サーバーと互換性があること。

注意事項

原則として、最新の互換性のあるライブラリを使用します。 このガイドに含まれるコードは、Azure Event Hubs 用の Apache Kafka を使用してテスト済みです。 このコードは、サポートの可否に関するステートメントとしてではなく、そのまま提供されます。

Apache Kafka では、設計上、双方向のクライアント互換性が提供されます。 ただし、ライブラリの実装はプログラミング言語によって異なります。 互換性を正しくマッピングするために、常に Kafka プラットフォームのドキュメントを参照してください。

HDFS 上のジョブ用の共有ライブラリの場所

複数のアプリケーションが同じ Kafka クラスターに接続されている場合、または組織に 1 つのバージョン管理された Kafka クラスターがある場合は、適切なライブラリ JAR ファイルを HDFS 上の共有場所にコピーします。 その後、すべてのジョブで同じライブラリ ファイルが参照されるはずです。

ライブラリを共通の場所にコピーします。

azdata bdc hdfs cp --from-path kafka-clients-2.7.0.jar --to-path "hdfs:/apps/jars/kafka-clients-3.0.0.jar"
azdata bdc hdfs cp --from-path spark-sql-kafka-0-10_2.11-2.4.7.jar --to-path "hdfs:/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"

ライブラリを動的にインストールする

ジョブを送信する際にパッケージを動的にインストールするには、SQL Server ビッグ データ クラスターのパッケージ管理機能を使用します。 ジョブが送信されるたびにライブラリ ファイルが定期的にダウンロードされるため、ジョブの起動時間が長くなります。

azdata を使用して Spark Streaming ジョブを送信する

次の例では、HDFS 上で共有ライブラリ JAR ファイルを使用します。

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
-j '["/apps/jars/kafka-clients-3.0.0.jar","/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"]' \
--config '{"spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

この例では、動的パッケージ管理を使用して依存関係をインストールします。

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
--config '{"spark.jars.packages": "org.apache.kafka:kafka-clients:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2","spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

次のステップ

azdata または Livy エンドポイントを使用して SQL Server ビッグ データ クラスター に Spark ジョブを送信するには、コマンドライン ツールを使用して Spark ジョブを送信に関するページを参照してください。

SQL Server ビッグ データ クラスターおよびこれに関連するシナリオの詳細については、「SQL Server ビッグ データ クラスター」を参照してください。