Leer en inglés

Compartir a través de


Lectura de datos de tablas de Azure Cosmos DB for Apache Cassandra con Spark

SE APLICA A: Cassandra

En este artículo se describe cómo leer los datos almacenados en Azure Cosmos DB for Apache Cassandra desde Spark.

Configuración de la API para Cassandra

Establezca la configuración de Spark siguiente en el clúster del cuaderno. Es una actividad única.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Nota

Si usa Spark 3.x, no es necesario instalar el asistente de Azure Cosmos DB ni el generador de conexiones. También debe usar remoteConnectionsPerExecutor en lugar de connections_per_executor_max para el conector de Spark 3 (consulte más arriba).

Advertencia

Los ejemplos de Spark 3 que se muestran en este artículo se han probado con la versión 3.2.1 de Spark y el conector de Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 correspondiente. Es posible que las versiones posteriores de Spark o del conector de Cassandra no funcionen según lo previsto.

Dataframe API

Lectura de la tabla con el comando session.read.format

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

Lectura de la tabla con spark.read.cassandraFormat

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

Lectura de columnas específicas de la tabla

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Aplicación de filtros

Puede insertar predicados en la base de datos para permitir consultas Spark mejor optimizadas. Un predicado es una condición de una consulta que devuelve true o false, y que normalmente se encuentra en la cláusula WHERE. Un predicado inserta filtros en los datos de la consulta de base de datos, lo que reduce el número de entradas recuperadas de la base de datos y mejora el rendimiento de las consultas. De forma predeterminada, la API Dataset de Spark insertará automáticamente cláusulas WHERE válidas en la base de datos.

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

La sección Cassandra Filters del plan físico incluye el filtro insertado.

particiones

RDD API

Lectura de tabla

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

Lectura de columnas específicas de la tabla

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

Vistas SQL

Creación de una vista temporal desde un dataframe

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

Ejecución de consultas en la vista

select * from books_vw where book_pub_year > 1891

Pasos siguientes

Los siguientes son artículos adicionales sobre cómo trabajar con Azure Cosmos DB for Apache Cassandra desde Spark: