Eventi
31 mar, 23 - 2 apr, 23
Il più grande evento di apprendimento di SQL, Infrastruttura e Power BI. 31 marzo - 2 aprile. Usare il codice FABINSIDER per salvare $400.
Iscriviti oggi stessoQuesto browser non è più supportato.
Esegui l'aggiornamento a Microsoft Edge per sfruttare i vantaggi di funzionalità più recenti, aggiornamenti della sicurezza e supporto tecnico.
Si applica a: SQL Server 2019 (15.x)
Importante
Il componente aggiuntivo per i cluster Big Data di Microsoft SQL Server 2019 verrà ritirato. Il supporto per i cluster Big Data di SQL Server 2019 terminerà il 28 febbraio 2025. Tutti gli utenti esistenti di SQL Server 2019 con Software Assurance saranno completamente supportati nella piattaforma e fino a quel momento il software continuerà a ricevere aggiornamenti cumulativi di SQL Server. Per altre informazioni, vedere il post di blog relativo all'annuncio e Opzioni per i Big Data nella piattaforma Microsoft SQL Server.
Questa guida illustra i casi d'uso di streaming e come implementarli usando Spark per cluster Big Data di SQL Server.
Questa guida illustra come eseguire queste operazioni:
Questa guida presuppone un buon livello di comprensione dei concetti e delle architetture della tecnologia di streaming. Gli articoli seguenti forniscono informazioni di base concettuali eccellenti:
Concetto di Apache Kafka | Concetto di Hub eventi |
---|---|
Cluster | Spazio dei nomi |
Argomento | Hub eventi |
Partizione | Partizione |
Gruppo di consumer | Gruppo di consumer |
Contropartita | Contropartita |
Questa guida usa l'applicazione producer fornita in Avvio rapido: Streaming di dati con Hub eventi tramite il protocollo Kafka. È possibile trovare applicazioni di esempio in molti linguaggi di programmazione in Azure Event Hubs for Apache Kafka (Hub eventi di Azure per Apache Kafka) su GitHub. Usare queste applicazioni per implementare velocemente scenari di streaming.
Nota
Uno dei passaggi eseguiti nella guida di avvio rapido è l'abilitazione dell'opzione di streaming Kafka durante la creazione dell'hub eventi di Azure. Verificare che l'endpoint Kafka per lo spazio dei nomi dell'hub eventi di Azure sia abilitato.
Il codice producer.py
modificato seguente trasmette i dati JSON dei sensori simulati al motore di streaming usando un client compatibile con Kafka. Si noti che Hub eventi di Azure è compatibile con il protocollo Kafka. Seguire le istruzioni di installazione e configurazione in GitHub per far funzionare l'esempio.
Tutte le informazioni di connessione si trovano nel dizionario conf
. La configurazione può variare a seconda dell'ambiente in uso. Sostituire almeno bootstrap.servers
e sasl.password
. Queste impostazioni sono le più rilevanti nell'esempio di codice seguente.
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, version 2.0.
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka ecosystems.
from confluent_kafka import Producer
import sys
import random
import time
import json
sensors = ["Sensor 1", "Sensor 2", "Sensor 3"]
if __name__ == '__main__':
if len(sys.argv) != 2:
sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
sys.exit(1)
topic = sys.argv[1]
# Producer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
# See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
conf = {
'bootstrap.servers': '', #replace!
'security.protocol': 'SASL_SSL',
'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
'sasl.mechanism': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': '<password>', #replace!
'client.id': 'python-sample-producer'
}
# Create Producer instance
p = Producer(**conf)
def delivery_callback(err, msg):
if err:
sys.stderr.write('%% Message failed delivery: %s\n' % err)
else:
sys.stderr.write('%% Message delivered to %s [%d] @ %o\n' % (msg.topic(), msg.partition(), msg.offset()))
# Simulate stream
for i in range(0, 10000):
try:
payload = {
'sensor': random.choice(sensors),
'measure1': random.gauss(37, 7),
'measure2': random.random(),
}
p.produce(topic, json.dumps(payload).encode('utf-8'), callback=delivery_callback)
#p.produce(topic, str(i), callback=delivery_callback)
except BufferError as e:
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
p.poll(0)
time.sleep(2)
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
Eseguire l'applicazione producer di esempio usando il comando seguente. Sostituire <my-sample-topic>
con le informazioni sull'ambiente in uso.
python producer.py <my-sample-topic>
Modello di streaming | Descrizione e implementazione dello scenario |
---|---|
Eseguire il pull da Kafka o da Hub eventi | Creare un processo di streaming Spark che esegue il pull continuo dei dati dal motore di streaming, eseguendo trasformazioni e logica di analisi facoltative. |
Eseguire il sink dei dati di streaming in Apache Hadoop Distributed File System (HDFS) | In generale, questo modello è correlato al modello precedente. Dopo la logica di pull e trasformazione di streaming, i dati possono essere scritti in molte posizioni per ottenere il requisito di persistenza dei dati desiderato. |
Eseguire il push da Spark in Kafka o Hub eventi | Dopo l'elaborazione da Spark, è possibile eseguire il push dei dati nel motore di streaming esterno. Questo modello è auspicabile in molti scenari, ad esempio raccomandazioni sui prodotti in tempo reale e rilevamento di illeciti di micro batch e di anomalie. |
Questa applicazione di esempio implementa i tre modelli di streaming descritti nella sezione precedente. L'applicazione:
Di seguito è riportato il codice sample-spark-streaming-python.py
completo:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Sets up batch size to 15 seconds
duration_ms = 15000
# Changes Spark session into a structured streaming context
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, duration_ms)
spark = SparkSession(sc)
# Connection information
bootstrap_servers = "" # Replace!
sasl = "" # Replace!
# Topic we will consume from
topic = "sample-topic"
# Topic we will write to
topic_to = "sample-topic-processed"
# Define the schema to speed up processing
jsonSchema = StructType([StructField("sensor", StringType(), True), StructField("measure1", DoubleType(), True), StructField("measure2", DoubleType(), True)])
streaming_input_df = (
spark.readStream \
.format("kafka") \
.option("subscribe", topic) \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", sasl) \
.option("kafka.request.timeout.ms", "60000") \
.option("kafka.session.timeout.ms", "30000") \
.option("failOnDataLoss", "true") \
.option("startingOffsets", "latest") \
.load()
)
def foreach_batch_function(df, epoch_id):
# Transform and write batchDF
if df.count() <= 0:
None
else:
# Create a data frame to be written to HDFS
sensor_df = df.selectExpr('CAST(value AS STRING)').select(from_json("value", jsonSchema).alias("value")).select("value.*")
# root
# |-- sensor: string (nullable = true)
# |-- measure1: double (nullable = true)
# |-- measure2: double (nullable = true)
sensor_df.persist()
# Write to HDFS
sensor_df.write.format('parquet').mode('append').saveAsTable('sensor_data')
# Create a summarization data frame
sensor_stats_df = (sensor_df.groupBy('sensor').agg({'measure1':'avg', 'measure2':'avg', 'sensor':'count'}).withColumn('ts', current_timestamp()).withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('avg(measure2)', 'measure2_avg').withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('count(sensor)', 'count_sensor'))
# root
# |-- sensor: string (nullable = true)
# |-- measure2_avg: double (nullable = true)
# |-- measure1_avg: double (nullable = true)
# |-- count_sensor: long (nullable = false)
# |-- ts: timestamp (nullable = false)
sensor_stats_df.write.format('parquet').mode('append').saveAsTable('sensor_data_stats')
# Group by and send metrics to an output Kafka topic
sensor_stats_df.writeStream \
.format("kafka") \
.option("topic", topic_to) \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", sasl) \
.save()
# For example, you could write to SQL Server
# df.write.format('com.microsoft.sqlserver.jdbc.spark').mode('append').option('url', url).option('dbtable', datapool_table).save()
sensor_df.unpersist()
writer = streaming_input_df.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()
Creare le tabelle seguenti usando Spark SQL. Il kernel PySpark in un notebook di Azure Data Studio è un modo per eseguire Spark SQL in modo interattivo. In un nuovo notebook in Azure Data Studio connettersi al pool Spark del cluster Big Data. Scegliere il kernel PySpark ed eseguire quanto segue:
%%sql
CREATE TABLE IF NOT EXISTS sensor_data (sensor string, measure1 double, measure2 double)
USING PARQUET;
CREATE TABLE IF NOT EXISTS sensor_data_stats (sensor string, measure2_avg double, measure1_avg double, count_sensor long, ts timestamp)
USING PARQUET;
azdata bdc hdfs cp --from-path sample-spark-streaming-python.py --to-path "hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py"
Configurare le librerie client Kafka con l'applicazione prima di inviare i processi. Sono necessarie due librerie:
Entrambe le librerie devono:
Attenzione
Come regola generale, usare la libreria compatibile più recente. Il codice in questa guida è stato testato usando Apache Kafka per Hub eventi di Azure. Il codice viene fornito così come è e non rappresenta una dichiarazione di supportabilità.
Apache Kafka offre compatibilità client bidirezionale per impostazione predefinita. Tuttavia, le implementazioni delle librerie variano nei diversi linguaggi di programmazione. Fare sempre riferimento alla documentazione della piattaforma Kafka per eseguire correttamente il mapping di compatibilità.
Se più applicazioni si connettono allo stesso cluster Kafka o se l'organizzazione ha un singolo cluster Kafka con controllo delle versioni, copiare i file JAR della libreria appropriati in un percorso condiviso in HDFS. Tutti i processi devono quindi fare riferimento agli stessi file di libreria.
Copiare le librerie nel percorso comune:
azdata bdc hdfs cp --from-path kafka-clients-2.7.0.jar --to-path "hdfs:/apps/jars/kafka-clients-3.0.0.jar"
azdata bdc hdfs cp --from-path spark-sql-kafka-0-10_2.11-2.4.7.jar --to-path "hdfs:/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"
È possibile installare in modo dinamico i pacchetti quando si invia un processo usando le funzionalità di gestione dei pacchetti dei cluster Big Data di SQL Server. È prevista una penalità per l'ora di avvio del processo a causa dei download ricorrenti dei file di libreria in ogni invio di processo.
L'esempio seguente usa i file JAR della libreria condivisa in HDFS:
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
-j '["/apps/jars/kafka-clients-3.0.0.jar","/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"]' \
--config '{"spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
In questo esempio viene usata la gestione dinamica dei pacchetti per installare le dipendenze:
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
--config '{"spark.jars.packages": "org.apache.kafka:kafka-clients:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2","spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
Per inviare processi Spark a cluster Big Data di SQL Server usando azdata
o endpoint Livy, vedere Inviare processi Spark usando gli strumenti da riga di comando.
Per altre informazioni sui cluster Big Data di SQL Server e sugli scenari correlati, vedere Cluster Big Data di SQL Server.
Eventi
31 mar, 23 - 2 apr, 23
Il più grande evento di apprendimento di SQL, Infrastruttura e Power BI. 31 marzo - 2 aprile. Usare il codice FABINSIDER per salvare $400.
Iscriviti oggi stessoFormazione
Modulo
Eseguire trasformazioni di dati di streaming avanzate con Apache Spark e Kafka in Azure HDInsight
Certificazione
Microsoft Certified: Azure Data Engineer Associate - Certifications
Dimostrare la comprensione delle attività comuni di ingegneria dei dati per implementare e gestire carichi di lavoro di ingegneria dei dati in Microsoft Azure, usando vari servizi di Azure.
Documentazione
Panoramica della configurazione post-distribuzione dei cluster Big Data
Domande frequenti sui cluster Big Data di SQL Server - SQL Server Big Data Clusters
Domande frequenti sui concetti, le funzionalità, la distribuzione, la supportabilità e gli strumenti dei cluster Big Data di SQL Server
Partner per i cluster Big Data di SQL Server 2019 - SQL Server
Elenchi di partner di terze parti per l'implementazione dei cluster Big Data di SQL Server 2019.