Csatlakozás az Apache Cassandrához készült Azure Cosmos DB-hez a Sparkból
A KÖVETKEZŐKRE VONATKOZIK: Cassandra
Ez a cikk az Azure Cosmos DB for Apache Cassandra Spark-integrációval kapcsolatos cikkek egyike. A cikkek a kapcsolatokról, az Adatdefiníciós nyelv (DDL) műveleteiről, az alapvető adatkezelési nyelvi (DML) műveletekről és az Apache Cassandra Sparkból történő fejlett Azure Cosmos DB-integrációjáról vonatkoznak.
Előfeltételek
A választott Spark-környezet kiépítése [Azure Databricks | Azure HDInsight-Spark | Egyéb]
Kapcsolatok függőségei
Spark-összekötő a Cassandra-hoz: A Spark-összekötő az Apache Cassandrához készült Azure Cosmos DB-hez való csatlakozásra szolgál. Azonosítsa és használja a Maven centralban található összekötő azon verzióját, amely kompatibilis a Spark-környezet Spark- és Scala-verzióival. Olyan környezetet ajánlunk, amely támogatja a Spark 3.2.1-et vagy újabb verziót, valamint a Maven koordinátáinál elérhető Spark-összekötőt
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
. Ha Spark 2.x-et használ, a Spark 2.4.5-ös verziójával rendelkező környezetet javasoljuk, amely a Spark-összekötőt használja a maven koordinátáinálcom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
.Azure Cosmos DB segédkódtár a Cassandra API-hoz: Ha Spark 2.x-es verziót használ, akkor a Spark-összekötő mellett szüksége lesz egy másik , azure-cosmos-cassandra-spark-helper nevű kódtárra az Azure Cosmos DB maven koordinátáival
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
a sebességkorlátozás kezeléséhez. Ez a kódtár egyéni kapcsolat-előállítót és újrapróbálkozási szabályzatosztályokat tartalmaz.Az Újrapróbálkozási szabályzat az Azure Cosmos DB-ben úgy van konfigurálva, hogy kezelje a 429-es ("Nagy kérelemmennyiség") HTTP-állapotkód kivételeit. Az Apache Cassandra-hoz készült Azure Cosmos DB ezeket a kivételeket túlterhelt hibákká alakítja a Cassandra natív protokollon, és újrapróbálkozott a visszalépésekkel. Mivel az Azure Cosmos DB kiosztott átviteli sebességű modellt használ, a kérelemsebesség-korlátozási kivételek akkor fordulnak elő, ha a bejövő/kimenő forgalom sebessége nő. Az újrapróbálkozési szabályzat megvédi a Spark-feladatokat a tárolóhoz lefoglalt átviteli sebességet átmenetileg túllépő adatcsúcsokkal szemben. Ha a Spark 3.x-összekötőt használja, a kódtár megvalósítása nem szükséges.
Megjegyzés
Az újrapróbálkozások szabályzata csak a pillanatnyi kiugrásokkal szemben képes megvédeni a Spark-feladatokat. Ha nem konfigurált elegendő kérelemegységet a számítási feladat futtatásához, akkor az újrapróbálkozási szabályzat nem alkalmazható, és az újrapróbálkozási szabályzatosztály újrakezdi a kivételt.
Az Azure Cosmos DB-fiók kapcsolatának részletei: Az Azure API for Cassandra-fiók neve, fiókvégpontja és kulcsa.
A Spark-összekötő átviteli sebességének konfigurációjának optimalizálása
A következő szakaszban felsoroljuk az átviteli sebesség cassandrához készült Spark-összekötő használatával történő vezérléséhez szükséges összes releváns paramétert. A Spark-feladatok átviteli sebességének maximalizálása érdekében a paraméterek optimalizálásához megfelelően kell konfigurálni a spark.cassandra.output.concurrent.writes
, spark.cassandra.concurrent.reads
és spark.cassandra.input.reads_per_sec
konfigurációkat, hogy elkerülje a túl sok szabályozást és visszakapcsolást (ami viszont alacsonyabb átviteli sebességet eredményezhet).
Ezeknek a konfigurációknak az optimális értéke négy tényezőtől függ:
- Az adatbetöltési táblához konfigurált átviteli sebesség (kérelemegységek).
- A Spark-fürt feldolgozóinak száma.
- A Spark-feladathoz konfigurált végrehajtók száma (amelyek a Spark verziójától függően vagy
spark.cassandra.connection.remoteConnectionsPerExecutor
használatávalspark.cassandra.connection.connections_per_executor_max
vezérelhetők) - Az Azure Cosmos DB-nek küldött kérések átlagos késése, ha ugyanabban az adatközpontban van rendezve. Tegyük fel, hogy ez az érték 10 ms az írásokhoz és 3 ms az olvasásokhoz.
Ha például öt feldolgozónk van, és az értéke spark.cassandra.output.concurrent.writes
= 1, és az értéke spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, akkor öt feldolgozónk van, amelyek egyszerre íródnak a táblába, mindegyik egy szállal. Ha egyetlen írás végrehajtása 10 ms-ot vesz igénybe, akkor szálonként 100 kérést (1000 ezredmásodperc osztva 10-zel) küldhetünk. Öt feldolgozó esetén ez másodpercenként 500 írási adat lenne. Írásonként öt kérelemegység (RU) átlagos költségén a céltáblának legalább 2500 kérelemegységre lenne szüksége (5 kérelemegység x 500 írás másodpercenként).
A végrehajtók számának növelése növelheti az adott feladat szálainak számát, ami viszont növelheti az átviteli sebességet. Ennek pontos hatása azonban a feladattól függően változó lehet, míg az átviteli sebesség szabályozása a feldolgozók számával determinisztikusabb. A kérelemegység (RU) díjának lekéréséhez profilkészítéssel is meghatározhatja egy adott kérelem pontos költségét. Ez segít pontosabban kiosztani az átviteli sebességet a táblához vagy a kulcstérhez. Tekintse meg a cikkünket , amelyből megtudhatja, hogyan kérhet le kérelemegység-díjakat kérelemszinten.
Az átviteli sebesség skálázása az adatbázisban
A Cassandra Spark-összekötő hatékonyan telíti az átviteli sebességet az Azure Cosmos DB-ben. Ennek eredményeképpen még a hatékony újrapróbálkozások esetén is gondoskodnia kell arról, hogy elegendő átviteli sebesség (RU) legyen kiépítve a tábla vagy a kulcstér szintjén a sebességkorlátozással kapcsolatos hibák elkerülése érdekében. Egy adott táblában vagy kulcstérben a minimális 400 kérelemegység-beállítás nem lesz elegendő. A Spark-összekötő a minimális átvitelisebesség-konfigurációs beállítások mellett is körülbelül 6000 kérelemegységnek megfelelő sebességgel írhat.
Ha a Spark használatával történő adatáthelyezéshez szükséges RU-beállítás magasabb, mint ami az állandó állapotú számítási feladathoz szükséges, könnyedén fel- és leskálázhatja az átviteli sebességet az Azure Cosmos DB-ben, hogy megfeleljen a számítási feladat igényeinek egy adott időszakra vonatkozóan. A Cassandra API rugalmas méretezéséről szóló cikkünkből megismerheti a programozott és dinamikus skálázás különböző lehetőségeit.
Megjegyzés
A fenti útmutató az adatok ésszerűen egységes eloszlását feltételezi. Ha jelentős eltérést tapasztal az adatokban (vagyis egy adott partíciókulcs-értékhez rendelten nagy számú olvasási/írási adat), akkor is szűk keresztmetszeteket tapasztalhat, még akkor is, ha nagy számú kérelemegység van kiépítve a táblában. A kérelemegységek egyenlően vannak elosztva a fizikai partíciók között, és a nagy adateltérés szűk keresztmetszetet okozhat az egyetlen partícióra irányuló kérések számára.
Spark-összekötő átviteli sebességének konfigurációs paraméterei
Az alábbi táblázat az Apache Cassandra-specifikus átvitelisebesség-konfigurációs paraméterekhez készült Azure Cosmos DB-t sorolja fel, amelyeket az összekötő biztosít. Az összes konfigurációs paraméter részletes listáját a Spark Cassandra Connector GitHub-adattár konfigurációs referenciaoldalán találja.
Tulajdonság neve | Alapértelmezett érték | Leírás |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Sorok száma egyetlen kötegenként. Állítsa ezt a paramétert 1-re. Ez a paraméter nagyobb átviteli sebességet biztosít a nagy számítási feladatokhoz. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | None | Csomópontonkénti kapcsolatok maximális száma végrehajtónként. A 10*n csomópontonként 10 kapcsolatnak felel meg egy n csomópontos Cassandra-fürtben. Ha tehát csomópontonként öt kapcsolatot igényel egy öt csomópontos Cassandra-fürthöz, akkor ezt a konfigurációt 25-ösre kell állítania. Módosítsa ezt az értéket a párhuzamosság mértéke vagy a Spark-feladatokhoz konfigurált végrehajtók száma alapján. |
spark.cassandra.output.concurrent.writes | 100 | Meghatározza a végrehajtónként végrehajtható párhuzamos írások számát. Mivel a "batch.size.rows" értéket 1 értékre állítja, mindenképpen ennek megfelelően skálázza fel ezt az értéket. Módosítsa ezt az értéket a párhuzamosság mértéke vagy a számítási feladathoz elérni kívánt átviteli sebesség alapján. |
spark.cassandra.concurrent.reads | 512 | Meghatározza a végrehajtónkénti párhuzamos olvasások számát. Módosítsa ezt az értéket a párhuzamosság mértéke vagy a számítási feladathoz elérni kívánt átviteli sebesség alapján |
spark.cassandra.output.throughput_mb_per_sec | None | Meghatározza a végrehajtónkénti teljes írási átviteli sebességet. Ez a paraméter a Spark-feladat átviteli sebességének felső korlátjaként használható, és az Azure Cosmos DB-tároló kiosztott átviteli sebességén alapul. |
spark.cassandra.input.reads_per_sec | None | Meghatározza a végrehajtónkénti teljes olvasási sebességet. Ez a paraméter a Spark-feladat átviteli sebességének felső korlátjaként használható, és az Azure Cosmos DB-tároló kiosztott átviteli sebességén alapul. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Meghatározza, hogy hány köteg tárolható a memóriában a Cassandra API-ba való küldés előtt |
spark.cassandra.connection.keep_alive_ms | 60000 | Meghatározza azt az időtartamot, amíg a nem használt kapcsolatok elérhetők. |
A paraméterek átviteli sebességének és párhuzamossági fokának módosítása a Spark-feladatokhoz várt számítási feladat és az Azure Cosmos DB-fiókhoz kiosztott átviteli sebesség alapján.
Csatlakozás az Apache Cassandrához készült Azure Cosmos DB-hez a Sparkból
cqlsh
Az alábbi parancsok részletesen ismertetik, hogyan csatlakozhat az Apache Cassandrához készült Azure Cosmos DB-hez a cqlsh-ból. Ez akkor hasznos, ha a Sparkban futtatja a mintákat.
Linux/Unix/Mac rendszeren:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
Az alábbi cikk ismerteti az Azure Databricks-fürtök kiépítését, az Apache Cassandra-hoz készült Azure Cosmos DB-hez való csatlakozás fürtkonfigurációját, valamint számos mintajegyzetfüzetet, amelyek a DDL-műveleteket, a DML-műveleteket és egyebeket ismertetik.
Az Azure Cosmos DB használata az Apache Cassandrához az Azure Databricksből
2. Azure HDInsight-Spark
Az alábbi cikk HDinsight-Spark szolgáltatást, üzembe helyezést, az Apache Cassandrához készült Azure Cosmos DB-hez való csatlakozás fürtkonfigurációját, valamint számos DDL-műveleteket, DML-műveleteket és egyebeket lefedő mintajegyzetfüzetet tartalmaz.
Az Apache Cassandrához készült Azure Cosmos DB használata az Azure HDInsight-Sparkból
3. Spark-környezet általában
Bár a fenti szakaszok az Azure Spark-alapú PaaS-szolgáltatásokra vonatkoztak, ez a szakasz minden általános Spark-környezetet lefed. Az összekötők függőségeit, importálását és Spark-munkamenetek konfigurációját az alábbiakban találja. A "Következő lépések" szakasz a DDL-műveletek kódmintáit, a DML-műveleteket és egyebeket ismerteti.
Összekötő függőségei:
- Adja hozzá a maven koordinátáit a Cassandra-összekötő Sparkhoz való lekéréséhez
- Adja hozzá a Maven-koordinátákat a Cassandra API-hoz készült Azure Cosmos DB segédkódtárhoz
Behozatal:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Spark-munkamenet konfigurációja:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Következő lépések
Az alábbi cikkek bemutatják a Spark-integrációt az Apache Cassandra-hoz készült Azure Cosmos DB-vel.