Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Se aplica a: SQL Server 2019 (15.x)
Important
Los clústeres de macrodatos de Microsoft SQL Server 2019 se retiran. La compatibilidad con clústeres de macrodatos de SQL Server 2019 finalizó a partir del 28 de febrero de 2025. Para obtener más información, consulte la entrada de blog del anuncio y las opciones de macrodatos en la plataforma de Microsoft SQL Server.
En esta guía se describen casos de uso de streaming y cómo implementarlos mediante Spark para Clústeres de macrodatos de SQL Server.
En esta guía, aprenderá a:
- Cargar las bibliotecas de streaming que se usarán con PySpark y Scala Spark.
- Implementar tres patrones de streaming comunes mediante Clústeres de macrodatos de SQL Server.
Prerequisites
- Una implementación de Clústeres de macrodatos de SQL Server
- Una de estas opciones:
- Clúster de Apache Kafka 2.0 o posterior
- Un espacio de nombres de Azure Event Hubs y un centro de eventos
En esta guía se da por supuesto un buen nivel de comprensión acerca de los conceptos y las arquitecturas de la tecnología de streaming. En los artículos siguientes se proporcionan excelentes líneas de base conceptuales:
- Guía de arquitectura de datos: Procesamiento en tiempo real
- Uso de Azure Event Hubs desde aplicaciones de Apache Kafka
- Guía de arquitectura de datos: Elección de una tecnología de ingesta de mensajes en tiempo real en Azure
Asignación conceptual de Apache Kafka y Azure Event Hubs
| Concepto de Apache Kafka | Concepto de Event Hubs |
|---|---|
| Cluster | Namespace |
| Topic | Event hub |
| Partition | Partition |
| Consumer group | Consumer group |
| Offset | Offset |
Reproducibility
En esta guía se usa la aplicación de producción proporcionada por el Inicio rápido: Streaming de datos con Event Hubs mediante el protocolo de Kafka. Puede encontrar aplicaciones de ejemplo en muchos lenguajes de programación en la página de GitHub sobre Azure Event Hubs para Apache Kafka. Use estas aplicaciones para iniciar escenarios de streaming.
Note
Uno de los pasos que se lleva a cabo con el inicio rápido conlleva que la opción de streaming de Kafka esté habilitada al crear la instancia de Azure Event Hub. Confirme que el punto de conexión de Kafka para el espacio de nombres de Azure Event Hub está habilitado.
A continuación se muestra el código producer.py modificado, que transmite mediante streaming datos JSON de sensor simulados al motor de streaming mediante un cliente compatible con Kafka. Tenga en cuenta que Azure Event Hubs es compatible con el protocolo Kafka. Siga las instrucciones de configuración de GitHub para que el ejemplo funcione de acuerdo con sus requisitos.
Toda la información de conexión está en el diccionario conf. La configuración puede variar en función del entorno. Reemplace al menos bootstrap.servers y sasl.password. Estos valores son los más importantes en el ejemplo de código siguiente.
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, version 2.0.
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka ecosystems.
from confluent_kafka import Producer
import sys
import random
import time
import json
sensors = ["Sensor 1", "Sensor 2", "Sensor 3"]
if __name__ == '__main__':
if len(sys.argv) != 2:
sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
sys.exit(1)
topic = sys.argv[1]
# Producer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
# See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
conf = {
'bootstrap.servers': '', #replace!
'security.protocol': 'SASL_SSL',
'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
'sasl.mechanism': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': '<password>', #replace!
'client.id': 'python-sample-producer'
}
# Create Producer instance
p = Producer(**conf)
def delivery_callback(err, msg):
if err:
sys.stderr.write('%% Message failed delivery: %s\n' % err)
else:
sys.stderr.write('%% Message delivered to %s [%d] @ %o\n' % (msg.topic(), msg.partition(), msg.offset()))
# Simulate stream
for i in range(0, 10000):
try:
payload = {
'sensor': random.choice(sensors),
'measure1': random.gauss(37, 7),
'measure2': random.random(),
}
p.produce(topic, json.dumps(payload).encode('utf-8'), callback=delivery_callback)
#p.produce(topic, str(i), callback=delivery_callback)
except BufferError as e:
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
p.poll(0)
time.sleep(2)
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
Use el comando siguiente para ejecutar la aplicación de producción de ejemplo. Reemplace <my-sample-topic> por la información de su entorno.
python producer.py <my-sample-topic>
Streaming scenarios
| Streaming pattern | Descripción e implementación del escenario |
|---|---|
| Extracción de Kafka o Event Hubs | Cree un trabajo de streaming de Spark que extraiga datos continuamente del motor de streaming y realice transformaciones opcionales y lógica de análisis. |
| Recepción de datos de streaming en el Sistema de archivos distribuido de Apache Hadoop (HDFS) | En general, este patrón se correlaciona con el anterior. Después de la lógica de extracción y transformación de streaming, los datos pueden escribirse en una gran variedad de ubicaciones para lograr el requisito de persistencia de datos deseado. |
| Envío de cambios desde Spark a Kafka o Event Hubs | Después del procesamiento por medio de Spark, los datos se pueden volver a insertar en el motor de streaming externo. Hay muchos escenarios en los que este patrón es conveniente, como las recomendaciones de productos en tiempo real, la detección de anomalías y fraude en microlotes. |
Aplicación de streaming de Spark de ejemplo
En esta aplicación de ejemplo se implementan los tres patrones de streaming descritos en la sección anterior. The application:
- Se establecen variables de configuración para conectarse al servicio de streaming.
- Se crea una trama de datos de streaming de Spark para extraer datos.
- Se escriben los datos agregados localmente en HDFS.
- Se escriben los datos agregados en otro tema en el servicio de streaming.
Este es el código de sample-spark-streaming-python.py completo:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Sets up batch size to 15 seconds
duration_ms = 15000
# Changes Spark session into a structured streaming context
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, duration_ms)
spark = SparkSession(sc)
# Connection information
bootstrap_servers = "" # Replace!
sasl = "" # Replace!
# Topic we will consume from
topic = "sample-topic"
# Topic we will write to
topic_to = "sample-topic-processed"
# Define the schema to speed up processing
jsonSchema = StructType([StructField("sensor", StringType(), True), StructField("measure1", DoubleType(), True), StructField("measure2", DoubleType(), True)])
streaming_input_df = (
spark.readStream \
.format("kafka") \
.option("subscribe", topic) \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", sasl) \
.option("kafka.request.timeout.ms", "60000") \
.option("kafka.session.timeout.ms", "30000") \
.option("failOnDataLoss", "true") \
.option("startingOffsets", "latest") \
.load()
)
def foreach_batch_function(df, epoch_id):
# Transform and write batchDF
if df.count() <= 0:
None
else:
# Create a data frame to be written to HDFS
sensor_df = df.selectExpr('CAST(value AS STRING)').select(from_json("value", jsonSchema).alias("value")).select("value.*")
# root
# |-- sensor: string (nullable = true)
# |-- measure1: double (nullable = true)
# |-- measure2: double (nullable = true)
sensor_df.persist()
# Write to HDFS
sensor_df.write.format('parquet').mode('append').saveAsTable('sensor_data')
# Create a summarization data frame
sensor_stats_df = (sensor_df.groupBy('sensor').agg({'measure1':'avg', 'measure2':'avg', 'sensor':'count'}).withColumn('ts', current_timestamp()).withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('avg(measure2)', 'measure2_avg').withColumnRenamed('avg(measure1)', 'measure1_avg').withColumnRenamed('count(sensor)', 'count_sensor'))
# root
# |-- sensor: string (nullable = true)
# |-- measure2_avg: double (nullable = true)
# |-- measure1_avg: double (nullable = true)
# |-- count_sensor: long (nullable = false)
# |-- ts: timestamp (nullable = false)
sensor_stats_df.write.format('parquet').mode('append').saveAsTable('sensor_data_stats')
# Group by and send metrics to an output Kafka topic
sensor_stats_df.writeStream \
.format("kafka") \
.option("topic", topic_to) \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", sasl) \
.save()
# For example, you could write to SQL Server
# df.write.format('com.microsoft.sqlserver.jdbc.spark').mode('append').option('url', url).option('dbtable', datapool_table).save()
sensor_df.unpersist()
writer = streaming_input_df.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()
Cree las tablas siguientes mediante Spark SQL. El kernel de PySpark en un cuaderno de Azure Data Studio es una forma de ejecutar Spark SQL de forma interactiva. En un nuevo cuaderno de Azure Data Studio, conéctese al grupo de Spark del clúster de macrodatos. Elija el kernel de PySpark y ejecute lo siguiente:
%%sql
CREATE TABLE IF NOT EXISTS sensor_data (sensor string, measure1 double, measure2 double)
USING PARQUET;
CREATE TABLE IF NOT EXISTS sensor_data_stats (sensor string, measure2_avg double, measure1_avg double, count_sensor long, ts timestamp)
USING PARQUET;
Copia de la aplicación en HDFS
azdata bdc hdfs cp --from-path sample-spark-streaming-python.py --to-path "hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py"
Configuración de bibliotecas de Kafka
Configure las bibliotecas de cliente de Kafka en la aplicación antes de enviar los trabajos. Se requieren dos bibliotecas:
- kafka-clients: la biblioteca principal que habilita la compatibilidad y la conectividad del protocolo Kafka.
- spark-sql-kafka: esta biblioteca habilita la funcionalidad de trama de datos de Spark SQL en los flujos de Kafka.
Las dos bibliotecas deben cumplir los requisitos siguientes:
- Tener como destino Scala 2.12 y Spark 3.1.2. Se trata de un requisito de los clústeres de macrodatos de SQL Server para la actualización acumulativa 13 (CU13) o una versión posterior.
- Ser compatibles con el servidor de streaming.
Caution
Como regla general, use la biblioteca compatible más reciente. El código proporcionado en esta guía se ha probado con Apache Kafka para Azure Event Hubs. Se proporciona tal cual, no como una declaración de compatibilidad.
Apache Kafka ofrece compatibilidad de cliente bidireccional por diseño. No obstante, las implementaciones de biblioteca varían según los lenguajes de programación. Consulte siempre la documentación de la plataforma Kafka para asignar la compatibilidad de forma correcta.
Ubicaciones de biblioteca compartidas para trabajos en HDFS
Si varias aplicaciones se conectan al mismo clúster de Kafka o la organización tiene un clúster de Kafka de una sola versión, copie los archivos JAR de biblioteca adecuados en una ubicación compartida en HDFS. Después, todos los trabajos deben hacer referencia a los mismos archivos de biblioteca.
Copie las bibliotecas en la ubicación común:
azdata bdc hdfs cp --from-path kafka-clients-2.7.0.jar --to-path "hdfs:/apps/jars/kafka-clients-3.0.0.jar"
azdata bdc hdfs cp --from-path spark-sql-kafka-0-10_2.11-2.4.7.jar --to-path "hdfs:/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"
Instalación dinámica de las bibliotecas
Al enviar los trabajos, se pueden instalar los paquetes de forma dinámica mediante las características de administración de paquetes de Clústeres de macrodatos de SQL Server. Por cada envío de trabajo, se aplica una penalización del tiempo de inicio del trabajo debido a las descargas periódicas de los archivos de biblioteca.
Envío del trabajo de streaming de Spark mediante azdata
En el ejemplo siguiente se usan los archivos JAR de biblioteca compartida en HDFS:
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
-j '["/apps/jars/kafka-clients-3.0.0.jar","/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"]' \
--config '{"spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
En este ejemplo se usa la administración dinámica de paquetes para instalar las dependencias:
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
--config '{"spark.jars.packages": "org.apache.kafka:kafka-clients:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2","spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
Next steps
Para enviar trabajos de Spark a Clústeres de macrodatos de SQL Server mediante puntos de conexión de azdata o Livy, vea Envío de trabajos de Spark mediante herramientas de línea de comandos.
Para obtener más información sobre los clústeres de macrodatos de SQL Server y los escenarios relacionados, vea Clústeres de macrodatos de SQL Server.