Leer en inglés

Compartir a través de


Conexión a Azure Cosmos DB for Apache Cassandra desde Spark

SE APLICA A: Cassandra

Este artículo es uno de los que se encuentra entre la serie de artículos sobre la integración de Azure Cosmos DB for Apache Cassandra desde Spark. En los artículos se explica la conectividad, las operaciones de lenguaje de definición de datos (DDL), las operaciones básicas de lenguaje de manipulación de datos (DML) y la integración avanzada de Azure Cosmos DB for Apache Cassandra desde Spark.

Prerrequisitos

Dependencias de conectividad

  • Conector de Spark para Cassandra: el conector de Spark se usa para establecer conexión con Azure Cosmos DB for Apache Cassandra. Identifique y use la versión del conector que se encuentra en la central de Maven que sea compatible con las versiones de Spark y Scala de su entorno de Spark. Se recomienda un entorno que admita Spark 3.2.1 o una versión posterior y el conector de Spark disponible en las coordenadas de Maven com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Si usa Spark 2.x, se recomienda utilizar un entorno con la versión de Spark 2.4.5 y el conector de Spark en las coordenadas de Maven com.datastax.spark:spark-cassandra-connector_2.11:2.4.3.

  • Biblioteca auxiliar de Azure Cosmos DB para la API para Cassandra: si usa una versión de Spark 2.x, además del conector de Spark, necesita otra biblioteca denominada azure-cosmos-cassandra-spark-helper con coordenadas de Maven com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 de Azure Cosmos DB para controlar la limitación de velocidad. Esta biblioteca contiene un generador de conexión y clases de directivas de reintentos personalizados.

    La directiva de reintentos de Azure Cosmos DB está configurada para controlar las excepciones del código de estado HTTP 429 ("Tasa de solicitud grande"). Azure Cosmos DB for Apache Cassandra traslada estas excepciones como errores sobrecargados en el protocolo nativo de Cassandra y el usuario puede volver a intentarlo con interrupciones. Dado que Azure Cosmos DB usa el modelo de rendimiento aprovisionado, se producen excepciones de limitación de tasas de solicitud al aumentar las tasas de entrada/salida. La directiva de reintentos protege los trabajos de Spark frente a picos de datos que superan momentáneamente el rendimiento asignado al contenedor. Si usa el conector de Spark 3.x, no es necesario implementar esta biblioteca.

    Nota

    La directiva de reintentos solo puede proteger los trabajos de Spark frente a picos momentáneos. Si no ha configurado suficientes RU necesarias para ejecutar la carga de trabajo, la directiva de reintentos no es aplicable y la clase de directiva de reintentos vuelve a producir la excepción.

  • Detalles de conexión de la cuenta de Azure Cosmos DB: nombre de la cuenta, punto de conexión de la cuenta y clave de la API para Cassandra de Azure.

Optimización de la configuración de rendimiento del conector de Spark

En la sección siguiente se muestran todos los parámetros pertinentes para controlar el rendimiento mediante el conector de Spark para Cassandra. Para optimizar los parámetros a fin de maximizar el rendimiento de los trabajos de Spark, las configuraciones de spark.cassandra.output.concurrent.writes, spark.cassandra.concurrent.reads y spark.cassandra.input.reads_per_sec deben ser las correctas para así evitar demasiados límites e interrupciones (lo que, a su vez, puede dar lugar a un menor rendimiento).

El valor óptimo de estas configuraciones depende de cuatro factores:

  • La cantidad de rendimiento (unidades de solicitud) configurada para la tabla en la que se ingieren los datos.
  • El número de trabajos en el clúster de Spark.
  • El número de ejecutores configurados para el trabajo de Spark (que se puede controlar mediante spark.cassandra.connection.connections_per_executor_max o spark.cassandra.connection.remoteConnectionsPerExecutor, según la versión de Spark).
  • La latencia media de cada solicitud a Azure Cosmos DB, si se le coloca en el mismo centro de datos. Suponga que este valor es de 10 ms para escrituras y 3 ms para lecturas.

Por ejemplo, si tenemos cinco trabajos y un valor de spark.cassandra.output.concurrent.writes= 1 y un valor de spark.cassandra.connection.remoteConnectionsPerExecutor= 1, tenemos cinco trabajos que escriben simultáneamente en la tabla, cada uno con un subproceso. Si se tardan 10 ms en realizar una sola escritura, podemos enviar 100 solicitudes (1000 milisegundos divididos por 10) por segundo, por subproceso. Con cinco trabajos, serían 500 escrituras por segundo. Con un costo medio de cinco unidades de solicitud (RU) por escritura, la tabla de destino necesitaría 2500 unidades de solicitud aprovisionadas como mínimo (5 RU x 500 escrituras por segundo).

Al aumentar el número de ejecutores, puede aumentar el número de subprocesos de un trabajo determinado, lo que a su vez puede aumentar el rendimiento. Sin embargo, el efecto exacto de esta acción puede ser variable en función del trabajo, mientras que el control del rendimiento con el número de trabajos es más determinista. También puede determinar el costo exacto de una solicitud determinada mediante la generación de perfiles para obtener el cargo de unidad de solicitud (RU). Esta opción le ayudará a ser más preciso al aprovisionar el rendimiento de la tabla o el espacio de claves. Consulte nuestro artículo aquí para comprender cómo obtener los cargos por unidad de solicitud en un nivel de solicitud.

Escalado del rendimiento en la base de datos

El conector de Cassandra Spark saturará el rendimiento en Azure Cosmos DB de forma eficaz. En consecuencia, incluso con reintentos efectivos, deberá asegurarse de que tiene suficiente rendimiento (RU) aprovisionado en el nivel de tabla o de espacio de claves para evitar errores relacionados con la limitación de velocidad. La configuración mínima de 400 RU en una tabla o un espacio de claves determinados no será suficiente. Incluso con las opciones de configuración de rendimiento mínimo, el conector de Spark puede escribir a una velocidad correspondiente a aproximadamente 6000 unidades de solicitud o más.

Si la configuración de RU necesaria para el movimiento de datos mediante Spark es mayor que la necesaria para la carga de trabajo de estado estable, puede escalar y reducir verticalmente fácilmente el rendimiento de forma sistemática en Azure Cosmos DB para satisfacer las necesidades de la carga de trabajo durante un período de tiempo determinado. Para conocer las distintas opciones de escalado mediante programación y dinámica, lea nuestro artículo sobre el escalado elástico en la API para Cassandra.

Nota

En la guía anterior se supone una distribución de datos razonablemente uniforme. Si tiene un sesgo significativo en los datos (es decir, un número desmesuradamente grande de lecturas y escrituras para el mismo valor de clave de partición), es posible que siga experimentando cuellos de botella, incluso si tiene un gran número de unidades de solicitud aprovisionadas en la tabla. Las unidades de solicitud se dividen equitativamente entre las particiones físicas y, una asimetría de datos intensiva puede causar un cuello de botella de las solicitudes en una única partición.

Parámetros de configuración de rendimiento del conector de Spark

En la tabla siguiente se enumeran los parámetros de configuración de rendimiento específicos de la API para Cassandra de Azure Cosmos DB proporcionados por el conector. Para obtener una lista detallada de todos los parámetros de configuración, consulte la página de referencia de configuración del repositorio de GitHub del conector de Cassandra de Spark.

Nombre de la propiedad Valor predeterminado Descripción
spark.cassandra.output.batch.size.rows 1 Número de filas por lote único. Establezca este parámetro en 1. Este parámetro se utiliza para lograr un mayor rendimiento para las cargas de trabajo altas.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) None Número máximo de conexiones por nodo y ejecutor. 10*n equivale a diez conexiones por nodo en un clúster de Cassandra con n nodos. Por lo tanto, si necesita cinco conexiones por nodo y ejecutor para un clúster de Cassandra de cinco nodos, debe establecer esta configuración en 25. Modifique este valor según el grado de paralelismo o el número de ejecutores configurados para los trabajos de Spark.
spark.cassandra.output.concurrent.writes 100 Define el número de escrituras paralelas que pueden producirse por ejecutor. Dado que ha establecido "batch.size.rows" en 1, asegúrese de escalar verticalmente este valor en consecuencia. Modifique este valor según el grado de paralelismo o el rendimiento que desea lograr para la carga de trabajo.
spark.cassandra.concurrent.reads 512 Define el número de lecturas en paralelo que pueden producirse por ejecutor. Modifique este valor según el grado de paralelismo o el rendimiento que desea lograr para la carga de trabajo.
spark.cassandra.output.throughput_mb_per_sec None Define el rendimiento de escritura total por ejecutor. Este parámetro puede usarse como límite superior para el rendimiento de trabajo de Spark y basarse en el rendimiento aprovisionado del contenedor de Azure Cosmos DB.
spark.cassandra.input.reads_per_sec None Define el rendimiento de lectura total por ejecutor. Este parámetro puede usarse como límite superior para el rendimiento de trabajo de Spark y basarse en el rendimiento aprovisionado del contenedor de Azure Cosmos DB.
spark.cassandra.output.batch.grouping.buffer.size 1000 Define el número de lotes por cada tarea única de Spark que se pueden almacenar en la memoria antes de enviarlos a la API para Cassandra.
spark.cassandra.connection.keep_alive_ms 60000 Define el período de tiempo hasta el que están disponibles las conexiones no utilizadas.

Ajuste el rendimiento y el grado de paralelismo de estos parámetros en función de la carga de trabajo que espera para los trabajos de Spark y el rendimiento que se ha aprovisionado para la cuenta de Azure Cosmos DB.

Conexión a Azure Cosmos DB for Apache Cassandra desde Spark

cqlsh

Los comandos siguientes proporcionan información detallada acerca de cómo conectarse a Azure Cosmos DB for Apache Cassandra desde cqlsh. Esto es útil para la validación mientras se ejecuta a través de los ejemplos de Spark.
Desde Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

En el siguiente artículo se contempla el aprovisionamiento del clúster de Azure Databricks, la configuración del clúster para conectarse a Azure Cosmos DB for Apache Cassandra y varios cuadernos de ejemplo que abarcan operaciones DDL, operaciones DML y mucho más.
Uso de Azure Cosmos DB for Apache Cassandra desde Azure Databricks

2. Azure HDInsight-Spark

En el siguiente artículo se contempla el servicio de HDInsight Spark, el aprovisionamiento del clúster de Azure Databricks, la configuración del clúster para conectarse a Azure Cosmos DB for Apache Cassandra y varios cuadernos de ejemplo que abarcan operaciones DDL, operaciones DML y mucho más.
Uso de Azure Cosmos DB for Apache Cassandra desde Azure HDInsight-Spark

3. Entorno de Spark en general

Mientras que las secciones anteriores eran específicas de servicios de PaaS basados en Spark de Azure, esta sección trata cualquier entorno general de Spark. A continuación se proporciona información detallada acerca de las dependencias del conector, las importaciones y la configuración de sesión de Spark. En la sección "Siguientes pasos" se muestran ejemplos de código para operaciones DDL, operaciones DML y mucho más.

Dependencias del conector:

  1. Agregue las coordenadas de Maven para obtener el conector de Cassandra para Spark.
  2. Agregue las coordenadas de Maven para la biblioteca auxiliar de Azure Cosmos DB para la API para Cassandra.

Importaciones:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Configuración de la sesión de Spark:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Pasos siguientes

En los artículos siguientes se muestra la integración de Spark con Azure Cosmos DB for Apache Cassandra.