Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark

VAN TOEPASSING OP: Cassandra

Dit artikel maakt deel uit van een reeks artikelen over Azure Cosmos DB voor Apache Cassandra-integratie van Spark. De artikelen hebben betrekking op connectiviteit, DDL-bewerkingen (Data Definition Language), DML-bewerkingen (Basic Data Manipulation Language) en geavanceerde Integratie van Azure Cosmos DB voor Apache Cassandra van Spark.

Vereisten

Afhankelijkheden voor connectiviteit

  • Spark-connector voor Cassandra: De Spark-connector wordt gebruikt om verbinding te maken met Azure Cosmos DB voor Apache Cassandra. Identificeer en gebruik de versie van de connector in Maven Central die compatibel is met de Spark- en Scala-versies van uw Spark-omgeving. We raden een omgeving aan die Ondersteuning biedt voor Spark 3.2.1 of hoger, en de Spark-connector die beschikbaar is op maven-coördinaten com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Als u Spark 2.x gebruikt, raden we een omgeving aan met Spark-versie 2.4.5, met behulp van een Spark-connector op maven-coördinaten com.datastax.spark:spark-cassandra-connector_2.11:2.4.3.

  • Azure Cosmos DB-helperbibliotheek voor API voor Cassandra: Als u een versie Van Spark 2.x gebruikt, hebt u naast de Spark-connector een andere bibliotheek nodig met de naam azure-cosmos-cassandra-spark-helper met maven-coördinatencom.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 van Azure Cosmos DB om snelheidsbeperking te kunnen verwerken. Deze bibliotheek bevat aangepaste verbindingsfactory en beleidsklassen voor opnieuw proberen.

    Het beleid voor opnieuw proberen in Azure Cosmos DB is geconfigureerd voor het verwerken van HTTP-statuscode 429("Request Rate Large") uitzonderingen. Azure Cosmos DB voor Apache Cassandra vertaalt deze uitzonderingen in overbelaste fouten in het systeemeigen Cassandra-protocol en u kunt het opnieuw proberen met back-offs. Omdat Azure Cosmos DB gebruikmaakt van een ingerichte doorvoermodel, treden er uitzonderingen op voor aanvraagsnelheidsbeperking wanneer de tarieven voor inkomend/uitgaand verkeer toenemen. Het beleid voor opnieuw proberen beschermt uw Spark-taken tegen gegevenspieken die tijdelijk de doorvoer overschrijden die voor uw container is toegewezen. Als u de Spark 3.x-connector gebruikt, is het implementeren van deze bibliotheek niet vereist.

    Notitie

    Het beleid voor opnieuw proberen kan uw Spark-taken alleen beschermen tegen tijdelijke pieken. Als u onvoldoende RU's hebt geconfigureerd die nodig zijn om uw workload uit te voeren, is het beleid voor opnieuw proberen niet van toepassing en wordt de uitzondering opnieuw door de beleidsklasse voor opnieuw proberen omvergeworpen.

  • Verbindingsgegevens voor Azure Cosmos DB-account: De naam, het accounteindpunt en de sleutel van uw Azure API voor Cassandra-account.

Configuratie voor doorvoer van Spark-connector optimaliseren

In de volgende sectie worden alle relevante parameters vermeld voor het beheren van doorvoer met behulp van de Spark-connector voor Cassandra. Als u parameters wilt optimaliseren om de doorvoer voor Spark-taken te maximaliseren, moeten de spark.cassandra.output.concurrent.writesconfiguraties , spark.cassandra.concurrent.readsen spark.cassandra.input.reads_per_sec correct worden geconfigureerd om te veel beperking en back-off te voorkomen (wat op zijn beurt kan leiden tot een lagere doorvoer).

De optimale waarde van deze configuraties is afhankelijk van vier factoren:

  • De hoeveelheid doorvoer (aanvraageenheden) die is geconfigureerd voor de tabel waarin gegevens worden opgenomen.
  • Het aantal werkrollen in uw Spark-cluster.
  • Het aantal uitvoerders dat is geconfigureerd voor uw Spark-taak (dat kan worden beheerd met of spark.cassandra.connection.connections_per_executor_maxspark.cassandra.connection.remoteConnectionsPerExecutor afhankelijk van de Spark-versie)
  • De gemiddelde latentie van elke aanvraag naar Azure Cosmos DB, als u zich in hetzelfde datacenter bevindt. Stel dat deze waarde 10 ms is voor schrijfbewerkingen en 3 ms voor leesbewerkingen.

Als we bijvoorbeeld vijf werkrollen hebben en een waarde van spark.cassandra.output.concurrent.writes= 1 en een waarde van spark.cassandra.connection.remoteConnectionsPerExecutor = 1, hebben we vijf werkrollen die tegelijkertijd in de tabel schrijven, elk met één thread. Als het 10 ms duurt om één schrijfbewerking uit te voeren, kunnen we per seconde 100 aanvragen (1000 milliseconden gedeeld door 10) per seconde verzenden. Met vijf werkrollen zou dit 500 schrijfbewerkingen per seconde zijn. Tegen een gemiddelde kosten van vijf aanvraageenheden (RU's) per schrijfbewerking moet voor de doeltabel minimaal 2500 aanvraageenheden zijn ingericht (5 RU's x 500 schrijfbewerkingen per seconde).

Door het aantal uitvoerders te verhogen, kan het aantal threads in een bepaalde taak toenemen, wat op zijn beurt de doorvoer kan verhogen. De exacte impact hiervan kan echter variëren, afhankelijk van de taak, terwijl het deterministischer is om de doorvoer met het aantal werkrollen te beheren. U kunt ook de exacte kosten van een bepaalde aanvraag bepalen door deze te profileren om de ru-kosten (Request Unit) op te halen. Dit helpt u om nauwkeuriger te zijn bij het inrichten van doorvoer voor uw tabel of keyspace. Bekijk ons artikel hier om te begrijpen hoe u kosten voor aanvraageenheden per aanvraag kunt krijgen.

Doorvoer schalen in de database

De Cassandra Spark-connector verzadigt de doorvoer in Azure Cosmos DB efficiënt. Als gevolg hiervan moet u, zelfs bij effectieve nieuwe pogingen, ervoor zorgen dat u voldoende doorvoer (RU's) hebt ingericht op het niveau van de tabel of keyspace om fouten met betrekking tot snelheidsbeperking te voorkomen. De minimale instelling van 400 RU's in een bepaalde tabel of keyspace is niet voldoende. Zelfs bij minimale configuratie-instellingen voor doorvoer kan de Spark-connector schrijven met een snelheid die overeenkomt met ongeveer 6000 aanvraageenheden of meer.

Als de RU-instelling die is vereist voor gegevensverplaatsing met behulp van Spark hoger is dan wat vereist is voor uw steady state-workload, kunt u de doorvoer eenvoudig systematisch omhoog en omlaag schalen in Azure Cosmos DB om te voldoen aan de behoeften van uw workload voor een bepaalde periode. Lees ons artikel over elastisch schalen in API voor Cassandra voor meer informatie over de verschillende opties voor programmatisch en dynamisch schalen.

Notitie

In de bovenstaande richtlijnen wordt ervan uitgegaan dat de gegevens redelijk uniform worden verdeeld. Als u een aanzienlijke scheefheid in de gegevens hebt (dat wil gezegd, een buitensporig groot aantal lees-/schrijfbewerkingen naar dezelfde partitiesleutelwaarde), kunt u nog steeds knelpunten ondervinden, zelfs als er een groot aantal aanvraageenheden in uw tabel is ingericht. Aanvraageenheden worden gelijkmatig verdeeld over fysieke partities en scheeftrekken van zware gegevens kunnen een knelpunt van aanvragen naar één partitie veroorzaken.

Configuratieparameters voor doorvoer van Spark-connector

De volgende tabel bevat azure Cosmos DB voor Apache Cassandra-specifieke configuratieparameters voor doorvoer die door de connector worden geleverd. Zie de pagina met configuratiereferenties van de GitHub-opslagplaats van spark Cassandra Connector voor een gedetailleerde lijst met alle configuratieparameters.

Eigenschapsnaam Standaardwaarde Beschrijving
spark.cassandra.output.batch.size.rows 1 Aantal rijen per batch. Stel deze parameter in op 1. Deze parameter wordt gebruikt om een hogere doorvoer te bereiken voor zware workloads.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Geen Maximum aantal verbindingen per knooppunt per uitvoerder. 10*n is gelijk aan 10 verbindingen per knooppunt in een Cassandra-cluster met n-knooppunten. Dus als u vijf verbindingen per knooppunt per uitvoerder nodig hebt voor een Cassandra-cluster met vijf knooppunten, moet u deze configuratie instellen op 25. Wijzig deze waarde op basis van de mate van parallelle uitvoering of het aantal uitvoerders waarvoor uw Spark-taken zijn geconfigureerd.
spark.cassandra.output.concurrent.writes 100 Definieert het aantal parallelle schrijfbewerkingen dat per uitvoerder kan plaatsvinden. Omdat u 'batch.size.rows' instelt op 1, moet u deze waarde dienovereenkomstig omhoog schalen. Wijzig deze waarde op basis van de mate van parallelle uitvoering of de doorvoer die u wilt bereiken voor uw workload.
spark.cassandra.concurrent.reads 512 Hiermee definieert u het aantal parallelle leesbewerkingen dat per uitvoerder kan plaatsvinden. Wijzig deze waarde op basis van de mate van parallelle uitvoering of de doorvoer die u wilt bereiken voor uw workload
spark.cassandra.output.throughput_mb_per_sec Geen Definieert de totale schrijfdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en kan worden gebaseerd op de ingerichte doorvoer van uw Azure Cosmos DB-container.
spark.cassandra.input.reads_per_sec Geen Hiermee definieert u de totale leesdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en kan worden gebaseerd op de ingerichte doorvoer van uw Azure Cosmos DB-container.
spark.cassandra.output.batch.grouping.buffer.size 1000 Definieert het aantal batches per enkele Spark-taak dat in het geheugen kan worden opgeslagen voordat deze naar de API wordt verzonden voor Cassandra
spark.cassandra.connection.keep_alive_ms 60000 Hiermee definieert u de periode totdat ongebruikte verbindingen beschikbaar zijn.

Pas de doorvoer en mate van parallelle uitvoering van deze parameters aan op basis van de workload die u verwacht voor uw Spark-taken en de doorvoer die u hebt ingericht voor uw Azure Cosmos DB-account.

Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark

cqlsh

De volgende opdrachten beschrijven hoe u vanuit cqlsh verbinding maakt met Azure Cosmos DB voor Apache Cassandra. Dit is handig voor validatie wanneer u de voorbeelden in Spark doorloopt.
Vanaf Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

Het onderstaande artikel bevat informatie over het inrichten van Azure Databricks-clusters, de clusterconfiguratie voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra en verschillende voorbeeldnotebooks die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB voor Apache Cassandra vanuit Azure Databricks

2. Azure HDInsight-Spark

In het onderstaande artikel worden HDinsight-Spark service, inrichting, clusterconfiguratie voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra en verschillende voorbeeldnotebooks behandeld die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB voor Apache Cassandra vanuit Azure HDInsight-Spark

3. Spark-omgeving in het algemeen

Hoewel de bovenstaande secties specifiek waren voor PaaS-services op basis van Azure Spark, wordt in deze sectie elke algemene Spark-omgeving behandeld. Connectorafhankelijkheden, importbewerkingen en Configuratie van Spark-sessies worden hieronder beschreven. De sectie 'Volgende stappen' bevat codevoorbeelden voor DDL-bewerkingen, DML-bewerkingen en meer.

Connectorafhankelijkheden:

  1. De maven-coördinaten toevoegen om de Cassandra-connector voor Spark op te halen
  2. De maven-coördinaten toevoegen voor de Azure Cosmos DB-helperbibliotheek voor API voor Cassandra

Invoer:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Configuratie van Spark-sessie:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Volgende stappen

De volgende artikelen laten zien hoe Spark is geïntegreerd met Azure Cosmos DB voor Apache Cassandra.