Leggere in inglese

Condividi tramite


Guida allo streaming con Spark per cluster Big Data di SQL Server

Si applica a: SQL Server 2019 (15.x)

Importante

Il componente aggiuntivo per i cluster Big Data di Microsoft SQL Server 2019 verrà ritirato. Il supporto per i cluster Big Data di SQL Server 2019 terminerà il 28 febbraio 2025. Tutti gli utenti esistenti di SQL Server 2019 con Software Assurance saranno completamente supportati nella piattaforma e fino a quel momento il software continuerà a ricevere aggiornamenti cumulativi di SQL Server. Per altre informazioni, vedere il post di blog relativo all'annuncio e Opzioni per i Big Data nella piattaforma Microsoft SQL Server.

Questa guida illustra i casi d'uso di streaming e come implementarli usando Spark per cluster Big Data di SQL Server.

Questa guida illustra come eseguire queste operazioni:

  • Caricare le librerie di streaming da usare con PySpark e Scala Spark.
  • Implementare tre modelli di streaming comuni usando cluster Big Data di SQL Server.

Prerequisiti

  • Una distribuzione con cluster Big Data di SQL Server
  • Una di queste opzioni:
    • Cluster Apache Kafka 2.0 o versione successiva
    • Uno spazio dei nomi di Hub eventi di Azure e un hub eventi

Questa guida presuppone un buon livello di comprensione dei concetti e delle architetture della tecnologia di streaming. Gli articoli seguenti forniscono informazioni di base concettuali eccellenti:

Mapping concettuale di Apache Kafka e Hub eventi di Azure

Concetto di Apache Kafka Concetto di Hub eventi
Cluster Spazio dei nomi
Argomento Hub eventi
Partizione Partizione
Gruppo di consumer Gruppo di consumer
Contropartita Contropartita

riproducibilità

Questa guida usa l'applicazione producer fornita in Avvio rapido: Streaming di dati con Hub eventi tramite il protocollo Kafka. È possibile trovare applicazioni di esempio in molti linguaggi di programmazione in Azure Event Hubs for Apache Kafka (Hub eventi di Azure per Apache Kafka) su GitHub. Usare queste applicazioni per implementare velocemente scenari di streaming.

Nota

Uno dei passaggi eseguiti nella guida di avvio rapido è l'abilitazione dell'opzione di streaming Kafka durante la creazione dell'hub eventi di Azure. Verificare che l'endpoint Kafka per lo spazio dei nomi dell'hub eventi di Azure sia abilitato.

Il codice producer.py modificato seguente trasmette i dati JSON dei sensori simulati al motore di streaming usando un client compatibile con Kafka. Si noti che Hub eventi di Azure è compatibile con il protocollo Kafka. Seguire le istruzioni di installazione e configurazione in GitHub per far funzionare l'esempio.

Tutte le informazioni di connessione si trovano nel dizionario conf. La configurazione può variare a seconda dell'ambiente in uso. Sostituire almeno bootstrap.servers e sasl.password. Queste impostazioni sono le più rilevanti nell'esempio di codice seguente.

Python
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, version 2.0.
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka ecosystems.

from confluent_kafka import Producer
import sys
import random
import time
import json

sensors = ["Sensor 1", "Sensor 2", "Sensor 3"]

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
        sys.exit(1)
    topic = sys.argv[1]

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    # See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
    conf = {
        'bootstrap.servers': '',                     #replace!
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<password>',               #replace!
        'client.id': 'python-sample-producer'
    }

    # Create Producer instance
    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %o\n' % (msg.topic(), msg.partition(), msg.offset()))

    # Simulate stream
    for i in range(0, 10000):
        try:
            payload = {
                'sensor': random.choice(sensors),
                'measure1': random.gauss(37, 7),
                'measure2': random.random(),
            }
            p.produce(topic, json.dumps(payload).encode('utf-8'), callback=delivery_callback)
            #p.produce(topic, str(i), callback=delivery_callback)
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        p.poll(0)
        time.sleep(2)

    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

Eseguire l'applicazione producer di esempio usando il comando seguente. Sostituire <my-sample-topic> con le informazioni sull'ambiente in uso.

Bash
python producer.py <my-sample-topic>

Scenari di streaming

Modello di streaming Descrizione e implementazione dello scenario
Eseguire il pull da Kafka o da Hub eventi Creare un processo di streaming Spark che esegue il pull continuo dei dati dal motore di streaming, eseguendo trasformazioni e logica di analisi facoltative.
Eseguire il sink dei dati di streaming in Apache Hadoop Distributed File System (HDFS) In generale, questo modello è correlato al modello precedente. Dopo la logica di pull e trasformazione di streaming, i dati possono essere scritti in molte posizioni per ottenere il requisito di persistenza dei dati desiderato.
Eseguire il push da Spark in Kafka o Hub eventi Dopo l'elaborazione da Spark, è possibile eseguire il push dei dati nel motore di streaming esterno. Questo modello è auspicabile in molti scenari, ad esempio raccomandazioni sui prodotti in tempo reale e rilevamento di illeciti di micro batch e di anomalie.

Applicazione Spark Streaming di esempio

Questa applicazione di esempio implementa i tre modelli di streaming descritti nella sezione precedente. L'applicazione:

  1. Imposta le variabili di configurazione per la connessione al servizio di streaming.
  2. Crea un frame di dati di Spark Streaming per eseguire il pull dei dati.
  3. Scrive i dati aggregati in locale in HDFS.
  4. Scrive i dati aggregati in un argomento diverso nel servizio di streaming.

Di seguito è riportato il codice sample-spark-streaming-python.py completo:

Python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Sets up batch size to 15 seconds
duration_ms = 15000
# Changes Spark session into a structured streaming context
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, duration_ms)
spark = SparkSession(sc)

# Connection information
bootstrap_servers = "" # Replace!
sasl = "" # Replace!
# Topic we will consume from
topic = "sample-topic"
# Topic we will write to
topic_to = "sample-topic-processed"

# Define the schema to speed up processing
jsonSchema = StructType([StructField("sensor", StringType(), True), StructField("measure1", DoubleType(), True), StructField("measure2", DoubleType(), True)])

streaming_input_df = (
    spark.readStream \
    .format("kafka") \
    .option("subscribe", topic) \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", sasl) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "30000") \
    .option("failOnDataLoss", "true") \
    .option("startingOffsets", "latest") \
    .load()
)

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    if df.count() <= 0:
        None
    else:
        # Create a data frame to be written to HDFS
        sensor_df = df.selectExpr('CAST(value AS STRING)').select(from_json("value", jsonSchema).alias("value")).select("value.*")
        # root
        #  |-- sensor: string (nullable = true)
        #  |-- measure1: double (nullable = true)
        #  |-- measure2: double (nullable = true)
        sensor_df.persist()
        # Write to HDFS
        sensor_df.write.format('parquet').mode('append').saveAsTable('sensor_data')
        # Create a summarization data frame
        sensor_stats_df = (sensor_df.groupBy('sensor').agg({'measure1':'avg', 'measure2':'avg', 'sensor':'count'}).withColumn('ts', current_timestamp()).withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('avg(measure2)', 'measure2_avg').withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('count(sensor)', 'count_sensor'))
        # root
        # |-- sensor: string (nullable = true)
        # |-- measure2_avg: double (nullable = true)
        # |-- measure1_avg: double (nullable = true)
        # |-- count_sensor: long (nullable = false)
        # |-- ts: timestamp (nullable = false)
        sensor_stats_df.write.format('parquet').mode('append').saveAsTable('sensor_data_stats')
        # Group by and send metrics to an output Kafka topic
        sensor_stats_df.writeStream \
            .format("kafka") \
            .option("topic", topic_to) \ 
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("kafka.sasl.mechanism", "PLAIN") \
            .option("kafka.security.protocol", "SASL_SSL") \
            .option("kafka.sasl.jaas.config", sasl) \
            .save()
        # For example, you could write to SQL Server
        # df.write.format('com.microsoft.sqlserver.jdbc.spark').mode('append').option('url', url).option('dbtable', datapool_table).save()
        sensor_df.unpersist()


writer = streaming_input_df.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()

Creare le tabelle seguenti usando Spark SQL. Il kernel PySpark in un notebook di Azure Data Studio è un modo per eseguire Spark SQL in modo interattivo. In un nuovo notebook in Azure Data Studio connettersi al pool Spark del cluster Big Data. Scegliere il kernel PySpark ed eseguire quanto segue:

Python
%%sql
CREATE TABLE IF NOT EXISTS sensor_data (sensor string, measure1 double, measure2 double)
USING PARQUET;

CREATE TABLE IF NOT EXISTS sensor_data_stats (sensor string, measure2_avg double, measure1_avg double, count_sensor long, ts timestamp)
USING PARQUET;

Copiare l'applicazione in HDFS

Bash
azdata bdc hdfs cp --from-path sample-spark-streaming-python.py --to-path "hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py"

Configurare le librerie Kafka

Configurare le librerie client Kafka con l'applicazione prima di inviare i processi. Sono necessarie due librerie:

  • kafka-clients - Questa libreria principale abilita il supporto e la connettività del protocollo Kafka.
  • spark-sql-kafka - Questa libreria abilita la funzionalità dei frame di dati Spark SQL nei flussi Kafka.

Entrambe le librerie devono:

  • Impostare Scala 2.12 e Spark 3.1.2 come destinazione. Questo requisito per i cluster Big Data di SQL Server riguarda l'aggiornamento cumulativo 13 (CU13) o versione successiva.
  • Essere compatibili con il server di streaming.

Attenzione

Come regola generale, usare la libreria compatibile più recente. Il codice in questa guida è stato testato usando Apache Kafka per Hub eventi di Azure. Il codice viene fornito così come è e non rappresenta una dichiarazione di supportabilità.

Apache Kafka offre compatibilità client bidirezionale per impostazione predefinita. Tuttavia, le implementazioni delle librerie variano nei diversi linguaggi di programmazione. Fare sempre riferimento alla documentazione della piattaforma Kafka per eseguire correttamente il mapping di compatibilità.

Condividere i percorsi delle librerie per i processi in HDFS

Se più applicazioni si connettono allo stesso cluster Kafka o se l'organizzazione ha un singolo cluster Kafka con controllo delle versioni, copiare i file JAR della libreria appropriati in un percorso condiviso in HDFS. Tutti i processi devono quindi fare riferimento agli stessi file di libreria.

Copiare le librerie nel percorso comune:

Bash
azdata bdc hdfs cp --from-path kafka-clients-2.7.0.jar --to-path "hdfs:/apps/jars/kafka-clients-3.0.0.jar"
azdata bdc hdfs cp --from-path spark-sql-kafka-0-10_2.11-2.4.7.jar --to-path "hdfs:/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"

Installare in modo dinamico le librerie

È possibile installare in modo dinamico i pacchetti quando si invia un processo usando le funzionalità di gestione dei pacchetti dei cluster Big Data di SQL Server. È prevista una penalità per l'ora di avvio del processo a causa dei download ricorrenti dei file di libreria in ogni invio di processo.

Inviare il processo di Spark Streaming usando azdata

L'esempio seguente usa i file JAR della libreria condivisa in HDFS:

Bash
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
-j '["/apps/jars/kafka-clients-3.0.0.jar","/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"]' \
--config '{"spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

In questo esempio viene usata la gestione dinamica dei pacchetti per installare le dipendenze:

Bash
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
--config '{"spark.jars.packages": "org.apache.kafka:kafka-clients:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2","spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

Passaggi successivi

Per inviare processi Spark a cluster Big Data di SQL Server usando azdata o endpoint Livy, vedere Inviare processi Spark usando gli strumenti da riga di comando.

Per altre informazioni sui cluster Big Data di SQL Server e sugli scenari correlati, vedere Cluster Big Data di SQL Server.