Čtení dat z tabulek Azure Cosmos DB for Apache Cassandra pomocí Sparku
PLATÍ PRO: Cassandra
Tento článek popisuje, jak číst data uložená ve službě Azure Cosmos DB pro Apache Cassandra ze Sparku.
Konfigurace rozhraní API pro Cassandra
Nastavte v clusteru poznámkových bloků následující konfiguraci Sparku. Je to jednorázová aktivita.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Poznámka:
Pokud používáte Spark 3.x, nemusíte instalovat pomocné rutiny a objekt pro vytváření připojení služby Azure Cosmos DB. Místo konektoru Spark 3 byste také měli použít remoteConnectionsPerExecutor
connections_per_executor_max
(viz výše).
Upozorňující
Ukázky Sparku 3 uvedené v tomto článku byly testovány se Sparkem verze 3.2.1 a odpovídajícím konektorem Cassandra Spark com.datastax.spark :spark-cassandra-connector-assembly_2.12:3.2.0. Novější verze Sparku nebo konektoru Cassandra nemusí fungovat podle očekávání.
Rozhraní API datového rámce
Čtení tabulky pomocí příkazu session.read.format
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
Čtení tabulky pomocí spark.read.cassandraFormat
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
Čtení konkrétních sloupců v tabulce
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Umožňuje použít filtry.
Predikáty můžete do databáze odeslat, abyste umožnili lépe optimalizované dotazy Sparku. Predikát je podmínka dotazu, který vrací hodnotu true nebo false, obvykle umístěný v klauzuli WHERE. Predikát nasdílení změn filtruje data v databázovém dotazu, snižuje počet položek načtených z databáze a zlepšuje výkon dotazů. Ve výchozím nastavení rozhraní API datové sady Sparku automaticky odešle do databáze platné klauzule WHERE.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Část Cassandra Filters
fyzického plánu obsahuje odsdílený filtr dolů.
RDD API
Čtení tabulky
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
Čtení konkrétních sloupců v tabulce
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
Zobrazení SQL
Vytvoření dočasného zobrazení z datového rámce
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
Spouštění dotazů v zobrazení
select * from books_vw where book_pub_year > 1891
Další kroky
V následujících článcích najdete další články o práci se službou Azure Cosmos DB for Apache Cassandra ze Sparku:
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro