Csatlakozás az Apache Cassandrához készült Azure Cosmos DB-hez a Sparkból

A KÖVETKEZŐKRE VONATKOZIK: Cassandra

Ez a cikk az Azure Cosmos DB for Apache Cassandra Spark-integrációval kapcsolatos cikkek egyike. A cikkek a kapcsolatokról, az Adatdefiníciós nyelv (DDL) műveleteiről, az alapvető adatkezelési nyelvi (DML) műveletekről és az Apache Cassandra Sparkból történő fejlett Azure Cosmos DB-integrációjáról vonatkoznak.

Előfeltételek

Kapcsolatok függőségei

  • Spark-összekötő a Cassandra-hoz: A Spark-összekötő az Apache Cassandrához készült Azure Cosmos DB-hez való csatlakozásra szolgál. Azonosítsa és használja a Maven centralban található összekötő azon verzióját, amely kompatibilis a Spark-környezet Spark- és Scala-verzióival. Olyan környezetet ajánlunk, amely támogatja a Spark 3.2.1-et vagy újabb verziót, valamint a Maven koordinátáinál elérhető Spark-összekötőt com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Ha Spark 2.x-et használ, a Spark 2.4.5-ös verziójával rendelkező környezetet javasoljuk, amely a Spark-összekötőt használja a maven koordinátáinál com.datastax.spark:spark-cassandra-connector_2.11:2.4.3.

  • Azure Cosmos DB segédkódtár a Cassandra API-hoz: Ha Spark 2.x-es verziót használ, akkor a Spark-összekötő mellett szüksége lesz egy másik , azure-cosmos-cassandra-spark-helper nevű kódtárra az Azure Cosmos DB maven koordinátáival com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 a sebességkorlátozás kezeléséhez. Ez a kódtár egyéni kapcsolat-előállítót és újrapróbálkozási szabályzatosztályokat tartalmaz.

    Az Újrapróbálkozási szabályzat az Azure Cosmos DB-ben úgy van konfigurálva, hogy kezelje a 429-es ("Nagy kérelemmennyiség") HTTP-állapotkód kivételeit. Az Apache Cassandra-hoz készült Azure Cosmos DB ezeket a kivételeket túlterhelt hibákká alakítja a Cassandra natív protokollon, és újrapróbálkozott a visszalépésekkel. Mivel az Azure Cosmos DB kiosztott átviteli sebességű modellt használ, a kérelemsebesség-korlátozási kivételek akkor fordulnak elő, ha a bejövő/kimenő forgalom sebessége nő. Az újrapróbálkozési szabályzat megvédi a Spark-feladatokat a tárolóhoz lefoglalt átviteli sebességet átmenetileg túllépő adatcsúcsokkal szemben. Ha a Spark 3.x-összekötőt használja, a kódtár megvalósítása nem szükséges.

    Megjegyzés

    Az újrapróbálkozások szabályzata csak a pillanatnyi kiugrásokkal szemben képes megvédeni a Spark-feladatokat. Ha nem konfigurált elegendő kérelemegységet a számítási feladat futtatásához, akkor az újrapróbálkozási szabályzat nem alkalmazható, és az újrapróbálkozási szabályzatosztály újrakezdi a kivételt.

  • Az Azure Cosmos DB-fiók kapcsolatának részletei: Az Azure API for Cassandra-fiók neve, fiókvégpontja és kulcsa.

A Spark-összekötő átviteli sebességének konfigurációjának optimalizálása

A következő szakaszban felsoroljuk az átviteli sebesség cassandrához készült Spark-összekötő használatával történő vezérléséhez szükséges összes releváns paramétert. A Spark-feladatok átviteli sebességének maximalizálása érdekében a paraméterek optimalizálásához megfelelően kell konfigurálni a spark.cassandra.output.concurrent.writes, spark.cassandra.concurrent.readsés spark.cassandra.input.reads_per_sec konfigurációkat, hogy elkerülje a túl sok szabályozást és visszakapcsolást (ami viszont alacsonyabb átviteli sebességet eredményezhet).

Ezeknek a konfigurációknak az optimális értéke négy tényezőtől függ:

  • Az adatbetöltési táblához konfigurált átviteli sebesség (kérelemegységek).
  • A Spark-fürt feldolgozóinak száma.
  • A Spark-feladathoz konfigurált végrehajtók száma (amelyek a Spark verziójától függően vagy spark.cassandra.connection.remoteConnectionsPerExecutor használatával spark.cassandra.connection.connections_per_executor_max vezérelhetők)
  • Az Azure Cosmos DB-nek küldött kérések átlagos késése, ha ugyanabban az adatközpontban van rendezve. Tegyük fel, hogy ez az érték 10 ms az írásokhoz és 3 ms az olvasásokhoz.

Ha például öt feldolgozónk van, és az értéke spark.cassandra.output.concurrent.writes= 1, és az értéke spark.cassandra.connection.remoteConnectionsPerExecutor = 1, akkor öt feldolgozónk van, amelyek egyszerre íródnak a táblába, mindegyik egy szállal. Ha egyetlen írás végrehajtása 10 ms-ot vesz igénybe, akkor szálonként 100 kérést (1000 ezredmásodperc osztva 10-zel) küldhetünk. Öt feldolgozó esetén ez másodpercenként 500 írási adat lenne. Írásonként öt kérelemegység (RU) átlagos költségén a céltáblának legalább 2500 kérelemegységre lenne szüksége (5 kérelemegység x 500 írás másodpercenként).

A végrehajtók számának növelése növelheti az adott feladat szálainak számát, ami viszont növelheti az átviteli sebességet. Ennek pontos hatása azonban a feladattól függően változó lehet, míg az átviteli sebesség szabályozása a feldolgozók számával determinisztikusabb. A kérelemegység (RU) díjának lekéréséhez profilkészítéssel is meghatározhatja egy adott kérelem pontos költségét. Ez segít pontosabban kiosztani az átviteli sebességet a táblához vagy a kulcstérhez. Tekintse meg a cikkünket , amelyből megtudhatja, hogyan kérhet le kérelemegység-díjakat kérelemszinten.

Az átviteli sebesség skálázása az adatbázisban

A Cassandra Spark-összekötő hatékonyan telíti az átviteli sebességet az Azure Cosmos DB-ben. Ennek eredményeképpen még a hatékony újrapróbálkozások esetén is gondoskodnia kell arról, hogy elegendő átviteli sebesség (RU) legyen kiépítve a tábla vagy a kulcstér szintjén a sebességkorlátozással kapcsolatos hibák elkerülése érdekében. Egy adott táblában vagy kulcstérben a minimális 400 kérelemegység-beállítás nem lesz elegendő. A Spark-összekötő a minimális átvitelisebesség-konfigurációs beállítások mellett is körülbelül 6000 kérelemegységnek megfelelő sebességgel írhat.

Ha a Spark használatával történő adatáthelyezéshez szükséges RU-beállítás magasabb, mint ami az állandó állapotú számítási feladathoz szükséges, könnyedén fel- és leskálázhatja az átviteli sebességet az Azure Cosmos DB-ben, hogy megfeleljen a számítási feladat igényeinek egy adott időszakra vonatkozóan. A Cassandra API rugalmas méretezéséről szóló cikkünkből megismerheti a programozott és dinamikus skálázás különböző lehetőségeit.

Megjegyzés

A fenti útmutató az adatok ésszerűen egységes eloszlását feltételezi. Ha jelentős eltérést tapasztal az adatokban (vagyis egy adott partíciókulcs-értékhez rendelten nagy számú olvasási/írási adat), akkor is szűk keresztmetszeteket tapasztalhat, még akkor is, ha nagy számú kérelemegység van kiépítve a táblában. A kérelemegységek egyenlően vannak elosztva a fizikai partíciók között, és a nagy adateltérés szűk keresztmetszetet okozhat az egyetlen partícióra irányuló kérések számára.

Spark-összekötő átviteli sebességének konfigurációs paraméterei

Az alábbi táblázat az Apache Cassandra-specifikus átvitelisebesség-konfigurációs paraméterekhez készült Azure Cosmos DB-t sorolja fel, amelyeket az összekötő biztosít. Az összes konfigurációs paraméter részletes listáját a Spark Cassandra Connector GitHub-adattár konfigurációs referenciaoldalán találja.

Tulajdonság neve Alapértelmezett érték Leírás
spark.cassandra.output.batch.size.rows 1 Sorok száma egyetlen kötegenként. Állítsa ezt a paramétert 1-re. Ez a paraméter nagyobb átviteli sebességet biztosít a nagy számítási feladatokhoz.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) None Csomópontonkénti kapcsolatok maximális száma végrehajtónként. A 10*n csomópontonként 10 kapcsolatnak felel meg egy n csomópontos Cassandra-fürtben. Ha tehát csomópontonként öt kapcsolatot igényel egy öt csomópontos Cassandra-fürthöz, akkor ezt a konfigurációt 25-ösre kell állítania. Módosítsa ezt az értéket a párhuzamosság mértéke vagy a Spark-feladatokhoz konfigurált végrehajtók száma alapján.
spark.cassandra.output.concurrent.writes 100 Meghatározza a végrehajtónként végrehajtható párhuzamos írások számát. Mivel a "batch.size.rows" értéket 1 értékre állítja, mindenképpen ennek megfelelően skálázza fel ezt az értéket. Módosítsa ezt az értéket a párhuzamosság mértéke vagy a számítási feladathoz elérni kívánt átviteli sebesség alapján.
spark.cassandra.concurrent.reads 512 Meghatározza a végrehajtónkénti párhuzamos olvasások számát. Módosítsa ezt az értéket a párhuzamosság mértéke vagy a számítási feladathoz elérni kívánt átviteli sebesség alapján
spark.cassandra.output.throughput_mb_per_sec None Meghatározza a végrehajtónkénti teljes írási átviteli sebességet. Ez a paraméter a Spark-feladat átviteli sebességének felső korlátjaként használható, és az Azure Cosmos DB-tároló kiosztott átviteli sebességén alapul.
spark.cassandra.input.reads_per_sec None Meghatározza a végrehajtónkénti teljes olvasási sebességet. Ez a paraméter a Spark-feladat átviteli sebességének felső korlátjaként használható, és az Azure Cosmos DB-tároló kiosztott átviteli sebességén alapul.
spark.cassandra.output.batch.grouping.buffer.size 1000 Meghatározza, hogy hány köteg tárolható a memóriában a Cassandra API-ba való küldés előtt
spark.cassandra.connection.keep_alive_ms 60000 Meghatározza azt az időtartamot, amíg a nem használt kapcsolatok elérhetők.

A paraméterek átviteli sebességének és párhuzamossági fokának módosítása a Spark-feladatokhoz várt számítási feladat és az Azure Cosmos DB-fiókhoz kiosztott átviteli sebesség alapján.

Csatlakozás az Apache Cassandrához készült Azure Cosmos DB-hez a Sparkból

cqlsh

Az alábbi parancsok részletesen ismertetik, hogyan csatlakozhat az Apache Cassandrához készült Azure Cosmos DB-hez a cqlsh-ból. Ez akkor hasznos, ha a Sparkban futtatja a mintákat.
Linux/Unix/Mac rendszeren:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

Az alábbi cikk ismerteti az Azure Databricks-fürtök kiépítését, az Apache Cassandra-hoz készült Azure Cosmos DB-hez való csatlakozás fürtkonfigurációját, valamint számos mintajegyzetfüzetet, amelyek a DDL-műveleteket, a DML-műveleteket és egyebeket ismertetik.
Az Azure Cosmos DB használata az Apache Cassandrához az Azure Databricksből

2. Azure HDInsight-Spark

Az alábbi cikk HDinsight-Spark szolgáltatást, üzembe helyezést, az Apache Cassandrához készült Azure Cosmos DB-hez való csatlakozás fürtkonfigurációját, valamint számos DDL-műveleteket, DML-műveleteket és egyebeket lefedő mintajegyzetfüzetet tartalmaz.
Az Apache Cassandrához készült Azure Cosmos DB használata az Azure HDInsight-Sparkból

3. Spark-környezet általában

Bár a fenti szakaszok az Azure Spark-alapú PaaS-szolgáltatásokra vonatkoztak, ez a szakasz minden általános Spark-környezetet lefed. Az összekötők függőségeit, importálását és Spark-munkamenetek konfigurációját az alábbiakban találja. A "Következő lépések" szakasz a DDL-műveletek kódmintáit, a DML-műveleteket és egyebeket ismerteti.

Összekötő függőségei:

  1. Adja hozzá a maven koordinátáit a Cassandra-összekötő Sparkhoz való lekéréséhez
  2. Adja hozzá a Maven-koordinátákat a Cassandra API-hoz készült Azure Cosmos DB segédkódtárhoz

Behozatal:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Spark-munkamenet konfigurációja:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Következő lépések

Az alábbi cikkek bemutatják a Spark-integrációt az Apache Cassandra-hoz készült Azure Cosmos DB-vel.