Conexión a Azure Cosmos DB for Apache Cassandra desde Spark
SE APLICA A: Cassandra
Este artículo es uno de los que se encuentra entre la serie de artículos sobre la integración de Azure Cosmos DB for Apache Cassandra desde Spark. En los artículos se explica la conectividad, las operaciones de lenguaje de definición de datos (DDL), las operaciones básicas de lenguaje de manipulación de datos (DML) y la integración avanzada de Azure Cosmos DB for Apache Cassandra desde Spark.
Aprovisionamiento de una cuenta de Azure Cosmos DB for Apache Cassandra.
Aprovisionar la elección del entorno de Spark [Azure Databricks | Spark de Azure HDInsight | Otros].
Conector de Spark para Cassandra: el conector de Spark se usa para establecer conexión con Azure Cosmos DB for Apache Cassandra. Identifique y use la versión del conector que se encuentra en la central de Maven que sea compatible con las versiones de Spark y Scala de su entorno de Spark. Se recomienda un entorno que admita Spark 3.2.1 o una versión posterior y el conector de Spark disponible en las coordenadas de Maven
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
. Si usa Spark 2.x, se recomienda utilizar un entorno con la versión de Spark 2.4.5 y el conector de Spark en las coordenadas de Mavencom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
.Biblioteca auxiliar de Azure Cosmos DB para la API para Cassandra: si usa una versión de Spark 2.x, además del conector de Spark, necesita otra biblioteca denominada azure-cosmos-cassandra-spark-helper con coordenadas de Maven
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
de Azure Cosmos DB para controlar la limitación de velocidad. Esta biblioteca contiene un generador de conexión y clases de directivas de reintentos personalizados.La directiva de reintentos de Azure Cosmos DB está configurada para controlar las excepciones del código de estado HTTP 429 ("Tasa de solicitud grande"). Azure Cosmos DB for Apache Cassandra traslada estas excepciones como errores sobrecargados en el protocolo nativo de Cassandra y el usuario puede volver a intentarlo con interrupciones. Dado que Azure Cosmos DB usa el modelo de rendimiento aprovisionado, se producen excepciones de limitación de tasas de solicitud al aumentar las tasas de entrada/salida. La directiva de reintentos protege los trabajos de Spark frente a picos de datos que superan momentáneamente el rendimiento asignado al contenedor. Si usa el conector de Spark 3.x, no es necesario implementar esta biblioteca.
Nota
La directiva de reintentos solo puede proteger los trabajos de Spark frente a picos momentáneos. Si no ha configurado suficientes RU necesarias para ejecutar la carga de trabajo, la directiva de reintentos no es aplicable y la clase de directiva de reintentos vuelve a producir la excepción.
Detalles de conexión de la cuenta de Azure Cosmos DB: nombre de la cuenta, punto de conexión de la cuenta y clave de la API para Cassandra de Azure.
En la sección siguiente se muestran todos los parámetros pertinentes para controlar el rendimiento mediante el conector de Spark para Cassandra. Para optimizar los parámetros a fin de maximizar el rendimiento de los trabajos de Spark, las configuraciones de spark.cassandra.output.concurrent.writes
, spark.cassandra.concurrent.reads
y spark.cassandra.input.reads_per_sec
deben ser las correctas para así evitar demasiados límites e interrupciones (lo que, a su vez, puede dar lugar a un menor rendimiento).
El valor óptimo de estas configuraciones depende de cuatro factores:
- La cantidad de rendimiento (unidades de solicitud) configurada para la tabla en la que se ingieren los datos.
- El número de trabajos en el clúster de Spark.
- El número de ejecutores configurados para el trabajo de Spark (que se puede controlar mediante
spark.cassandra.connection.connections_per_executor_max
ospark.cassandra.connection.remoteConnectionsPerExecutor
, según la versión de Spark). - La latencia media de cada solicitud a Azure Cosmos DB, si se le coloca en el mismo centro de datos. Suponga que este valor es de 10 ms para escrituras y 3 ms para lecturas.
Por ejemplo, si tenemos cinco trabajos y un valor de spark.cassandra.output.concurrent.writes
= 1 y un valor de spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, tenemos cinco trabajos que escriben simultáneamente en la tabla, cada uno con un subproceso. Si se tardan 10 ms en realizar una sola escritura, podemos enviar 100 solicitudes (1000 milisegundos divididos por 10) por segundo, por subproceso. Con cinco trabajos, serían 500 escrituras por segundo. Con un costo medio de cinco unidades de solicitud (RU) por escritura, la tabla de destino necesitaría 2500 unidades de solicitud aprovisionadas como mínimo (5 RU x 500 escrituras por segundo).
Al aumentar el número de ejecutores, puede aumentar el número de subprocesos de un trabajo determinado, lo que a su vez puede aumentar el rendimiento. Sin embargo, el efecto exacto de esta acción puede ser variable en función del trabajo, mientras que el control del rendimiento con el número de trabajos es más determinista. También puede determinar el costo exacto de una solicitud determinada mediante la generación de perfiles para obtener el cargo de unidad de solicitud (RU). Esta opción le ayudará a ser más preciso al aprovisionar el rendimiento de la tabla o el espacio de claves. Consulte nuestro artículo aquí para comprender cómo obtener los cargos por unidad de solicitud en un nivel de solicitud.
El conector de Cassandra Spark saturará el rendimiento en Azure Cosmos DB de forma eficaz. En consecuencia, incluso con reintentos efectivos, deberá asegurarse de que tiene suficiente rendimiento (RU) aprovisionado en el nivel de tabla o de espacio de claves para evitar errores relacionados con la limitación de velocidad. La configuración mínima de 400 RU en una tabla o un espacio de claves determinados no será suficiente. Incluso con las opciones de configuración de rendimiento mínimo, el conector de Spark puede escribir a una velocidad correspondiente a aproximadamente 6000 unidades de solicitud o más.
Si la configuración de RU necesaria para el movimiento de datos mediante Spark es mayor que la necesaria para la carga de trabajo de estado estable, puede escalar y reducir verticalmente fácilmente el rendimiento de forma sistemática en Azure Cosmos DB para satisfacer las necesidades de la carga de trabajo durante un período de tiempo determinado. Para conocer las distintas opciones de escalado mediante programación y dinámica, lea nuestro artículo sobre el escalado elástico en la API para Cassandra.
Nota
En la guía anterior se supone una distribución de datos razonablemente uniforme. Si tiene un sesgo significativo en los datos (es decir, un número desmesuradamente grande de lecturas y escrituras para el mismo valor de clave de partición), es posible que siga experimentando cuellos de botella, incluso si tiene un gran número de unidades de solicitud aprovisionadas en la tabla. Las unidades de solicitud se dividen equitativamente entre las particiones físicas y, una asimetría de datos intensiva puede causar un cuello de botella de las solicitudes en una única partición.
En la tabla siguiente se enumeran los parámetros de configuración de rendimiento específicos de la API para Cassandra de Azure Cosmos DB proporcionados por el conector. Para obtener una lista detallada de todos los parámetros de configuración, consulte la página de referencia de configuración del repositorio de GitHub del conector de Cassandra de Spark.
Nombre de la propiedad | Valor predeterminado | Descripción |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Número de filas por lote único. Establezca este parámetro en 1. Este parámetro se utiliza para lograr un mayor rendimiento para las cargas de trabajo altas. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | None | Número máximo de conexiones por nodo y ejecutor. 10*n equivale a diez conexiones por nodo en un clúster de Cassandra con n nodos. Por lo tanto, si necesita cinco conexiones por nodo y ejecutor para un clúster de Cassandra de cinco nodos, debe establecer esta configuración en 25. Modifique este valor según el grado de paralelismo o el número de ejecutores configurados para los trabajos de Spark. |
spark.cassandra.output.concurrent.writes | 100 | Define el número de escrituras paralelas que pueden producirse por ejecutor. Dado que ha establecido "batch.size.rows" en 1, asegúrese de escalar verticalmente este valor en consecuencia. Modifique este valor según el grado de paralelismo o el rendimiento que desea lograr para la carga de trabajo. |
spark.cassandra.concurrent.reads | 512 | Define el número de lecturas en paralelo que pueden producirse por ejecutor. Modifique este valor según el grado de paralelismo o el rendimiento que desea lograr para la carga de trabajo. |
spark.cassandra.output.throughput_mb_per_sec | None | Define el rendimiento de escritura total por ejecutor. Este parámetro puede usarse como límite superior para el rendimiento de trabajo de Spark y basarse en el rendimiento aprovisionado del contenedor de Azure Cosmos DB. |
spark.cassandra.input.reads_per_sec | None | Define el rendimiento de lectura total por ejecutor. Este parámetro puede usarse como límite superior para el rendimiento de trabajo de Spark y basarse en el rendimiento aprovisionado del contenedor de Azure Cosmos DB. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Define el número de lotes por cada tarea única de Spark que se pueden almacenar en la memoria antes de enviarlos a la API para Cassandra. |
spark.cassandra.connection.keep_alive_ms | 60000 | Define el período de tiempo hasta el que están disponibles las conexiones no utilizadas. |
Ajuste el rendimiento y el grado de paralelismo de estos parámetros en función de la carga de trabajo que espera para los trabajos de Spark y el rendimiento que se ha aprovisionado para la cuenta de Azure Cosmos DB.
Los comandos siguientes proporcionan información detallada acerca de cómo conectarse a Azure Cosmos DB for Apache Cassandra desde cqlsh. Esto es útil para la validación mientras se ejecuta a través de los ejemplos de Spark.
Desde Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
En el siguiente artículo se contempla el aprovisionamiento del clúster de Azure Databricks, la configuración del clúster para conectarse a Azure Cosmos DB for Apache Cassandra y varios cuadernos de ejemplo que abarcan operaciones DDL, operaciones DML y mucho más.
Uso de Azure Cosmos DB for Apache Cassandra desde Azure Databricks
En el siguiente artículo se contempla el servicio de HDInsight Spark, el aprovisionamiento del clúster de Azure Databricks, la configuración del clúster para conectarse a Azure Cosmos DB for Apache Cassandra y varios cuadernos de ejemplo que abarcan operaciones DDL, operaciones DML y mucho más.
Uso de Azure Cosmos DB for Apache Cassandra desde Azure HDInsight-Spark
Mientras que las secciones anteriores eran específicas de servicios de PaaS basados en Spark de Azure, esta sección trata cualquier entorno general de Spark. A continuación se proporciona información detallada acerca de las dependencias del conector, las importaciones y la configuración de sesión de Spark. En la sección "Siguientes pasos" se muestran ejemplos de código para operaciones DDL, operaciones DML y mucho más.
- Agregue las coordenadas de Maven para obtener el conector de Cassandra para Spark.
- Agregue las coordenadas de Maven para la biblioteca auxiliar de Azure Cosmos DB para la API para Cassandra.
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
En los artículos siguientes se muestra la integración de Spark con Azure Cosmos DB for Apache Cassandra.
- operaciones DDL
- Create/insert operations (Operaciones de creación e inserción)
- Lee operaciones.
- Upsert operations (Operaciones de upsert)
- Delete operations (Operaciones de eliminación)
- Aggregation operations (Operaciones de agregación)
- Table copy operations (Operaciones de copia en tabla)