Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
SE APLICA A: NoSQL
El conector spark de Azure Cosmos DB proporciona una manera eficaz de procesar la fuente de cambios a escala mediante Apache Spark. El conector usa el SDK de Java debajo e implementa un modelo de extracción que distribuye el procesamiento de forma transparente entre ejecutores de Spark, lo que lo convierte en ideal para escenarios de procesamiento de datos a gran escala.
Funcionamiento del conector de Spark
El conector de Spark para Azure Cosmos DB se basa en el SDK de Java de Azure Cosmos DB e implementa un enfoque de modelo de extracción para leer la fuente de cambios. Entre las características clave se incluyen las siguientes:
- Base del SDK de Java: usa el sólido SDK de Java de Azure Cosmos DB debajo para el procesamiento confiable de la fuente de cambios.
- Implementación del modelo de extracción: sigue el patrón del modelo de extracción de fuente de cambios , lo que proporciona control sobre el ritmo de procesamiento.
- Procesamiento distribuido: distribuye automáticamente el procesamiento de la fuente de cambios entre varios ejecutores de Spark para el procesamiento paralelo.
- Escalado transparente: el conector controla la creación de particiones y la distribución de carga sin necesidad de intervención manual.
Funcionalidad de punto de control único
Una de las principales ventajas de usar spark Connector para el procesamiento de fuente de cambios es su mecanismo de punto de control integrado. Esta característica proporciona:
- Recuperación automática: mecanismo predefinido para la recuperación al procesar la fuente de cambios a escala
- Tolerancia a errores: capacidad de reanudar el procesamiento desde el último punto de control en caso de errores
- Administración de estado: mantiene el estado de procesamiento entre sesiones de Spark y reinicios del clúster
- Escalabilidad: admite puntos de control en entornos de Spark distribuidos
Esta funcionalidad de control es exclusiva del conector de Spark y no está disponible al usar los SDK directamente, lo que hace que sea especialmente útil para escenarios de producción que requieren alta disponibilidad y confiabilidad.
Advertencia
La spark.cosmos.changeFeed.startFrom configuración se omite si hay marcadores existentes en la ubicación del punto de control. Al reanudar desde un punto de control, el conector continuará desde la última posición procesada en lugar del punto de inicio especificado.
Cuándo usar Spark para el procesamiento de la fuente de cambios
Considere la posibilidad de usar spark Connector para el procesamiento de fuentes de cambios en estos escenarios:
- Procesamiento de datos a gran escala: cuando necesite procesar grandes volúmenes de datos de fuente de cambios que superen las funcionalidades de una sola máquina
- Transformaciones complejas: cuando el procesamiento de la fuente de cambios implica transformaciones de datos complejas, agregaciones o combinaciones con otros conjuntos de datos
- Análisis distribuido: cuando necesite realizar análisis en tiempo real o casi en tiempo real en los datos de la fuente de cambios en un entorno distribuido
- Integración con canalizaciones de datos: cuando el procesamiento de la fuente de cambios forma parte de canalizaciones ETL/ELT más grandes que ya usan Spark
- Requisitos de tolerancia a errores: cuando se necesitan mecanismos sólidos de punto de control y recuperación para cargas de trabajo de producción
- Procesamiento de varios contenedores: cuando es necesario procesar fuentes de cambios de varios contenedores simultáneamente
Para escenarios más sencillos o cuando necesite un control específico sobre el procesamiento de documentos individual, considere la posibilidad de usar el procesador de fuente de cambios o el modelo de extracción directamente con los SDK.
Ejemplos de código
En los ejemplos siguientes se muestra cómo leer desde la fuente de cambios mediante spark Connector. Para obtener ejemplos más completos, consulte los cuadernos de ejemplo completos:
- Ejemplo de streaming estructurado de Python : procesamiento de datos de taxis de NUEVA YORK con fuente de cambios
- Ejemplo de migración de contenedores de Scala : migración de contenedores en vivo mediante la fuente de cambios
# Configure change feed reading
changeFeedConfig = {
"spark.cosmos.accountEndpoint": "https://<account-name>.documents.azure.com:443/",
"spark.cosmos.accountKey": "<account-key>",
"spark.cosmos.database": "<database-name>",
"spark.cosmos.container": "<container-name>",
# Start from beginning, now, or specific timestamp (ignored if checkpoints exist)
"spark.cosmos.changeFeed.startFrom": "Beginning", # "Now" or "2020-02-10T14:15:03"
"spark.cosmos.changeFeed.mode": "LatestVersion", # or "AllVersionsAndDeletes"
# Control batch size - if not set, all available data processed in first batch
"spark.cosmos.changeFeed.itemCountPerTriggerHint": "50000",
"spark.cosmos.read.partitioning.strategy": "Restrictive"
}
# Read change feed as a streaming DataFrame
changeFeedDF = spark \
.readStream \
.format("cosmos.oltp.changeFeed") \
.options(**changeFeedConfig) \
.load()
# Configure output settings with checkpointing
outputConfig = {
"spark.cosmos.accountEndpoint": "https://<target-account>.documents.azure.com:443/",
"spark.cosmos.accountKey": "<target-account-key>",
"spark.cosmos.database": "<target-database>",
"spark.cosmos.container": "<target-container>",
"spark.cosmos.write.strategy": "ItemOverwrite"
}
# Process and write the change feed data with checkpointing
query = changeFeedDF \
.selectExpr("*") \
.writeStream \
.format("cosmos.oltp") \
.outputMode("append") \
.option("checkpointLocation", "/tmp/changefeed-checkpoint") \
.options(**outputConfig) \
.start()
# Wait for the streaming query to finish
query.awaitTermination()
Opciones de configuración de claves
Al trabajar con la fuente de cambios en Spark, estas opciones de configuración son especialmente importantes:
-
spark.cosmos.changeFeed.startFrom: controla dónde empezar a leer la fuente de cambios
-
"Beginning"- Empezar desde el principio de la fuente de cambios -
"Now"- Inicio desde la hora actual -
"2020-02-10T14:15:03"- Inicio desde una marca de tiempo específica (formato ISO 8601) - Nota: Esta configuración se omite si hay marcadores existentes en la ubicación del punto de control.
-
-
spark.cosmos.changeFeed.mode: especifica el modo de fuente de cambios.
-
"LatestVersion"- Procesar solo la versión más reciente de los documentos modificados -
"AllVersionsAndDeletes"- Procesar todas las versiones de cambios, incluidas las eliminaciones
-
-
spark.cosmos.changeFeed.itemCountPerTriggerHint: controla el tamaño del procesamiento por lotes
- Número máximo aproximado de elementos leídos de la fuente de cambios para cada micro-lote o desencadenador
- Ejemplo:
"50000" - Importante: Si no se establece, todos los datos disponibles de la fuente de cambios se procesarán en el primer microproceso.
- checkpointLocation: especifica dónde almacenar la información del punto de control para la tolerancia a errores y la recuperación.
- spark.cosmos.read.partitioning.strategy: controla cómo se particionan los datos entre ejecutores de Spark
Pasos siguientes
- Más información sobre los patrones de diseño de fuente de cambios
- Exploración del modelo de extracción de fuente de cambios
- Descripción del procesador de fuente de cambios para escenarios de una sola máquina
- Revise la documentación del conector de Spark para ver opciones de configuración adicionales.
- Consulte los modos de fuente de cambios para diferentes escenarios de procesamiento.