Sdílet prostřednictvím


Připojení ke službě Azure Cosmos DB pro Apache Cassandra ze Sparku

PLATÍ PRO: Cassandra

Tento článek je jedním z řady článků o integraci Azure Cosmos DB for Apache Cassandra ze Sparku. Články se týkají operací připojení, jazyka DDL (Data Definition Language), základních operací jazyka DML (Data Manipulat Language) a pokročilé integrace azure Cosmos DB for Apache Cassandra ze Sparku.

Požadavky

Závislosti pro připojení

  • Konektor Spark pro Cassandra: Konektor Spark se používá k připojení ke službě Azure Cosmos DB pro Apache Cassandra. Identifikujte a použijte verzi konektoru umístěného v centru Mavenu, která je kompatibilní s verzemi Sparku a Scala vašeho prostředí Spark. Doporučujeme prostředí, které podporuje Spark 3.2.1 nebo vyšší a konektor Spark dostupný na souřadnicích com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0mavenu . Pokud používáte Spark 2.x, doporučujeme prostředí se Sparkem verze 2.4.5 pomocí konektoru Spark na souřadnicích com.datastax.spark:spark-cassandra-connector_2.11:2.4.3Mavenu.

  • Pomocná knihovna služby Azure Cosmos DB pro rozhraní API pro Cassandra: Pokud používáte verzi Spark 2.x, pak kromě konektoru Spark potřebujete další knihovnu s názvem azure-cosmos-cassandra-spark-helper s souřadnicemi com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 Mavenu ze služby Azure Cosmos DB, aby bylo možné zvládnout omezování rychlosti. Tato knihovna obsahuje vlastní objekt pro vytváření připojení a třídy zásad opakování.

    Zásady opakování ve službě Azure Cosmos DB jsou nakonfigurované tak, aby zpracovávaly výjimky stavového kódu HTTP 429("Velikost četnosti požadavků"). Azure Cosmos DB pro Apache Cassandra tyto výjimky překládá do přetížených chyb v nativním protokolu Cassandra a můžete to zkusit znovu s back-offs. Vzhledem k tomu, že Azure Cosmos DB používá model zřízené propustnosti, dochází k výjimkám omezování rychlosti požadavků při zvýšení rychlosti příchozího a výchozího přenosu dat. Zásady opakování chrání vaše úlohy Spark před špičkami dat, které momentálně překračují propustnost přidělenou kontejneru. Pokud používáte konektor Spark 3.x, implementace této knihovny se nevyžaduje.

    Poznámka:

    Zásady opakování můžou chránit úlohy Sparku pouze proti momentárním špičkám. Pokud jste nenakonfigurovali dostatek RU potřebných ke spuštění úlohy, zásady opakování se nepoužijí a třída zásad opakování výjimku znovu zvětšuje.

  • Podrobnosti o připojení účtu služby Azure Cosmos DB: Název účtu Azure API pro Cassandra, koncový bod účtu a klíč.

Optimalizace konfigurace propustnosti konektoru Spark

Uvedené v další části jsou všechny relevantní parametry pro řízení propustnosti pomocí konektoru Spark pro Cassandra. Aby bylo možné optimalizovat parametry pro maximalizaci propustnosti pro úlohy Sparku, spark.cassandra.output.concurrent.writesspark.cassandra.concurrent.readsje potřeba správně nakonfigurovat konfiguraci a spark.cassandra.input.reads_per_sec konfigurace, aby se zabránilo příliš velkému omezování a back-off (což zase může vést k nižší propustnosti).

Optimální hodnota těchto konfigurací závisí na čtyřech faktorech:

  • Množství propustnosti (jednotky žádostí) nakonfigurované pro tabulku, do které se data ingestují.
  • Počet pracovních procesů v clusteru Spark.
  • Počet exekutorů nakonfigurovaných pro vaši úlohu Sparku (kterou je možné řídit pomocí spark.cassandra.connection.connections_per_executor_max verze Sparku nebo spark.cassandra.connection.remoteConnectionsPerExecutor v závislosti na verzi Sparku)
  • Průměrná latence každého požadavku do služby Azure Cosmos DB, pokud jste kompletovali do stejného datového centra. Předpokládejme, že tato hodnota je 10 ms pro zápisy a 3 ms pro čtení.

Pokud máme například pět pracovních procesů a hodnotu spark.cassandra.output.concurrent.writes= 1 a hodnotu spark.cassandra.connection.remoteConnectionsPerExecutor = 1, pak máme pět pracovních procesů, které jsou souběžně zapisovat do tabulky, z nichž každý má jedno vlákno. Pokud k provedení jednoho zápisu trvá 10 ms, můžeme odeslat 100 požadavků (1000 milisekund děleno 10) za sekundu na vlákno. S pěti pracovníky by to bylo 500 zápisů za sekundu. Při průměrných nákladech na pět jednotek žádostí (RU) na zápis by cílová tabulka potřebovala minimálně 2500 jednotek žádostí zřízených (5 RU × 500 zápisů za sekundu).

Zvýšení počtu exekutorů může zvýšit počet vláken v dané úloze, což může zase zvýšit propustnost. Přesný dopad toho ale může být proměnlivý v závislosti na úloze a řízení propustnosti s počtem pracovních procesů je determinističtější. Můžete také určit přesné náklady na danou žádost tím, že ji profilujete, abyste získali poplatek za jednotku žádosti (RU). To vám pomůže být přesnější při zřizování propustnosti pro tabulku nebo prostor klíčů. Podívejte se na náš článek , abyste pochopili, jak získat poplatky za jednotky žádostí na úrovni žádosti.

Škálování propustnosti v databázi

Konektor Cassandra Spark efektivně saturuje propustnost ve službě Azure Cosmos DB. V důsledku toho i při efektivních opakováních budete muset zajistit, abyste měli dostatečnou propustnost (RU) zřízenou na úrovni tabulky nebo prostoru klíčů, abyste zabránili chybám souvisejícím s rychlostí. Minimální nastavení 400 RU v dané tabulce nebo prostoru klíčů nebude stačit. I s minimálním nastavením konfigurace propustnosti může konektor Spark zapisovat rychlostí odpovídající přibližně 6000 jednotkám žádostí nebo více.

Pokud je nastavení RU vyžadované pro přesun dat pomocí Sparku vyšší, než je potřeba pro úlohu stabilního stavu, můžete v Azure Cosmos DB systematicky škálovat propustnost nahoru a dolů, aby vyhovovala potřebám vaší úlohy v daném časovém období. Přečtěte si náš článek o elastickém škálování v rozhraní API pro Cassandru a seznamte se s různými možnostmi pro programové a dynamické škálování.

Poznámka:

Výše uvedené pokyny předpokládají rozumně jednotnou distribuci dat. Pokud máte v datech významnou nerovnoměrnou distribuci (tj. nesměrovaný počet čtení a zápisů do stejné hodnoty klíče oddílu), může docházet k kritickým bodům i v případě, že máte v tabulce zřízený velký počet jednotek žádostí. Jednotky žádostí jsou rovnoměrně rozdělené mezi fyzické oddíly a nerovnoměrná distribuce dat může způsobit kritické body požadavků na jeden oddíl.

Parametry konfigurace propustnosti konektoru Sparku

Následující tabulka uvádí parametry konfigurace propustnosti specifické pro Azure Cosmos DB pro Apache Cassandra, které poskytuje konektor. Podrobný seznam všech parametrů konfigurace najdete na stránce s referenčními informacemi o konfiguraci úložiště GitHub konektoru Spark Cassandra.

Název vlastnosti Výchozí hodnota Popis
spark.cassandra.output.batch.size.rows 0 Počet řádků na jednu dávku Nastavte tento parametr na hodnotu 1. Tento parametr slouží k dosažení vyšší propustnosti pro náročné úlohy.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Nic Maximální počet připojení na uzel na exekutor 10*n je ekvivalentní 10 připojení na uzel v clusteru Cassandra n-node. Pokud tedy pro cluster Cassandra s pěti uzly vyžadujete pět připojení na jeden uzel, měli byste tuto konfiguraci nastavit na 25. Upravte tuto hodnotu na základě stupně paralelismu nebo počtu exekutorů, pro které jsou nakonfigurované úlohy Sparku.
spark.cassandra.output.concurrent.writes 100 Definuje počet paralelních zápisů, ke kterým může dojít na exekutor. Vzhledem k tomu, že nastavíte hodnotu batch.size.rows na hodnotu 1, nezapomeňte odpovídajícím způsobem vertikálně navýšit kapacitu této hodnoty. Upravte tuto hodnotu na základě stupně paralelismu nebo propustnosti, kterou chcete pro úlohu dosáhnout.
spark.cassandra.concurrent.reads 512 Definuje počet paralelních čtení, ke kterým může dojít na exekutor. Upravte tuto hodnotu na základě stupně paralelismu nebo propustnosti, kterou chcete pro úlohu dosáhnout.
spark.cassandra.output.throughput_mb_per_sec Nic Definuje celkovou propustnost zápisu na exekutor. Tento parametr se dá použít jako horní limit propustnosti úlohy Sparku a založit ho na zřízené propustnosti kontejneru Azure Cosmos DB.
spark.cassandra.input.reads_per_sec Nic Definuje celkovou propustnost čtení na exekutor. Tento parametr se dá použít jako horní limit propustnosti úlohy Sparku a založit ho na zřízené propustnosti kontejneru Azure Cosmos DB.
spark.cassandra.output.batch.grouping.buffer.size 1000 Definuje počet dávek na jednu úlohu Sparku, která se dá před odesláním do rozhraní API pro Cassandra uložit v paměti.
spark.cassandra.connection.keep_alive_ms 60000 Definuje časové období, do kterého jsou k dispozici nepoužívané připojení.

Na základě očekávané úlohy Sparku upravte propustnost a stupeň paralelismu těchto parametrů a propustnost, kterou jste pro svůj účet služby Azure Cosmos DB zřídili.

Připojení ke službě Azure Cosmos DB pro Apache Cassandra ze Sparku

cqlsh

Následující příkazy podrobně uvádějí, jak se připojit ke službě Azure Cosmos DB for Apache Cassandra z cqlsh. To je užitečné pro ověření při procházení ukázek ve Sparku.
Ze systémů Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

Následující článek se zabývá zřizováním clusterů Azure Databricks, konfigurací clusteru pro připojení ke službě Azure Cosmos DB for Apache Cassandra a několika ukázkovými poznámkovými bloky, které pokrývají operace DDL, operace DML a další.
Práce se službou Azure Cosmos DB pro Apache Cassandra z Azure Databricks

2. Azure HDInsight-Spark

Následující článek se zabývá službou HDinsight-Spark, zřizováním, konfigurací clusteru pro připojení ke službě Azure Cosmos DB for Apache Cassandra a několika ukázkovými poznámkovými bloky, které pokrývají operace DDL, operace DML a další.
Práce se službou Azure Cosmos DB pro Apache Cassandra z Azure HDInsight-Spark

3. Prostředí Sparku obecně

I když výše uvedené části byly specifické pro služby PaaS založené na Azure Sparku, tato část se věnuje všem obecným prostředím Spark. Níže jsou podrobně popsány závislosti konektorů, importy a konfigurace relace Sparku. Část Další kroky popisuje ukázky kódu pro operace DDL, operace DML a další.

Závislosti konektorů:

  1. Přidání souřadnic Mavenu pro získání konektoru Cassandra pro Spark
  2. Přidání souřadnic Mavenu pro pomocnou knihovnu azure Cosmos DB pro rozhraní API for Cassandra

Importuje:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Konfigurace relace Sparku:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Další kroky

Následující články ukazují integraci Sparku se službou Azure Cosmos DB pro Apache Cassandra.