適用対象: SQL Server 2019 (15.x)
重要
Microsoft SQL Server 2019 ビッグ データ クラスターのアドオンは廃止されます。 SQL Server 2019 ビッグ データ クラスターのサポートは、2025 年 2 月 28 日に終了します。 ソフトウェア アシュアランス付きの SQL Server 2019 を使用する既存の全ユーザーはプラットフォームで完全にサポートされ、ソフトウェアはその時点まで SQL Server の累積更新プログラムによって引き続きメンテナンスされます。 詳細については、お知らせのブログ記事と「Microsoft SQL Server プラットフォームのビッグ データ オプション」を参照してください。
このガイドでは、ストリーミングのユース ケースと、SQL Server ビッグ データ クラスター Spark を使用してそれらを実装する方法について説明します。
このガイドでは、次の方法を学習します。
- ストリーミング ライブラリを読み込んで PySpark および Scala Spark を使用する。
- SQL Server ビッグ データ クラスターを使用して、3 つの一般的なストリーミング パターンを実装する。
[前提条件]
- SQL Server ビッグ データ クラスターのデプロイ
- 次のいずれかのオプション:
- Apache Kafka クラスター2.0 以降
- Azure Event Hubs 名前空間とイベント ハブ
このガイドでは、ストリーミング テクノロジの概念とアーキテクチャについて十分に理解していることを前提とします。 次の記事では、優れた概念ベースラインについて説明します。
- データ アーキテクチャ ガイド - リアルタイム処理
- Apache Kafka アプリケーションから Azure Event Hubs を使用する
- データ アーキテクチャ ガイド - Azure でのリアルタイム メッセージ取り込みテクノロジの選択
Apache Kafka と Azure Event Hubs の概念のマッピング
Apache Kafka の概念 | Event Hubs での概念 |
---|---|
クラスター | Namespace |
トピック | イベント ハブ |
分割 | 分割 |
コンシューマー グループ | コンシューマー グループ |
オフセット | オフセット |
再現性
このガイドでは、「クイック スタート: Kafka プロトコルを使用した Event Hubs によるデータ ストリーミング」で提供されるプロデューサー アプリケーションを使用します。 GitHub 上の Apache Kafka 用の Azure Event Hubs に関するページに、さまざまなプログラミング言語のサンプル アプリケーションがあります。 これらのアプリケーションを使用して、ストリーミング シナリオをすぐに開始できます。
注
このクイック スタートで説明する手順の 1 つに、Azure Event Hub の作成時に Kafka ストリーミング オプションを有効にすることがあります。 Azure Event Hub 名前空間の Kafka エンドポイントが有効になっていることを確認します。
次の変更された producer.py
コードを実行すると、Kafka 互換のクライアントを使用して、シミュレートされたセンサー JSON データがストリーミング エンジンにストリーム配信されます。 Azure Event Hubs は Kafka プロトコルと互換性があることに注意してください。 GitHub のセットアップ手順に従って、サンプルを使用できるようにします。
接続情報はすべて conf
ディクショナリに含まれています。 セットアップは、使用する環境によって異なる場合があります。 少なくとも bootstrap.servers
と sasl.password
を置き換えます。 これらの設定は、次のコード サンプルで最も関連性があります。
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, version 2.0.
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka ecosystems.
from confluent_kafka import Producer
import sys
import random
import time
import json
sensors = ["Sensor 1", "Sensor 2", "Sensor 3"]
if __name__ == '__main__':
if len(sys.argv) != 2:
sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
sys.exit(1)
topic = sys.argv[1]
# Producer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
# See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
conf = {
'bootstrap.servers': '', #replace!
'security.protocol': 'SASL_SSL',
'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
'sasl.mechanism': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': '<password>', #replace!
'client.id': 'python-sample-producer'
}
# Create Producer instance
p = Producer(**conf)
def delivery_callback(err, msg):
if err:
sys.stderr.write('%% Message failed delivery: %s\n' % err)
else:
sys.stderr.write('%% Message delivered to %s [%d] @ %o\n' % (msg.topic(), msg.partition(), msg.offset()))
# Simulate stream
for i in range(0, 10000):
try:
payload = {
'sensor': random.choice(sensors),
'measure1': random.gauss(37, 7),
'measure2': random.random(),
}
p.produce(topic, json.dumps(payload).encode('utf-8'), callback=delivery_callback)
#p.produce(topic, str(i), callback=delivery_callback)
except BufferError as e:
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
p.poll(0)
time.sleep(2)
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
次のコマンドを使用して、サンプル プロデューサー アプリケーションを実行します。
<my-sample-topic>
を自分の環境情報に置き換えます。
python producer.py <my-sample-topic>
ストリーミング シナリオ
ストリーミング パターン | シナリオの説明と実装 |
---|---|
Kafka または Event Hubs からプルする | ストリーミング エンジンから継続的にデータをプルする Spark Streaming ジョブを作成し、省略可能な変換および分析ロジックを実行します。 |
ストリーミング データを Apache Hadoop 分散ファイル システム (HDFS) にストリーミングする | 一般に、これは前のパターンと関連しています。 ストリーミング プルおよび変換ロジックの後、必要なデータ永続化要件を実現するために、データがさまざまな場所に書き込まれることがあります。 |
Spark から Kafka または Event Hubs にプッシュする | Spark によって処理された後、データが外部のストリーミング エンジンにプッシュバックされる可能性があります。 このパターンが必要になるシナリオは多数あります。たとえば、リアルタイムの製品推奨、マイクロバッチの不正行為および異常検出などです。 |
Spark Streaming アプリケーションのサンプル
このサンプル アプリケーションでは、前のセクションで説明した 3 つのストリーミング パターンを実装します。 アプリケーションは
- ストリーミング サービスに接続するための構成変数を設定する。
- データをプルする Spark Streaming データ フレームを作成する。
- HDFS にローカルで集計されたデータを書き込む。
- 集計データをストリーミング サービスの別のトピックに書き込む。
完全な sample-spark-streaming-python.py
コードを次に示します。
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Sets up batch size to 15 seconds
duration_ms = 15000
# Changes Spark session into a structured streaming context
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, duration_ms)
spark = SparkSession(sc)
# Connection information
bootstrap_servers = "" # Replace!
sasl = "" # Replace!
# Topic we will consume from
topic = "sample-topic"
# Topic we will write to
topic_to = "sample-topic-processed"
# Define the schema to speed up processing
jsonSchema = StructType([StructField("sensor", StringType(), True), StructField("measure1", DoubleType(), True), StructField("measure2", DoubleType(), True)])
streaming_input_df = (
spark.readStream \
.format("kafka") \
.option("subscribe", topic) \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", sasl) \
.option("kafka.request.timeout.ms", "60000") \
.option("kafka.session.timeout.ms", "30000") \
.option("failOnDataLoss", "true") \
.option("startingOffsets", "latest") \
.load()
)
def foreach_batch_function(df, epoch_id):
# Transform and write batchDF
if df.count() <= 0:
None
else:
# Create a data frame to be written to HDFS
sensor_df = df.selectExpr('CAST(value AS STRING)').select(from_json("value", jsonSchema).alias("value")).select("value.*")
# root
# |-- sensor: string (nullable = true)
# |-- measure1: double (nullable = true)
# |-- measure2: double (nullable = true)
sensor_df.persist()
# Write to HDFS
sensor_df.write.format('parquet').mode('append').saveAsTable('sensor_data')
# Create a summarization data frame
sensor_stats_df = (sensor_df.groupBy('sensor').agg({'measure1':'avg', 'measure2':'avg', 'sensor':'count'}).withColumn('ts', current_timestamp()).withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('avg(measure2)', 'measure2_avg').withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('count(sensor)', 'count_sensor'))
# root
# |-- sensor: string (nullable = true)
# |-- measure2_avg: double (nullable = true)
# |-- measure1_avg: double (nullable = true)
# |-- count_sensor: long (nullable = false)
# |-- ts: timestamp (nullable = false)
sensor_stats_df.write.format('parquet').mode('append').saveAsTable('sensor_data_stats')
# Group by and send metrics to an output Kafka topic
sensor_stats_df.writeStream \
.format("kafka") \
.option("topic", topic_to) \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", sasl) \
.save()
# For example, you could write to SQL Server
# df.write.format('com.microsoft.sqlserver.jdbc.spark').mode('append').option('url', url).option('dbtable', datapool_table).save()
sensor_df.unpersist()
writer = streaming_input_df.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()
Spark SQL を使用して、次のテーブルを作成します。 Spark SQL を対話形式で実行するには、Azure Data Studio ノートブックの PySpark カーネルを使用するのも 1 つの方法です。 Azure Data Studio の新しいノートブックで、お使いのビッグ データ クラスターの Spark プールに接続します。 PySpark カーネルを選択し、次を実行します。
%%sql
CREATE TABLE IF NOT EXISTS sensor_data (sensor string, measure1 double, measure2 double)
USING PARQUET;
CREATE TABLE IF NOT EXISTS sensor_data_stats (sensor string, measure2_avg double, measure1_avg double, count_sensor long, ts timestamp)
USING PARQUET;
アプリケーションを HDFS にコピーする
azdata bdc hdfs cp --from-path sample-spark-streaming-python.py --to-path "hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py"
Kafka ライブラリを構成する
ジョブを送信するには、事前にアプリケーションで Kafka クライアント ライブラリを設定します。 次の 2 つのライブラリが必要です。
- kafka-clients - このコア ライブラリにより、Kafka プロトコルのサポートおよび接続が可能になります。
- spark-sql-kafka - このライブラリにより、Kafka ストリームに対して Spark SQL データ フレーム機能が使用できるようになります。
ライブラリは両方とも、次のようになっている必要があります。
- ターゲット Target Scala 2.12 および Spark 3.1.2。 この SQL Server ビッグ データ クラスターの要件は、累積的な更新プログラム 13 (CU13) 以降に関するものです。
- ストリーミング サーバーと互換性があること。
注意事項
原則として、最新の互換性のあるライブラリを使用します。 このガイドに含まれるコードは、Azure Event Hubs 用の Apache Kafka を使用してテスト済みです。 このコードは、サポートの可否に関するステートメントとしてではなく、そのまま提供されます。
Apache Kafka では、設計上、双方向のクライアント互換性が提供されます。 ただし、ライブラリの実装はプログラミング言語によって異なります。 互換性を正しくマッピングするために、常に Kafka プラットフォームのドキュメントを参照してください。
HDFS 上のジョブ用の共有ライブラリの場所
複数のアプリケーションが同じ Kafka クラスターに接続されている場合、または組織に 1 つのバージョン管理された Kafka クラスターがある場合は、適切なライブラリ JAR ファイルを HDFS 上の共有場所にコピーします。 その後、すべてのジョブで同じライブラリ ファイルが参照されるはずです。
ライブラリを共通の場所にコピーします。
azdata bdc hdfs cp --from-path kafka-clients-2.7.0.jar --to-path "hdfs:/apps/jars/kafka-clients-3.0.0.jar"
azdata bdc hdfs cp --from-path spark-sql-kafka-0-10_2.11-2.4.7.jar --to-path "hdfs:/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"
ライブラリを動的にインストールする
ジョブを送信する際にパッケージを動的にインストールするには、SQL Server ビッグ データ クラスターのパッケージ管理機能を使用します。 ジョブが送信されるたびにライブラリ ファイルが定期的にダウンロードされるため、ジョブの起動時間が長くなります。
azdata を使用して Spark Streaming ジョブを送信する
次の例では、HDFS 上で共有ライブラリ JAR ファイルを使用します。
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
-j '["/apps/jars/kafka-clients-3.0.0.jar","/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"]' \
--config '{"spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
この例では、動的パッケージ管理を使用して依存関係をインストールします。
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
--config '{"spark.jars.packages": "org.apache.kafka:kafka-clients:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2","spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
次のステップ
azdata
または Livy エンドポイントを使用して SQL Server ビッグ データ クラスター に Spark ジョブを送信するには、コマンドライン ツールを使用して Spark ジョブを送信に関するページを参照してください。
SQL Server ビッグ データ クラスターおよびこれに関連するシナリオの詳細については、「SQL Server ビッグ データ クラスター」を参照してください。