Delen via


Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark

VAN TOEPASSING OP: Cassandra

Dit artikel is een van de artikelen in Azure Cosmos DB voor Apache Cassandra-integratie vanuit Spark. De artikelen hebben betrekking op connectiviteit, DDL-bewerkingen (Data Definition Language), eenvoudige DML-bewerkingen (Data Manipulation Language) en geavanceerde Azure Cosmos DB voor Apache Cassandra-integratie vanuit Spark.

Vereisten

Afhankelijkheden voor connectiviteit

  • Spark-connector voor Cassandra: Spark-connector wordt gebruikt om verbinding te maken met Azure Cosmos DB voor Apache Cassandra. Identificeer en gebruik de versie van de connector in Maven Central die compatibel is met de Spark- en Scala-versies van uw Spark-omgeving. We raden een omgeving aan die Ondersteuning biedt voor Spark 3.2.1 of hoger en de Spark-connector die beschikbaar is op maven-coördinaten com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Als u Spark 2.x gebruikt, raden we een omgeving aan met Spark-versie 2.4.5, met behulp van spark-connector op maven-coördinaten com.datastax.spark:spark-cassandra-connector_2.11:2.4.3.

  • Azure Cosmos DB-helperbibliotheek voor API voor Cassandra: Als u een versie van Spark 2.x gebruikt, hebt u naast de Spark-connector een andere bibliotheek nodig met de naam azure-cosmos-cassandra-spark-helper met maven-coördinaten com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 van Azure Cosmos DB om snelheidsbeperking af te handelen. Deze bibliotheek bevat aangepaste verbindingsfactory- en beleidsklassen voor opnieuw proberen.

    Het beleid voor opnieuw proberen in Azure Cosmos DB is geconfigureerd voor het afhandelen van HTTP-statuscode 429("Request Rate Large")-uitzonderingen. De Azure Cosmos DB voor Apache Cassandra vertaalt deze uitzonderingen in overbelaste fouten in het systeemeigen Cassandra-protocol en u kunt het opnieuw proberen met back-offs. Omdat Azure Cosmos DB gebruikmaakt van een ingericht doorvoermodel, treden aanvraagsnelheidsbeperkingsonderingen op wanneer de snelheid voor inkomend/uitgaand verkeer toeneemt. Het beleid voor opnieuw proberen beschermt uw Spark-taken tegen gegevenspieken die tijdelijk de doorvoer overschrijden die is toegewezen voor uw container. Als u de Spark 3.x-connector gebruikt, is het implementeren van deze bibliotheek niet vereist.

    Notitie

    Het beleid voor opnieuw proberen kan uw spark-taken alleen beschermen tegen tijdelijke pieken. Als u onvoldoende RU's hebt geconfigureerd die nodig zijn om uw workload uit te voeren, is het beleid voor opnieuw proberen niet van toepassing en wordt de uitzondering opnieuw uitgevoerd door de beleidsklasse voor opnieuw proberen.

  • Verbindingsgegevens voor Azure Cosmos DB-account: uw Azure-API voor Cassandra-accountnaam, accounteindpunt en -sleutel.

Doorvoerconfiguratie van Spark-connector optimaliseren

In de volgende sectie staan alle relevante parameters voor het beheren van de doorvoer met behulp van de Spark Connector voor Cassandra. Om parameters te optimaliseren om de doorvoer voor Spark-taken te maximaliseren, moeten de spark.cassandra.output.concurrent.writes, spark.cassandra.concurrent.readsen spark.cassandra.input.reads_per_sec configuraties correct worden geconfigureerd om te voorkomen dat er te veel beperking en back-off (wat op zijn beurt kan leiden tot lagere doorvoer).

De optimale waarde van deze configuraties is afhankelijk van vier factoren:

  • De hoeveelheid doorvoer (aanvraageenheden) die is geconfigureerd voor de tabel waarin gegevens worden opgenomen.
  • Het aantal werkrollen in uw Spark-cluster.
  • Het aantal uitvoerders dat is geconfigureerd voor uw Spark-taak (die kan worden beheerd met spark.cassandra.connection.connections_per_executor_max of spark.cassandra.connection.remoteConnectionsPerExecutor afhankelijk van de Spark-versie)
  • De gemiddelde latentie van elke aanvraag naar Azure Cosmos DB als u zich in hetzelfde datacenter bevindt. Stel dat deze waarde 10 ms is voor schrijfbewerkingen en 3 ms voor leesbewerkingen.

Als we bijvoorbeeld vijf werkrollen hebben en een waarde van spark.cassandra.output.concurrent.writes= 1 en een waarde van spark.cassandra.connection.remoteConnectionsPerExecutor = 1, hebben we vijf werkrollen die gelijktijdig in de tabel worden geschreven, elk met één thread. Als het 10 ms duurt om één schrijfbewerking uit te voeren, kunnen we per thread 100 aanvragen (1000 milliseconden gedeeld door 10) per seconde verzenden. Met vijf werknemers zou dit 500 schrijfbewerkingen per seconde zijn. Tegen een gemiddelde kosten van vijf aanvraageenheden (RU's) per schrijfbewerking moet de doeltabel minimaal 2500 aanvraageenheden inrichten (5 RU's x 500 schrijfbewerkingen per seconde).

Door het aantal uitvoerders te verhogen, kan het aantal threads in een bepaalde taak toenemen, waardoor de doorvoer op zijn beurt kan worden verhoogd. De exacte impact hiervan kan echter variabel zijn, afhankelijk van de taak, terwijl het beheren van de doorvoer met het aantal werkrollen meer deterministisch is. U kunt ook de exacte kosten van een bepaalde aanvraag bepalen door deze te profileren om de ru-kosten (Request Unit) op te halen. Dit helpt u om nauwkeuriger te zijn bij het inrichten van doorvoer voor uw tabel of keyspace. Bekijk ons artikel hier voor meer informatie over het ophalen van kosten per aanvraageenheid per aanvraagniveau.

Doorvoer schalen in de database

De Cassandra Spark-connector zorgt voor een efficiënte doorvoer in Azure Cosmos DB. Als gevolg hiervan moet u, zelfs bij effectieve nieuwe pogingen, ervoor zorgen dat u voldoende doorvoer (RU's) op tabel- of keyspaceniveau hebt ingericht om snelheidsbeperkingsfouten te voorkomen. De minimuminstelling van 400 RU's in een bepaalde tabel of keyspace is niet voldoende. Zelfs bij minimale configuratie-instellingen voor doorvoer kan de Spark-connector schrijven met een snelheid die overeenkomt met ongeveer 6000 aanvraageenheden of meer.

Als de RU-instelling die is vereist voor gegevensverplaatsing met Behulp van Spark hoger is dan wat vereist is voor uw werkbelasting met een stabiele status, kunt u de doorvoer eenvoudig systematisch omhoog en omlaag schalen in Azure Cosmos DB om te voldoen aan de behoeften van uw workload gedurende een bepaalde periode. Lees ons artikel over elastisch schalen in API voor Cassandra voor inzicht in de verschillende opties voor programmatisch en dynamisch schalen.

Notitie

In de bovenstaande richtlijnen wordt uitgegaan van een redelijk uniforme verdeling van gegevens. Als u een aanzienlijke scheefheid in de gegevens hebt (dat wil gezegd, een buitengewoon groot aantal lees-/schrijfbewerkingen naar dezelfde partitiesleutelwaarde), kunnen er nog steeds knelpunten optreden, zelfs als u een groot aantal aanvraageenheden hebt ingericht in uw tabel. Aanvraageenheden worden gelijkmatig verdeeld over fysieke partities en zware scheeftrekken van gegevens kunnen leiden tot een knelpunt van aanvragen naar één partitie.

Configuratieparameters voor doorvoer van Spark-connector

De volgende tabel bevat azure Cosmos DB voor configuratieparameters voor doorvoer die specifiek zijn voor Apache Cassandra die door de connector worden geleverd. Zie de configuratiereferentiepagina van de GitHub-opslagplaats van de Spark Cassandra-connector voor een gedetailleerde lijst met alle configuratieparameters.

Naam van eigenschap Standaardwaarde Beschrijving
spark.cassandra.output.batch.size.rows 1 Aantal rijen per batch. Stel deze parameter in op 1. Deze parameter wordt gebruikt om een hogere doorvoer te bereiken voor zware werkbelastingen.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Geen Maximum aantal verbindingen per knooppunt per uitvoerder. 10*n is gelijk aan 10 verbindingen per knooppunt in een Cassandra-cluster met n knooppunten. Dus als u vijf verbindingen per knooppunt per uitvoerder nodig hebt voor een Cassandra-cluster met vijf knooppunten, moet u deze configuratie instellen op 25. Wijzig deze waarde op basis van de mate van parallelle uitvoering of het aantal uitvoerders waarvoor uw Spark-taken zijn geconfigureerd.
spark.cassandra.output.concurrent.writes 100 Hiermee definieert u het aantal parallelle schrijfbewerkingen dat per uitvoerder kan optreden. Omdat u 'batch.size.rows' instelt op 1, moet u deze waarde dienovereenkomstig omhoog schalen. Wijzig deze waarde op basis van de mate van parallelle uitvoering of de doorvoer die u wilt bereiken voor uw workload.
spark.cassandra.concurrent.reads 512 Hiermee definieert u het aantal parallelle leesbewerkingen dat per uitvoerder kan optreden. Wijzig deze waarde op basis van de mate van parallelle uitvoering of de doorvoer die u wilt bereiken voor uw workload
spark.cassandra.output.throughput_mb_per_sec Geen Hiermee definieert u de totale schrijfdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en baseer deze op de ingerichte doorvoer van uw Azure Cosmos DB-container.
spark.cassandra.input.reads_per_sec Geen Hiermee definieert u de totale leesdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en baseer deze op de ingerichte doorvoer van uw Azure Cosmos DB-container.
spark.cassandra.output.batch.grouping.buffer.size 1000 Definieert het aantal batches per enkele Spark-taak die in het geheugen kan worden opgeslagen voordat deze naar de API voor Cassandra wordt verzonden
spark.cassandra.connection.keep_alive_ms 60000 Hiermee definieert u de periode totdat ongebruikte verbindingen beschikbaar zijn.

Pas de doorvoer en mate van parallelle uitvoering van deze parameters aan op basis van de workload die u verwacht voor uw Spark-taken en de doorvoer die u hebt ingericht voor uw Azure Cosmos DB-account.

Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark

cqlsh

In de volgende opdrachten wordt beschreven hoe u vanuit cqlsh verbinding maakt met Azure Cosmos DB voor Apache Cassandra. Dit is handig voor validatie wanneer u de voorbeelden in Spark doorloopt.
Van Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

In het onderstaande artikel wordt het inrichten van Azure Databricks-clusters beschreven, de clusterconfiguratie voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra en verschillende voorbeeldnotebooks die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB voor Apache Cassandra vanuit Azure Databricks

2. Azure HDInsight-Spark

Het onderstaande artikel bevat informatie over de HDinsight-Spark-service, inrichting, clusterconfiguratie voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra en verschillende voorbeeldnotebooks die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB voor Apache Cassandra vanuit Azure HDInsight-Spark

3. Spark-omgeving in het algemeen

Hoewel de bovenstaande secties specifiek waren voor PaaS-services op basis van Azure Spark, worden in deze sectie alle algemene Spark-omgevingen behandeld. De configuratie van connectorafhankelijkheden, import- en Spark-sessies wordt hieronder beschreven. De sectie 'Volgende stappen' bevat codevoorbeelden voor DDL-bewerkingen, DML-bewerkingen en meer.

Connectorafhankelijkheden:

  1. De maven-coördinaten toevoegen om de Cassandra-connector voor Spark op te halen
  2. De maven-coördinaten voor de Azure Cosmos DB-helperbibliotheek voor API voor Cassandra toevoegen

Invoer:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Configuratie van Spark-sessie:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Volgende stappen

In de volgende artikelen ziet u de Integratie van Spark met Azure Cosmos DB voor Apache Cassandra.