Sdílet prostřednictvím


Agregace operací s tabulkami Apache Cassandra ve službě Azure Cosmos DB ze Sparku

PLATÍ PRO: Cassandra

Tento článek popisuje základní agregační operace s tabulkami Azure Cosmos DB for Apache Cassandra ze Sparku.

Poznámka:

Filtrování na straně serveru a agregace na straně serveru se v současné době ve službě Azure Cosmos DB pro Apache Cassandra nepodporuje.

Konfigurace rozhraní API pro Cassandra

Nastavte v clusteru poznámkových bloků následující konfiguraci Sparku. Je to jednorázová aktivita.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
 spark.cassandra.connection.port  10350
 spark.cassandra.connection.ssl.enabled  true
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000
 spark.cassandra.concurrent.reads  512
 spark.cassandra.output.batch.grouping.buffer.size  1000
 spark.cassandra.connection.keep_alive_ms  600000000

Poznámka:

Pokud používáte Spark 3.x, nemusíte instalovat pomocné rutiny a objekt pro vytváření připojení služby Azure Cosmos DB. Místo konektoru Spark 3 byste také měli použít remoteConnectionsPerExecutorconnections_per_executor_max (viz výše).

Upozorňující

Ukázky Sparku 3 uvedené v tomto článku byly testovány se Sparkem verze 3.2.1 a odpovídajícím konektorem Cassandra Spark com.datastax.spark :spark-cassandra-connector-assembly_2.12:3.2.0. Novější verze Sparku nebo konektoru Cassandra nemusí fungovat podle očekávání.

Ukázkový generátor dat

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Operace count

RDD API

sc.cassandraTable("books_ks", "books").count

Výstup:

count: Long = 5

Rozhraní API datového rámce

Počet do datových rámců se v současné době nepodporuje. Následující ukázka ukazuje, jak po zachování datového rámce do paměti spustit počet datových rámců jako alternativní řešení.

Zvolte možnost úložiště z následujících dostupných možností, abyste se vyhnuli problémům s nedostatkem paměti:

  • MEMORY_ONLY: Jedná se o výchozí možnost úložiště. Ukládá RDD jako deserializované objekty Java v prostředí JVM. Pokud se sada RDD nevejde do paměti, některé oddíly se nebudou ukládat do mezipaměti a budou se průběžně přepočítat při každém jejich potřebě.

  • MEMORY_AND_DISK: Ukládá RDD jako deserializované objekty Java v prostředí JVM. Pokud se sada RDD nevejde do paměti, uložte oddíly, které se nevejdou na disk, a kdykoli je to potřeba, přečtěte si je z umístění, které jsou uložené.

  • MEMORY_ONLY_SER (Java/Scala): Ukládá RDD jako serializované objekty Java – pole 1 bajtů na oddíl. Tato možnost je prostorově efektivní ve srovnání s deserializovanými objekty, zejména při použití rychlého serializátoru, ale náročnější na čtení procesoru.

  • MEMORY_AND_DISK_SER (Java/Scala): Tato možnost úložiště je jako MEMORY_ONLY_SER, jediný rozdíl spočívá v tom, že rozdělí oddíly, které se nevejdou do paměti disku, místo aby je překompiluje, když jsou potřeba.

  • DISK_ONLY: Ukládá oddíly RDD pouze na disk.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Stejné jako výše uvedené úrovně, ale replikuje každý oddíl na dva uzly clusteru.

  • OFF_HEAP (experimentální): Podobá se MEMORY_ONLY_SER, ale ukládá data do paměti mimo haldu a vyžaduje, aby byla předem povolená paměť mimo haldu.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Průměrná operace

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Výstup:

res24: Double = 16.016000175476073

Rozhraní API datového rámce

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Výstup:

+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |

SQL

select avg(book_price) from books_vw;

Výstup:

16.016000175476073

Minimální operace

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Výstup:

res31: Float = 11.33

Rozhraní API datového rámce

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Výstup:

+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |

SQL

%sql
select avg(book_price) from books_vw;

Výstup:

11.33

Max. operace

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

Rozhraní API datového rámce

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Výstup:

+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |

SQL

%sql
select max(book_price) from books_vw;

Výstup:

22.45

Operace součet

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Výstup:

res46: Double = 80.08000087738037

Rozhraní API datového rámce

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Výstup:

+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |

SQL

select sum(book_price) from books_vw;

Výstup:

80.08000087738037

Nejlepší nebo srovnatelná operace

RDD API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Výstup:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

Rozhraní API datového rámce

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Výstup:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Další krok