Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark
VAN TOEPASSING OP: Cassandra
Dit artikel maakt deel uit van een reeks artikelen over Azure Cosmos DB voor Apache Cassandra-integratie van Spark. De artikelen hebben betrekking op connectiviteit, DDL-bewerkingen (Data Definition Language), DML-bewerkingen (Basic Data Manipulation Language) en geavanceerde Integratie van Azure Cosmos DB voor Apache Cassandra van Spark.
Vereisten
Een Azure Cosmos DB inrichten voor Apache Cassandra-account.
Een Spark-omgeving naar keuze inrichten [Azure Databricks | Azure HDInsight-Spark | Overige].
Afhankelijkheden voor connectiviteit
Spark-connector voor Cassandra: De Spark-connector wordt gebruikt om verbinding te maken met Azure Cosmos DB voor Apache Cassandra. Identificeer en gebruik de versie van de connector in Maven Central die compatibel is met de Spark- en Scala-versies van uw Spark-omgeving. We raden een omgeving aan die Ondersteuning biedt voor Spark 3.2.1 of hoger, en de Spark-connector die beschikbaar is op maven-coördinaten
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
. Als u Spark 2.x gebruikt, raden we een omgeving aan met Spark-versie 2.4.5, met behulp van een Spark-connector op maven-coördinatencom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
.Azure Cosmos DB-helperbibliotheek voor API voor Cassandra: Als u een versie Van Spark 2.x gebruikt, hebt u naast de Spark-connector een andere bibliotheek nodig met de naam azure-cosmos-cassandra-spark-helper met maven-coördinaten
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
van Azure Cosmos DB om snelheidsbeperking te kunnen verwerken. Deze bibliotheek bevat aangepaste verbindingsfactory en beleidsklassen voor opnieuw proberen.Het beleid voor opnieuw proberen in Azure Cosmos DB is geconfigureerd voor het verwerken van HTTP-statuscode 429("Request Rate Large") uitzonderingen. Azure Cosmos DB voor Apache Cassandra vertaalt deze uitzonderingen in overbelaste fouten in het systeemeigen Cassandra-protocol en u kunt het opnieuw proberen met back-offs. Omdat Azure Cosmos DB gebruikmaakt van een ingerichte doorvoermodel, treden er uitzonderingen op voor aanvraagsnelheidsbeperking wanneer de tarieven voor inkomend/uitgaand verkeer toenemen. Het beleid voor opnieuw proberen beschermt uw Spark-taken tegen gegevenspieken die tijdelijk de doorvoer overschrijden die voor uw container is toegewezen. Als u de Spark 3.x-connector gebruikt, is het implementeren van deze bibliotheek niet vereist.
Notitie
Het beleid voor opnieuw proberen kan uw Spark-taken alleen beschermen tegen tijdelijke pieken. Als u onvoldoende RU's hebt geconfigureerd die nodig zijn om uw workload uit te voeren, is het beleid voor opnieuw proberen niet van toepassing en wordt de uitzondering opnieuw door de beleidsklasse voor opnieuw proberen omvergeworpen.
Verbindingsgegevens voor Azure Cosmos DB-account: De naam, het accounteindpunt en de sleutel van uw Azure API voor Cassandra-account.
Configuratie voor doorvoer van Spark-connector optimaliseren
In de volgende sectie worden alle relevante parameters vermeld voor het beheren van doorvoer met behulp van de Spark-connector voor Cassandra. Als u parameters wilt optimaliseren om de doorvoer voor Spark-taken te maximaliseren, moeten de spark.cassandra.output.concurrent.writes
configuraties , spark.cassandra.concurrent.reads
en spark.cassandra.input.reads_per_sec
correct worden geconfigureerd om te veel beperking en back-off te voorkomen (wat op zijn beurt kan leiden tot een lagere doorvoer).
De optimale waarde van deze configuraties is afhankelijk van vier factoren:
- De hoeveelheid doorvoer (aanvraageenheden) die is geconfigureerd voor de tabel waarin gegevens worden opgenomen.
- Het aantal werkrollen in uw Spark-cluster.
- Het aantal uitvoerders dat is geconfigureerd voor uw Spark-taak (dat kan worden beheerd met of
spark.cassandra.connection.connections_per_executor_max
spark.cassandra.connection.remoteConnectionsPerExecutor
afhankelijk van de Spark-versie) - De gemiddelde latentie van elke aanvraag naar Azure Cosmos DB, als u zich in hetzelfde datacenter bevindt. Stel dat deze waarde 10 ms is voor schrijfbewerkingen en 3 ms voor leesbewerkingen.
Als we bijvoorbeeld vijf werkrollen hebben en een waarde van spark.cassandra.output.concurrent.writes
= 1 en een waarde van spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, hebben we vijf werkrollen die tegelijkertijd in de tabel schrijven, elk met één thread. Als het 10 ms duurt om één schrijfbewerking uit te voeren, kunnen we per seconde 100 aanvragen (1000 milliseconden gedeeld door 10) per seconde verzenden. Met vijf werkrollen zou dit 500 schrijfbewerkingen per seconde zijn. Tegen een gemiddelde kosten van vijf aanvraageenheden (RU's) per schrijfbewerking moet voor de doeltabel minimaal 2500 aanvraageenheden zijn ingericht (5 RU's x 500 schrijfbewerkingen per seconde).
Door het aantal uitvoerders te verhogen, kan het aantal threads in een bepaalde taak toenemen, wat op zijn beurt de doorvoer kan verhogen. De exacte impact hiervan kan echter variëren, afhankelijk van de taak, terwijl het deterministischer is om de doorvoer met het aantal werkrollen te beheren. U kunt ook de exacte kosten van een bepaalde aanvraag bepalen door deze te profileren om de ru-kosten (Request Unit) op te halen. Dit helpt u om nauwkeuriger te zijn bij het inrichten van doorvoer voor uw tabel of keyspace. Bekijk ons artikel hier om te begrijpen hoe u kosten voor aanvraageenheden per aanvraag kunt krijgen.
Doorvoer schalen in de database
De Cassandra Spark-connector verzadigt de doorvoer in Azure Cosmos DB efficiënt. Als gevolg hiervan moet u, zelfs bij effectieve nieuwe pogingen, ervoor zorgen dat u voldoende doorvoer (RU's) hebt ingericht op het niveau van de tabel of keyspace om fouten met betrekking tot snelheidsbeperking te voorkomen. De minimale instelling van 400 RU's in een bepaalde tabel of keyspace is niet voldoende. Zelfs bij minimale configuratie-instellingen voor doorvoer kan de Spark-connector schrijven met een snelheid die overeenkomt met ongeveer 6000 aanvraageenheden of meer.
Als de RU-instelling die is vereist voor gegevensverplaatsing met behulp van Spark hoger is dan wat vereist is voor uw steady state-workload, kunt u de doorvoer eenvoudig systematisch omhoog en omlaag schalen in Azure Cosmos DB om te voldoen aan de behoeften van uw workload voor een bepaalde periode. Lees ons artikel over elastisch schalen in API voor Cassandra voor meer informatie over de verschillende opties voor programmatisch en dynamisch schalen.
Notitie
In de bovenstaande richtlijnen wordt ervan uitgegaan dat de gegevens redelijk uniform worden verdeeld. Als u een aanzienlijke scheefheid in de gegevens hebt (dat wil gezegd, een buitensporig groot aantal lees-/schrijfbewerkingen naar dezelfde partitiesleutelwaarde), kunt u nog steeds knelpunten ondervinden, zelfs als er een groot aantal aanvraageenheden in uw tabel is ingericht. Aanvraageenheden worden gelijkmatig verdeeld over fysieke partities en scheeftrekken van zware gegevens kunnen een knelpunt van aanvragen naar één partitie veroorzaken.
Configuratieparameters voor doorvoer van Spark-connector
De volgende tabel bevat azure Cosmos DB voor Apache Cassandra-specifieke configuratieparameters voor doorvoer die door de connector worden geleverd. Zie de pagina met configuratiereferenties van de GitHub-opslagplaats van spark Cassandra Connector voor een gedetailleerde lijst met alle configuratieparameters.
Eigenschapsnaam | Standaardwaarde | Beschrijving |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Aantal rijen per batch. Stel deze parameter in op 1. Deze parameter wordt gebruikt om een hogere doorvoer te bereiken voor zware workloads. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | Geen | Maximum aantal verbindingen per knooppunt per uitvoerder. 10*n is gelijk aan 10 verbindingen per knooppunt in een Cassandra-cluster met n-knooppunten. Dus als u vijf verbindingen per knooppunt per uitvoerder nodig hebt voor een Cassandra-cluster met vijf knooppunten, moet u deze configuratie instellen op 25. Wijzig deze waarde op basis van de mate van parallelle uitvoering of het aantal uitvoerders waarvoor uw Spark-taken zijn geconfigureerd. |
spark.cassandra.output.concurrent.writes | 100 | Definieert het aantal parallelle schrijfbewerkingen dat per uitvoerder kan plaatsvinden. Omdat u 'batch.size.rows' instelt op 1, moet u deze waarde dienovereenkomstig omhoog schalen. Wijzig deze waarde op basis van de mate van parallelle uitvoering of de doorvoer die u wilt bereiken voor uw workload. |
spark.cassandra.concurrent.reads | 512 | Hiermee definieert u het aantal parallelle leesbewerkingen dat per uitvoerder kan plaatsvinden. Wijzig deze waarde op basis van de mate van parallelle uitvoering of de doorvoer die u wilt bereiken voor uw workload |
spark.cassandra.output.throughput_mb_per_sec | Geen | Definieert de totale schrijfdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en kan worden gebaseerd op de ingerichte doorvoer van uw Azure Cosmos DB-container. |
spark.cassandra.input.reads_per_sec | Geen | Hiermee definieert u de totale leesdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en kan worden gebaseerd op de ingerichte doorvoer van uw Azure Cosmos DB-container. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Definieert het aantal batches per enkele Spark-taak dat in het geheugen kan worden opgeslagen voordat deze naar de API wordt verzonden voor Cassandra |
spark.cassandra.connection.keep_alive_ms | 60000 | Hiermee definieert u de periode totdat ongebruikte verbindingen beschikbaar zijn. |
Pas de doorvoer en mate van parallelle uitvoering van deze parameters aan op basis van de workload die u verwacht voor uw Spark-taken en de doorvoer die u hebt ingericht voor uw Azure Cosmos DB-account.
Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark
cqlsh
De volgende opdrachten beschrijven hoe u vanuit cqlsh verbinding maakt met Azure Cosmos DB voor Apache Cassandra. Dit is handig voor validatie wanneer u de voorbeelden in Spark doorloopt.
Vanaf Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
Het onderstaande artikel bevat informatie over het inrichten van Azure Databricks-clusters, de clusterconfiguratie voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra en verschillende voorbeeldnotebooks die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB voor Apache Cassandra vanuit Azure Databricks
2. Azure HDInsight-Spark
In het onderstaande artikel worden HDinsight-Spark service, inrichting, clusterconfiguratie voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra en verschillende voorbeeldnotebooks behandeld die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB voor Apache Cassandra vanuit Azure HDInsight-Spark
3. Spark-omgeving in het algemeen
Hoewel de bovenstaande secties specifiek waren voor PaaS-services op basis van Azure Spark, wordt in deze sectie elke algemene Spark-omgeving behandeld. Connectorafhankelijkheden, importbewerkingen en Configuratie van Spark-sessies worden hieronder beschreven. De sectie 'Volgende stappen' bevat codevoorbeelden voor DDL-bewerkingen, DML-bewerkingen en meer.
Connectorafhankelijkheden:
- De maven-coördinaten toevoegen om de Cassandra-connector voor Spark op te halen
- De maven-coördinaten toevoegen voor de Azure Cosmos DB-helperbibliotheek voor API voor Cassandra
Invoer:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Configuratie van Spark-sessie:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Volgende stappen
De volgende artikelen laten zien hoe Spark is geïntegreerd met Azure Cosmos DB voor Apache Cassandra.