Adatok olvasása Az Apache Cassandra-táblákhoz készült Azure Cosmos DB-ből a Spark használatával
A KÖVETKEZŐKRE VONATKOZIK: Cassandra
Ez a cikk azt ismerteti, hogyan olvashatja be az Azure Cosmos DB for Apache Cassandra szolgáltatásban tárolt adatokat a Sparkból.
Api a Cassandra konfigurációhoz
Állítsa be az alábbi Spark-konfigurációt a jegyzetfüzetfürtben. Ez egy egyszeri tevékenység.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Megjegyzés
Ha Spark 3.x-et használ, nem kell telepítenie az Azure Cosmos DB segítőt és kapcsolat-előállítót. A Spark 3-összekötő helyett connections_per_executor_max
is érdemes használnia remoteConnectionsPerExecutor
(lásd fent).
Figyelmeztetés
A cikkben bemutatott Spark 3-mintákat a Spark 3.2.1-es verziójával és a megfelelő Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 verzióval teszteltük. Előfordulhat, hogy a Spark és/vagy a Cassandra-összekötő későbbi verziói nem a várt módon működnek.
Dataframe API
Táblázat olvasása a session.read.format paranccsal
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
Táblázat olvasása a spark.read.cassandraFormat használatával
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
Adott oszlopok olvasása a táblázatban
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Szűrők alkalmazása
A predikátumokat leküldheti az adatbázisba, hogy jobban optimalizált Spark-lekérdezéseket lehessen használni. A predikátum egy olyan lekérdezés feltétele, amely igaz vagy hamis értéket ad vissza, általában a WHERE záradékban. A predikátum leküldése szűri az adatbázis-lekérdezés adatait, csökkenti az adatbázisból lekért bejegyzések számát, és javítja a lekérdezési teljesítményt. Alapértelmezés szerint a Spark Dataset API automatikusan leküldi az érvényes WHERE záradékokat az adatbázisba.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
A Cassandra Filters
fizikai terv szakasza tartalmazza a leküldéses leküldéses szűrőt.
RDD API
Táblázat olvasása
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
Adott oszlopok olvasása a táblázatban
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
SQL-nézetek
Ideiglenes nézet létrehozása adatkeretből
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
Lekérdezések futtatása a nézeten
select * from books_vw where book_pub_year > 1891
Következő lépések
Az alábbiakban további cikkek találhatók az Apache Cassandra Azure Cosmos DB Sparkból való használatával kapcsolatban: