Connettersi ad Azure Cosmos DB per Apache Cassandra da Spark

SI APPLICA A: Cassandra

Questo articolo è uno tra una serie di articoli sull'integrazione di Azure Cosmos DB per Apache Cassandra da Spark. Gli articoli illustrano la connettività, le operazioni DDL (Data Definition Language), le operazioni DML (Data Manipulation Language) di base e l'integrazione avanzata di Azure Cosmos DB per Apache Cassandra da Spark.

Prerequisiti

Dipendenze per la connettività

  • Connettore Spark per Cassandra: Il connettore Spark viene usato per connettersi ad Azure Cosmos DB per Apache Cassandra. Identificare e usare la versione del connettore nel repository Maven compatibile con le versioni di Spark e Scala dell'ambiente Spark. È consigliabile un ambiente che supporti Spark 3.2.1 o versione successiva e il connettore Spark disponibile nelle coordinate com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0maven . Se si usa Spark 2.x, è consigliabile un ambiente con Spark versione 2.4.5, usando il connettore Spark nelle coordinate com.datastax.spark:spark-cassandra-connector_2.11:2.4.3maven.

  • Libreria helper di Azure Cosmos DB per l'API per Cassandra: Se si usa una versione Spark 2.x, oltre al connettore Spark, è necessaria un'altra libreria denominata azure-cosmos-cassandra-spark-helper con coordinate com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 maven da Azure Cosmos DB per gestire la limitazione della frequenza. Questa libreria contiene una classe factory di connessione e una classe per i criteri di ripetizione, entrambe personalizzate.

    I criteri di ripetizione in Azure Cosmos DB sono configurati per gestire le eccezioni con codice stato HTTP 429 ("La frequenza delle richieste è troppo elevata"). Azure Cosmos DB per Apache Cassandra converte queste eccezioni in errori di overload nel protocollo nativo Cassandra ed è possibile riprovare con i back-off. Dato che Azure Cosmos DB usa il modello di velocità effettiva con provisioning, quando il traffico in ingresso/uscita aumenta si verificano eccezioni di limitazione della frequenza delle richieste. Il criterio di ripetizione dei tentativi protegge i processi Spark dai picchi di dati che superano momentaneamente la velocità effettiva allocata per il contenitore. Se si usa il connettore Spark 3.x, l'implementazione di questa libreria non è necessaria.

    Nota

    I criteri di ripetizione possono proteggere i processi di Spark solo da picchi momentanei. Se non è stato configurato un numero sufficiente di UR per eseguire il carico di lavoro, i criteri di ripetizione non sono applicabili e la classe dei criteri di ripetizione genera nuovamente l'eccezione.

  • Dettagli della connessione dell'account Azure Cosmos DB: L'API di Azure per il nome dell'account Cassandra, l'endpoint dell'account e la chiave.

Ottimizzazione della configurazione della velocità effettiva del connettore Spark

Elencati nella sezione successiva sono tutti i parametri pertinenti per controllare la velocità effettiva usando il connettore Spark per Cassandra. Per ottimizzare i parametri per ottimizzare la velocità effettiva per i processi Spark, le spark.cassandra.output.concurrent.writesconfigurazioni , spark.cassandra.concurrent.readse spark.cassandra.input.reads_per_sec devono essere configurate correttamente, per evitare troppe limitazioni e back-off (che a sua volta possono portare a una velocità effettiva inferiore).

Il valore ottimale di queste configurazioni dipende da quattro fattori:

  • Quantità di velocità effettiva (unità richiesta) configurata per la tabella in cui vengono inseriti i dati.
  • Numero di ruoli di lavoro nel cluster Spark.
  • Numero di executor configurati per il processo Spark (che può essere controllato usando spark.cassandra.connection.connections_per_executor_max o spark.cassandra.connection.remoteConnectionsPerExecutor a seconda della versione di Spark)
  • Latenza media di ogni richiesta ad Azure Cosmos DB, se si è collocati nello stesso data center. Si supponga che questo valore sia 10 ms per le operazioni di scrittura e 3 ms per le letture.

Ad esempio, se sono presenti cinque ruoli di lavoro e un valore spark.cassandra.output.concurrent.writespari a = 1 e un valore spark.cassandra.connection.remoteConnectionsPerExecutor pari a = 1, sono presenti cinque ruoli di lavoro che scrivono simultaneamente nella tabella, ognuno con un thread. Se sono necessari 10 ms per eseguire una singola scrittura, è possibile inviare 100 richieste (1000 millisecondi diviso per 10) al secondo, per thread. Con cinque lavoratori, si tratta di 500 scritture al secondo. A un costo medio di cinque unità richiesta (UR) per scrittura, la tabella di destinazione richiederebbe un provisioning minimo di 2500 unità richiesta (5 UR x 500 scritture al secondo).

L'aumento del numero di executor può aumentare il numero di thread in un determinato processo, che a sua volta può aumentare la velocità effettiva. Tuttavia, l'impatto esatto di questo può essere variabile a seconda del processo, mentre il controllo della velocità effettiva con il numero di ruoli di lavoro è più deterministico. È anche possibile determinare il costo esatto di una determinata richiesta profilandolo per ottenere l'addebito dell'unità richiesta (UR). Ciò consente di essere più accurati durante il provisioning della velocità effettiva per la tabella o il keyspace. Per informazioni su come ottenere gli addebiti per unità richiesta a livello di richiesta, vedere l'articolo qui .

Ridimensionamento della velocità effettiva nel database

Il connettore Cassandra Spark saturazione la velocità effettiva in Azure Cosmos DB in modo efficiente. Di conseguenza, anche con tentativi effettivi, è necessario assicurarsi di avere una velocità effettiva sufficiente (UR) di cui è stato effettuato il provisioning a livello di tabella o keyspace per evitare errori correlati alla limitazione della frequenza. L'impostazione minima di 400 UR in una determinata tabella o keyspace non sarà sufficiente. Anche con impostazioni di configurazione minime della velocità effettiva, il connettore Spark può scrivere a una velocità corrispondente a circa 6000 unità richiesta o più.

Se l'impostazione ur necessaria per lo spostamento dei dati con Spark è superiore a quella necessaria per il carico di lavoro a stato stabile, è possibile aumentare e ridurre sistematicamente la velocità effettiva in Azure Cosmos DB per soddisfare le esigenze del carico di lavoro per un determinato periodo di tempo. Leggere l'articolo sulla scalabilità elastica nell'API per Cassandra per comprendere le diverse opzioni per il ridimensionamento a livello di codice e in modo dinamico.

Nota

Le indicazioni precedenti presuppongono una distribuzione ragionevolmente uniforme dei dati. Se si verifica un'asimmetria significativa nei dati, ovvero un numero eccessivo di letture/scritture nello stesso valore della chiave di partizione, è comunque possibile che si verifichino colli di bottiglia, anche se nella tabella è stato effettuato il provisioning di un numero elevato di unità richiesta . Le unità richiesta sono divise equamente tra le partizioni fisiche e l'asimmetria dei dati pesanti può causare un collo di bottiglia delle richieste a una singola partizione.

Parametri di configurazione della velocità effettiva del connettore Spark

La tabella seguente elenca i parametri di configurazione della velocità effettiva specifici di Azure Cosmos DB per Apache Cassandra forniti dal connettore. Per un elenco dettagliato di tutti i parametri di configurazione, vedere la pagina di riferimento per la configurazione del repository GitHub del connettore Cassandra Spark.

Nome proprietà Valore predefinito Descrizione
spark.cassandra.output.batch.size.rows 1 Numero di righe per ogni singolo batch. Impostare questo parametro su 1. Questo parametro viene usato per ottenere una velocità effettiva maggiore per carichi di lavoro pesanti.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Nessuno Numero massimo di connessioni per ogni nodo per ogni executor. 10*n equivale a 10 connessioni per nodo in un cluster Cassandra con n-nodi. Pertanto, se sono necessarie cinque connessioni per nodo per ogni executor per un cluster Cassandra a cinque nodi, è necessario impostare questa configurazione su 25. Modificare questo valore in base al grado di parallelismo o al numero di executor per cui sono configurati i processi Spark.
spark.cassandra.output.concurrent.writes 100 Definisce il numero di scritture parallele che possono verificarsi per ogni executor. Dato che "batch.size.rows" è impostato su 1, assicurarsi di aumentare questo valore di conseguenza. Modificare questo valore in base al grado di parallelismo o alla velocità effettiva che si vuole ottenere per il carico di lavoro.
spark.cassandra.concurrent.reads 512 Definisce il numero di letture parallele che possono verificarsi per ogni executor. Modificare questo valore in base al grado di parallelismo o alla velocità effettiva che si vuole ottenere per il carico di lavoro.
spark.cassandra.output.throughput_mb_per_sec Nessuno Definisce la velocità effettiva di scrittura totale per ogni executor. Questo parametro può essere usato come limite superiore per la velocità effettiva del processo Spark e basarlo sulla velocità effettiva di cui è stato effettuato il provisioning del contenitore Azure Cosmos DB.
spark.cassandra.input.reads_per_sec Nessuno Definisce la velocità effettiva di lettura totale per ogni executor. Questo parametro può essere usato come limite superiore per la velocità effettiva del processo Spark e basarlo sulla velocità effettiva di cui è stato effettuato il provisioning del contenitore Azure Cosmos DB.
spark.cassandra.output.batch.grouping.buffer.size 1000 Definisce il numero di batch per singola attività Spark che possono essere archiviate in memoria prima dell'invio all'API per Cassandra
spark.cassandra.connection.keep_alive_ms 60000 Definisce il periodo di tempo fino a quando sono disponibili connessioni inutilizzate.

Modificare la velocità effettiva e il grado di parallelismo di questi parametri in base al carico di lavoro previsto per i processi Spark e alla velocità effettiva di cui è stato effettuato il provisioning per l'account Azure Cosmos DB.

Connessione ad Azure Cosmos DB per Apache Cassandra da Spark

cqlsh

I comandi seguenti illustrano in dettaglio come connettersi ad Azure Cosmos DB per Apache Cassandra da cqlsh. Queste informazioni sono utili per la convalida durante l'esecuzione degli esempi in Spark.
Da Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

L'articolo seguente illustra il provisioning dei cluster di Azure Databricks, la configurazione del cluster per la connessione ad Azure Cosmos DB per Apache Cassandra e diversi notebook di esempio che coprono le operazioni DDL, le operazioni DML e altro ancora.
Usare Azure Cosmos DB per Apache Cassandra da Azure Databricks

2. Azure HDInsight-Spark

L'articolo seguente illustra HDinsight-Spark servizio, provisioning, configurazione del cluster per la connessione ad Azure Cosmos DB per Apache Cassandra e diversi notebook di esempio che coprono le operazioni DDL, le operazioni DML e altro ancora.
Usare Azure Cosmos DB per Apache Cassandra da Azure HDInsight-Spark

3. Ambiente Spark in generale

Mentre le sezioni precedenti sono specifiche per servizi PaaS basati su Spark di Azure, questa sezione è dedicata agli ambienti Spark generali. Di seguito sono indicate in dettaglio le dipendenze del connettore, le importazioni e la configurazione della sessione Spark. La sezione "Passaggi successivi" include collegamenti a esempi di codice per operazioni DDL, operazioni DML e altro.

Dipendenze del connettore:

  1. Aggiungere le coordinate di maven per ottenere il connettore Cassandra per Spark
  2. Aggiungere le coordinate maven per la libreria helper di Azure Cosmos DB per l'API per Cassandra

Importazioni:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Configurazione della sessione Spark:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Passaggi successivi

Gli articoli seguenti illustrano l'integrazione di Spark con Azure Cosmos DB per Apache Cassandra.