Eseguire la migrazione dei dati da Cassandra a un account Azure Cosmos DB for Apache Cassandra usando Azure Databricks

SI APPLICA A: Cassandra

L'API per Cassandra in Azure Cosmos DB è diventata un'ottima scelta per i carichi di lavoro aziendali in esecuzione in Apache Cassandra per diversi motivi:

  • Nessun sovraccarico di gestione e monitoraggio: elimina il sovraccarico della gestione e del monitoraggio delle impostazioni nei file OS, JVM e YAML e nelle relative interazioni.

  • Risparmio significativo sui costi: è possibile risparmiare sui costi con Azure Cosmos DB, che include il costo delle macchine virtuali, della larghezza di banda e delle licenze applicabili. Non è necessario gestire costi di data center, server, archiviazione SSD, rete ed elettricità.

  • Possibilità di usare codice e strumenti esistenti: Azure Cosmos DB offre compatibilità a livello di protocollo di trasmissione con gli strumenti e gli SDK Cassandra esistenti. Questa compatibilità garantisce l'uso della codebase esistente con Azure Cosmos DB for Apache Cassandra, con poche semplici modifiche.

Esistono molti modi per eseguire la migrazione dei carichi di lavoro del database da una piattaforma a un'altra. Azure Databricks è una piattaforma distribuita come servizio (PaaS) per Apache Spark che permette di eseguire migrazioni offline su larga scala. Questo articolo descrive i passaggi necessari per eseguire la migrazione dei dati da keyspace e tabelle Apache Cassandra nativi in Azure Cosmos DB for Apache Cassandra usando Azure Databricks.

Prerequisiti

Effettuare il provisioning di un cluster di Azure Databricks

È possibile seguire le istruzioni per effettuare il provisioning di un cluster di Azure Databricks. È consigliabile selezionare Databricks runtime versione 7.5, che supporta Spark 3.0.

Screenshot that shows finding the Databricks runtime version.

Aggiungere le dipendenze

È necessario aggiungere la libreria del connettore Apache Spark Cassandra al cluster per connettersi agli endpoint Cassandra nativi e di Azure Cosmos DB. Nel cluster selezionare Librerie>Installa nuova>Maven e quindi aggiungere com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 nelle coordinate Maven.

Screenshot that shows searching for Maven packages in Databricks.

Selezionare Installa e quindi riavviare il cluster al termine dell'installazione.

Nota

Assicurarsi di riavviare il cluster Databricks dopo l'installazione della libreria del connettore Cassandra.

Avviso

Gli esempi illustrati in questo articolo sono stati testati con Spark versione 3.0.1 e il connettore Cassandra Spark corrispondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0. Le versioni successive di Spark e/o del connettore Cassandra potrebbero non funzionare come previsto.

Creare un notebook Scala per la migrazione

Creare un notebook Scala in Databricks. Sostituire le configurazioni Cassandra di origine e di destinazione con le credenziali corrispondenti e i keyspace e le tabelle di origine e di destinazione. Eseguire quindi il codice seguente:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val nativeCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val cosmosCassandra = Map( 
    "spark.cassandra.connection.host" -> "<USERNAME>.cassandra.cosmos.azure.com",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    //"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1", // Spark 3.x
    "spark.cassandra.connection.connections_per_executor_max"-> "1", // Spark 2.x
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from native Cassandra
val DFfromNativeCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(nativeCassandra)
  .load
  
//Write to CosmosCassandra
DFfromNativeCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(cosmosCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Nota

I valori di spark.cassandra.output.batch.size.rows e spark.cassandra.output.concurrent.writes e il numero di ruoli di lavoro nel cluster Spark sono configurazioni importanti da ottimizzare per evitare la limitazione della frequenza. La limitazione della frequenza si verifica quando le richieste ad Azure Cosmos DB superano la velocità effettiva con provisioning o unità richiesta (UR). Potrebbe essere necessario modificare queste impostazioni, a seconda del numero di executor nel cluster Spark e potenzialmente delle dimensioni (e quindi del costo in UR) di ogni record scritto nelle tabelle di destinazione.

Risoluzione dei problemi

Limitazione della frequenza (errore 429)

È possibile che venga visualizzato un codice di errore 429 o un testo di errore "Frequenza richieste di grandi dimensioni" anche se sono state ridotte le impostazioni ai valori minimi. Gli scenari seguenti possono causare la limitazione della frequenza:

  • La velocità effettiva allocata alla tabella è inferiore a 6.000 unità richiesta. Anche con impostazioni minime, Spark può scrivere a una velocità di circa 6.000 unità richiesta o più. Se è stato effettuato il provisioning di una tabella in un keyspace con velocità effettiva condivisa, è possibile che questa tabella abbia meno di 6.000 UR disponibili in fase di esecuzione.

    Assicurarsi che la tabella di cui si esegue la migrazione disponga di almeno 6.000 UR disponibili quando si esegue la migrazione. Se necessario, allocare unità richiesta dedicate a tale tabella.

  • Asimmetria dei dati eccessiva con volumi di dati di grandi dimensioni. Se si dispone di una grande quantità di dati di cui eseguire la migrazione in una determinata tabella, ma è presente un'asimmetria significativa nei dati, ovvero un numero elevato di record scritti per lo stesso valore della chiave di partizione, è possibile che si verifichino ancora limitazioni della frequenza anche se sono presenti diverse unità richiesta di cui è stato effettuato il provisioning nella tabella. Le unità richiesta sono divise equamente tra le partizioni fisiche e una pesante asimmetria dei dati può causare un collo di bottiglia delle richieste a una singola partizione.

    In questo scenario, ridurre le impostazioni di velocità effettiva minima in Spark e forzare l'esecuzione lenta della migrazione. Questo scenario può essere più comune quando si esegue la migrazione di tabelle di riferimento o di controllo, in cui l'accesso è meno frequente e l'asimmetria può essere elevata. Tuttavia, se in qualsiasi altro tipo di tabella è presente un'asimmetria significativa, è possibile esaminare il modello di dati per evitare problemi di partizione ad accesso frequente per il carico di lavoro durante le operazioni con stato stabile.

Passaggi successivi