Bagikan melalui


Panduan Kluster Big Data SQL Server untuk Spark Streaming

Berlaku untuk: SQL Server 2019 (15.x)

Penting

Add-on Microsoft SQL Server 2019 untuk Kluster Big Data akan dihentikan. Dukungan untuk Kluster Big Data SQL Server 2019 akan berakhir pada 28 Februari 2025. Semua pengguna SQL Server 2019 yang ada dengan Jaminan Perangkat Lunak akan didukung sepenuhnya pada platform dan perangkat lunak akan terus dipertahankan melalui pembaruan kumulatif SQL Server hingga saat itu. Untuk informasi selengkapnya, lihat posting blog pengumuman dan opsi big data di platform Microsoft SQL Server.

Panduan ini mencakup kasus penggunaan streaming dan cara mengimplementasikannya dengan menggunakan SQL Server Big Data Clusters Spark.

Dalam panduan ini, Anda akan mempelajari cara:

  • Muat pustaka streaming untuk digunakan dengan PySpark dan Scala Spark.
  • Terapkan tiga pola streaming umum dengan menggunakan Kluster Big Data SQL Server.

Prasyarat

  • Penyebaran Kluster Big Data SQL Server
  • Salah satu opsi berikut:
    • Kluster Apache Kafka 2.0 atau yang lebih baru
    • Namespace layanan Azure Event Hubs dan pusat aktivitas

Panduan ini mengasumsikan tingkat pemahaman yang baik tentang konsep dan arsitektur teknologi streaming. Artikel berikut memberikan garis besar konseptual yang sangat baik:

Pemetaan konseptual Apache Kafka dan Azure Event Hubs

Konsep Apache Kafka Konsep Event Hubs
Kelompok Namespace
Topik Pusat aktivitas
Partisi Partisi
Grup konsumen Grup konsumen
Offset Offset

Reprodusibilitas

Panduan ini menggunakan aplikasi produsen yang disediakan dalam Mulai Cepat: Streaming data dengan Event Hubs dengan menggunakan protokol Kafka. Anda dapat menemukan aplikasi sampel dalam banyak bahasa pemrograman di Azure Event Hubs untuk Apache Kafka di GitHub. Gunakan aplikasi ini untuk memulai skenario streaming.

Nota

Salah satu langkah yang dicapai oleh panduan mulai cepat adalah mengaktifkan opsi streaming Kafka saat membuat Azure Event Hub. Konfirmasikan bahwa titik akhir Kafka untuk namespace Azure Event Hub diaktifkan.

Kode yang dimodifikasi producer.py berikut mengalirkan data JSON sensor yang disimulasikan ke mesin streaming dengan menggunakan klien yang kompatibel dengan Kafka. Perhatikan bahwa Azure Event Hubs kompatibel dengan protokol Kafka. Ikuti instruksi penyiapan di GitHub untuk membuat sampel berfungsi untuk Anda.

Semua informasi koneksi ada di conf kamus. Penyiapan Anda mungkin berbeda tergantung pada lingkungan Anda. Ganti setidaknya bootstrap.servers dan sasl.password. Pengaturan ini adalah yang paling relevan dalam sampel kode berikut.

#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, version 2.0.
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka ecosystems.

from confluent_kafka import Producer
import sys
import random
import time
import json

sensors = ["Sensor 1", "Sensor 2", "Sensor 3"]

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
        sys.exit(1)
    topic = sys.argv[1]

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    # See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
    conf = {
        'bootstrap.servers': '',                     #replace!
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<password>',               #replace!
        'client.id': 'python-sample-producer'
    }

    # Create Producer instance
    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %o\n' % (msg.topic(), msg.partition(), msg.offset()))

    # Simulate stream
    for i in range(0, 10000):
        try:
            payload = {
                'sensor': random.choice(sensors),
                'measure1': random.gauss(37, 7),
                'measure2': random.random(),
            }
            p.produce(topic, json.dumps(payload).encode('utf-8'), callback=delivery_callback)
            #p.produce(topic, str(i), callback=delivery_callback)
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        p.poll(0)
        time.sleep(2)

    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

Jalankan aplikasi produsen sampel dengan menggunakan perintah berikut. Ganti <my-sample-topic> dengan informasi lingkungan Anda.

python producer.py <my-sample-topic>

Skenario streaming

Pola streaming Deskripsi dan implementasi skenario
Mengambil data dari Kafka atau Event Hubs Buat pekerjaan Spark Streaming yang menarik data terus menerus dari mesin streaming, melakukan transformasi opsional dan logika analitik.
Salurkan data streaming ke Apache Hadoop Distributed File System (HDFS) Secara umum, pola ini berkorelasi dengan pola sebelumnya. Setelah logika penarikan dan transformasi streaming, data dapat ditulis ke banyak lokasi untuk mencapai persyaratan persistensi data yang diinginkan.
Dorong dari Spark ke Kafka atau Event Hubs Setelah diproses oleh Spark, data dapat didorong kembali ke mesin streaming eksternal. Pola ini banyak diinginkan dalam banyak skenario, seperti rekomendasi produk real-time dan deteksi penipuan serta anomali dalam batch mikro.

Sampel aplikasi Spark Streaming

Aplikasi sampel ini mengimplementasikan tiga pola streaming yang dijelaskan di bagian sebelumnya. Aplikasi:

  1. Menyiapkan variabel konfigurasi untuk menyambungkan ke layanan streaming.
  2. Membuat bingkai data Spark Streaming untuk menarik data.
  3. Menulis data agregat secara lokal ke HDFS.
  4. Menuliskan data agregat ke topik yang lain dalam layanan streaming.

Berikut adalah kode lengkapnya sample-spark-streaming-python.py :

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Sets up batch size to 15 seconds
duration_ms = 15000
# Changes Spark session into a structured streaming context
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, duration_ms)
spark = SparkSession(sc)

# Connection information
bootstrap_servers = "" # Replace!
sasl = "" # Replace!
# Topic we will consume from
topic = "sample-topic"
# Topic we will write to
topic_to = "sample-topic-processed"

# Define the schema to speed up processing
jsonSchema = StructType([StructField("sensor", StringType(), True), StructField("measure1", DoubleType(), True), StructField("measure2", DoubleType(), True)])

streaming_input_df = (
    spark.readStream \
    .format("kafka") \
    .option("subscribe", topic) \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", sasl) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "30000") \
    .option("failOnDataLoss", "true") \
    .option("startingOffsets", "latest") \
    .load()
)

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    if df.count() <= 0:
        None
    else:
        # Create a data frame to be written to HDFS
        sensor_df = df.selectExpr('CAST(value AS STRING)').select(from_json("value", jsonSchema).alias("value")).select("value.*")
        # root
        #  |-- sensor: string (nullable = true)
        #  |-- measure1: double (nullable = true)
        #  |-- measure2: double (nullable = true)
        sensor_df.persist()
        # Write to HDFS
        sensor_df.write.format('parquet').mode('append').saveAsTable('sensor_data')
        # Create a summarization data frame
        sensor_stats_df = (sensor_df.groupBy('sensor').agg({'measure1':'avg', 'measure2':'avg', 'sensor':'count'}).withColumn('ts', current_timestamp()).withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('avg(measure2)', 'measure2_avg').withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('count(sensor)', 'count_sensor'))
        # root
        # |-- sensor: string (nullable = true)
        # |-- measure2_avg: double (nullable = true)
        # |-- measure1_avg: double (nullable = true)
        # |-- count_sensor: long (nullable = false)
        # |-- ts: timestamp (nullable = false)
        sensor_stats_df.write.format('parquet').mode('append').saveAsTable('sensor_data_stats')
        # Group by and send metrics to an output Kafka topic
        sensor_stats_df.writeStream \
            .format("kafka") \
            .option("topic", topic_to) \ 
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("kafka.sasl.mechanism", "PLAIN") \
            .option("kafka.security.protocol", "SASL_SSL") \
            .option("kafka.sasl.jaas.config", sasl) \
            .save()
        # For example, you could write to SQL Server
        # df.write.format('com.microsoft.sqlserver.jdbc.spark').mode('append').option('url', url).option('dbtable', datapool_table).save()
        sensor_df.unpersist()


writer = streaming_input_df.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()

Buat tabel berikut dengan menggunakan Spark SQL. Kernel PySpark di notebook Azure Data Studio adalah salah satu cara untuk menjalankan Spark SQL secara interaktif. Di notebook baru di Azure Data Studio, sambungkan ke kumpulan Spark untuk kluster big data Anda. Pilih kernel PySpark, dan jalankan hal berikut:

%%sql
CREATE TABLE IF NOT EXISTS sensor_data (sensor string, measure1 double, measure2 double)
USING PARQUET;

CREATE TABLE IF NOT EXISTS sensor_data_stats (sensor string, measure2_avg double, measure1_avg double, count_sensor long, ts timestamp)
USING PARQUET;

Menyalin aplikasi ke HDFS

azdata bdc hdfs cp --from-path sample-spark-streaming-python.py --to-path "hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py"

Mengonfigurasi pustaka Kafka

Siapkan pustaka klien Kafka di aplikasi Anda sebelum mengirimkan pekerjaan. Diperlukan dua pustaka:

  • kafka-clients - Pustaka inti ini memungkinkan dukungan dan konektivitas protokol Kafka.
  • spark-sql-kafka - Pustaka ini memungkinkan fungsionalitas bingkai data Spark SQL pada aliran Kafka.

Kedua pustaka harus:

  • Target Scala 2.12 dan Spark 3.1.2. Persyaratan Kluster Big Data SQL Server ini adalah untuk Pembaruan Kumulatif 13 (CU13) atau yang lebih baru.
  • Kompatibel dengan server Streaming Anda.

Perhatian

Sebagai aturan umum, gunakan pustaka yang kompatibel terbaru. Kode dalam panduan ini diuji dengan menggunakan Apache Kafka untuk Azure Event Hubs. Kode disediakan as-is, bukan sebagai pernyataan dukungan.

Apache Kafka menawarkan kompatibilitas klien dua arah berdasarkan desain. Tetapi implementasi pustaka bervariasi di seluruh bahasa pemrograman. Selalu lihat dokumentasi platform Kafka Anda untuk memetakan kompatibilitas dengan benar.

Berbagi lokasi pustaka untuk pekerjaan di HDFS

Jika beberapa aplikasi terhubung ke kluster Kafka yang sama, atau jika organisasi Anda memiliki satu kluster Kafka versi tunggal, salin file JAR pustaka yang sesuai ke lokasi bersama di HDFS. Maka semua pekerjaan harus mereferensikan file pustaka yang sama.

Salin perpustakaan ke lokasi umum:

azdata bdc hdfs cp --from-path kafka-clients-2.7.0.jar --to-path "hdfs:/apps/jars/kafka-clients-3.0.0.jar"
azdata bdc hdfs cp --from-path spark-sql-kafka-0-10_2.11-2.4.7.jar --to-path "hdfs:/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"

Menginstal pustaka secara dinamis

Anda dapat menginstal paket secara dinamis saat mengirimkan pekerjaan dengan menggunakan fitur manajemen paket Kluster Big Data SQL Server. Terdapat penalti waktu mulai pekerjaan karena pengunduhan berulang file pustaka pada setiap pengiriman pekerjaan.

Ajukan pekerjaan Spark Streaming dengan menggunakan azdata

Contoh berikut menggunakan file JAR pustaka bersama di HDFS:

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
-j '["/apps/jars/kafka-clients-3.0.0.jar","/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"]' \
--config '{"spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

Contoh ini menggunakan manajemen paket dinamis untuk menginstal dependensi:

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
--config '{"spark.jars.packages": "org.apache.kafka:kafka-clients:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2","spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

Langkah selanjutnya

Untuk mengirimkan pekerjaan Spark ke Kluster Big Data SQL Server dengan menggunakan azdata atau titik akhir Livy, lihat Mengirimkan pekerjaan Spark dengan menggunakan alat baris perintah.

Untuk informasi selengkapnya tentang Kluster Big Data SQL Server dan skenario terkait, lihat Kluster Big Data SQL Server.