Dela via


Ansluta till Azure Cosmos DB för Apache Cassandra från Spark

GÄLLER FÖR: Kassandra

Den här artikeln är en av en serie artiklar om Azure Cosmos DB för Apache Cassandra-integrering från Spark. Artiklarna beskriver anslutningar, DDL-åtgärder (Data Definition Language), grundläggande DML-åtgärder (Data Manipulation Language) och avancerad Azure Cosmos DB för Apache Cassandra-integrering från Spark.

Förutsättningar

Beroenden för anslutning

  • Spark-anslutningsapp för Cassandra: Spark-anslutningsappen används för att ansluta till Azure Cosmos DB för Apache Cassandra. Identifiera och använd den version av anslutningsappen som finns i Maven Central som är kompatibel med Spark- och Scala-versionerna av din Spark-miljö. Vi rekommenderar en miljö som stöder Spark 3.2.1 eller senare och spark-anslutningsappen som är tillgänglig vid maven-koordinaterna com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Om du använder Spark 2.x rekommenderar vi en miljö med Spark version 2.4.5 med spark-anslutningsappen vid maven-koordinaterna com.datastax.spark:spark-cassandra-connector_2.11:2.4.3.

  • Azure Cosmos DB-hjälpbibliotek för API för Cassandra: Om du använder en version av Spark 2.x behöver du, förutom Spark-anslutningsappen, ett annat bibliotek med namnet azure-cosmos-cassandra-spark-helper med maven-koordinater com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 från Azure Cosmos DB för att hantera hastighetsbegränsning. Det här biblioteket innehåller anpassade anslutningsfabriks- och återförsöksprincipklasser.

    Återförsöksprincipen i Azure Cosmos DB är konfigurerad för att hantera HTTP-statuskod 429("Begärandefrekvens stor") undantag. Azure Cosmos DB för Apache Cassandra översätter dessa undantag till överlagrade fel i det interna Cassandra-protokollet och du kan försöka igen med back-offs. Eftersom Azure Cosmos DB använder en etablerad dataflödesmodell uppstår undantag för frekvensbegränsning för begäranden när ingress-/utgående priser ökar. Återförsöksprincipen skyddar dina spark-jobb mot datatoppar som tillfälligt överskrider det allokerade dataflödet för containern. Om du använder Spark 3.x-anslutningsappen krävs inte implementering av det här biblioteket.

    Kommentar

    Återförsöksprincipen kan endast skydda dina spark-jobb mot momentära toppar. Om du inte har konfigurerat tillräckligt många RU:er som krävs för att köra din arbetsbelastning är återförsöksprincipen inte tillämplig och återförsöksprincipklassen återanvänder undantaget.

  • Anslutningsinformation för Azure Cosmos DB-konto: Ditt Azure API för Cassandra-kontonamn, kontoslutpunkt och nyckel.

Optimera konfiguration av Spark-anslutningstjänstens dataflöde

I nästa avsnitt visas alla relevanta parametrar för att kontrollera dataflödet med spark-anslutningsappen för Cassandra. För att optimera parametrarna för att maximera dataflödet för spark-jobb spark.cassandra.output.concurrent.writesmåste konfigurationerna , spark.cassandra.concurrent.reads, och spark.cassandra.input.reads_per_sec vara korrekt konfigurerade för att undvika för mycket begränsning och back-off (vilket i sin tur kan leda till lägre dataflöde).

Det optimala värdet för dessa konfigurationer beror på fyra faktorer:

  • Mängden dataflöde (enheter för begäran) som konfigurerats för den tabell som data matas in i.
  • Antalet arbetare i Spark-klustret.
  • Antalet köre som konfigurerats för spark-jobbet (som kan styras med eller spark.cassandra.connection.connections_per_executor_maxspark.cassandra.connection.remoteConnectionsPerExecutor beroende på Spark-version)
  • Den genomsnittliga svarstiden för varje begäran till Azure Cosmos DB, om du är samlad i samma datacenter. Anta att det här värdet är 10 ms för skrivningar och 3 ms för läsningar.

Om vi till exempel har fem arbetare och värdet spark.cassandra.output.concurrent.writes= 1 och värdet spark.cassandra.connection.remoteConnectionsPerExecutor = 1, så har vi fem arbetare som samtidigt skriver till tabellen, var och en med en tråd. Om det tar 10 ms att utföra en enda skrivning kan vi skicka 100 begäranden (1 000 millisekunder dividerat med 10) per sekund, per tråd. Med fem arbetare skulle detta vara 500 skrivningar per sekund. Till en genomsnittlig kostnad av fem enheter för begäranden (RU:er) per skrivning skulle måltabellen behöva minst 2 500 enheter för begäranden etablerade (5 RU:er x 500 skrivningar per sekund).

Om du ökar antalet utförare kan du öka antalet trådar i ett visst jobb, vilket i sin tur kan öka dataflödet. Den exakta effekten av detta kan dock vara variabel beroende på jobbet, medan det är mer deterministiskt att kontrollera dataflödet med antalet arbetare. Du kan också fastställa den exakta kostnaden för en viss begäran genom att profilera den för att hämta ru-avgiften (Request Unit). Detta hjälper dig att vara mer exakt när du etablerar dataflöde för tabellen eller nyckelområdet. Ta en titt på vår artikel här för att förstå hur du får avgifter för begärandeenheten på en nivå per begäran.

Skala dataflöde i databasen

Cassandra Spark-anslutningsappen mättar dataflödet effektivt i Azure Cosmos DB. Därför måste du, även med effektiva återförsök, se till att du har tillräckligt med dataflöde (RU:er) på tabell- eller nyckelområdesnivå för att förhindra hastighetsbegränsning av relaterade fel. Den minsta inställningen på 400 RU:er i en viss tabell eller nyckelrymd räcker inte. Även vid minsta konfigurationsinställningar för dataflöde kan Spark-anslutningsappen skriva med en hastighet som motsvarar cirka 6 000 enheter för begäran eller mer.

Om RU-inställningen som krävs för dataflytt med Spark är högre än vad som krävs för din arbetsbelastning med stabilt tillstånd kan du enkelt skala upp och ned dataflödet systematiskt i Azure Cosmos DB för att uppfylla arbetsbelastningens behov under en viss tidsperiod. Läs vår artikel om elastisk skalning i API för Cassandra för att förstå de olika alternativen för skalning programmatiskt och dynamiskt.

Kommentar

Riktlinjerna ovan förutsätter en någorlunda enhetlig fördelning av data. Om du har en betydande snedställning i data (dvs. ett omåttligt stort antal läsningar/skrivningar till samma partitionsnyckelvärde) kan det fortfarande uppstå flaskhalsar, även om du har ett stort antal enheter för begäranden etablerade i tabellen. Enheter för begärande delas lika mellan fysiska partitioner, och stora datasnedvridningar kan orsaka en flaskhals med begäranden till en enda partition.

Konfigurationsparametrar för Spark-anslutningstjänstens dataflöde

I följande tabell visas konfigurationsparametrar för Azure Cosmos DB för Apache Cassandra-specifika dataflöde som tillhandahålls av anslutningsappen. En detaljerad lista över alla konfigurationsparametrar finns på konfigurationsreferenssidan för GitHub-lagringsplatsen för Spark Cassandra Connector.

Egenskapsnamn Standardvärde Beskrivning
spark.cassandra.output.batch.size.rows 1 Antal rader per enskild batch. Ange den här parametern till 1. Den här parametern används för att uppnå högre dataflöde för tunga arbetsbelastningar.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Ingen Maximalt antal anslutningar per nod per köre. 10*n motsvarar 10 anslutningar per nod i ett Cassandra-kluster med n-nod. Så om du behöver fem anslutningar per nod per köre för ett Cassandra-kluster med fem noder bör du ange den här konfigurationen till 25. Ändra det här värdet baserat på graden av parallellitet eller antalet utförare som spark-jobben har konfigurerats för.
spark.cassandra.output.concurrent.writes 100 Definierar antalet parallella skrivningar som kan ske per köre. Eftersom du anger "batch.size.rows" till 1 skalar du upp det här värdet därefter. Ändra det här värdet baserat på graden av parallellitet eller det dataflöde som du vill uppnå för din arbetsbelastning.
spark.cassandra.concurrent.reads 512 Definierar antalet parallella läsningar som kan ske per köre. Ändra det här värdet baserat på graden av parallellitet eller det dataflöde som du vill uppnå för din arbetsbelastning
spark.cassandra.output.throughput_mb_per_sec Ingen Definierar det totala skrivdataflödet per köre. Den här parametern kan användas som en övre gräns för spark-jobbets dataflöde och basera den på det etablerade dataflödet för din Azure Cosmos DB-container.
spark.cassandra.input.reads_per_sec Ingen Definierar det totala läsdataflödet per köre. Den här parametern kan användas som en övre gräns för spark-jobbets dataflöde och basera den på det etablerade dataflödet för din Azure Cosmos DB-container.
spark.cassandra.output.batch.grouping.buffer.size 1000 Definierar antalet batchar per enskild spark-uppgift som kan lagras i minnet innan de skickas till API:et för Cassandra
spark.cassandra.connection.keep_alive_ms 60000 Definierar den tidsperiod till vilken oanvända anslutningar är tillgängliga.

Justera dataflödet och graden av parallellitet för dessa parametrar baserat på den arbetsbelastning du förväntar dig för dina Spark-jobb och det dataflöde som du har etablerat för ditt Azure Cosmos DB-konto.

Ansluta till Azure Cosmos DB för Apache Cassandra från Spark

cqlsh

Följande kommandon beskriver hur du ansluter till Azure Cosmos DB för Apache Cassandra från cqlsh. Det här är användbart för validering när du kör exemplen i Spark.
Från Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

Artikeln nedan beskriver Azure Databricks-klusteretablering, klusterkonfiguration för anslutning till Azure Cosmos DB för Apache Cassandra och flera exempel på notebook-filer som omfattar DDL-åtgärder, DML-åtgärder med mera.
Arbeta med Azure Cosmos DB för Apache Cassandra från Azure Databricks

2. Azure HDInsight-Spark

Artikeln nedan beskriver HDinsight-Spark-tjänsten, etablering, klusterkonfiguration för anslutning till Azure Cosmos DB för Apache Cassandra och flera exempel på notebook-filer som omfattar DDL-åtgärder, DML-åtgärder med mera.
Arbeta med Azure Cosmos DB för Apache Cassandra från Azure HDInsight-Spark

3. Spark-miljö i allmänhet

Avsnitten ovan var specifika för Azure Spark-baserade PaaS-tjänster, men det här avsnittet beskriver alla allmänna Spark-miljöer. Anslutningsberoenden, importer och Spark-sessionskonfiguration beskrivs nedan. Avsnittet "Nästa steg" beskriver kodexempel för DDL-åtgärder, DML-åtgärder med mera.

Anslutningsberoenden:

  1. Lägg till maven-koordinaterna för att hämta Cassandra-anslutningsappen för Spark
  2. Lägg till maven-koordinaterna för Azure Cosmos DB-hjälpbiblioteket för API för Cassandra

Import:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Spark-sessionskonfiguration:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Nästa steg

Följande artiklar visar Spark-integrering med Azure Cosmos DB för Apache Cassandra.