Eventos
Compilación de Intelligent Apps
17 mar, 21 - 21 mar, 10
Únase a la serie de reuniones para crear soluciones de inteligencia artificial escalables basadas en casos de uso reales con compañeros desarrolladores y expertos.
Regístrese ahoraEste explorador ya no se admite.
Actualice a Microsoft Edge para aprovechar las características y actualizaciones de seguridad más recientes, y disponer de soporte técnico.
SE APLICA A: Cassandra
En este artículo se explica cómo acceder a Azure Cosmos DB for Apache Cassandra desde Spark en YARN con HDInsight-Spark desde spark-shell
. HDInsight es un PaaS de Hortonworks Hadoop de Microsoft en Azure. Usa el almacenamiento de objetos para HDFS y se suministra con varios tipos, incluido Spark. Aunque en este artículo se hace referencia a HDInsight-Spark, se aplica a todas las distribuciones de Hadoop.
Antes de empezar, revise los conceptos básicos de la conexión a Azure Cosmos DB for Apache Cassandra.
Los siguientes requisitos previos son necesarios:
Aprovisionamiento de Azure Cosmos DB for Apache Cassandra. Consulte Creación de una cuenta de base de datos.
Aprovisionamiento de un clúster de HDInsight-Spark. Consulte Creación de un clúster de Apache Spark en Azure HDInsight mediante una plantilla de ARM.
Configuración de la API para Cassandra en Spark2. El conector de Spark para Cassandra requiere que los detalles de la conexión de Cassandra se inicialicen como parte del contexto de Spark. Al iniciar una instancia de Jupyter Notebook, la sesión y el contexto ya se inicializan. Continúe y reinicialice el contexto de Spark, a menos que se complete con cada configuración establecida como parte del inicio predeterminado de una instancia de Jupyter Notebook por parte de HDInsight. Una solución alternativa es agregar los detalles de la instancia de Cassandra directamente a la configuración del servicio Ambari, Spark2. Este enfoque es una actividad que se realiza una sola vez por clúster y que requiere que se reinicie el servicio Spark2.
Vaya al servicio Ambari, Spark2, y seleccione las configuraciones.
Luego, vaya a spark2-defaults personalizado y agregue una nueva propiedad con lo siguiente, y reinicie el servicio Spark2:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
spark.cassandra.connection.port=10350<br>
spark.cassandra.connection.ssl.enabled=true<br>
spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Puede usar cqlsh
para la validación. Para más información, consulte Conexión a Azure Cosmos DB for Apache Cassandra desde Spark.
El shell de Spark se usa para la prueba y exploración.
Inicie spark-shell
con las dependencias de Maven necesarias, compatibles con la versión de Spark de su clúster.
spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Ejecute algunas operaciones DDL y DML
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
// Specify connection factory for Cassandra
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
// Parallelism and throughput configs
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Ejecute operaciones CRUD
//1) Create table if it does not exist
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
//2) Delete data from potential prior runs
cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
//3) Generate a few rows
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
//4) Persist
booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
//5) Read the data in the table
spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
HDInsight-Spark incluye los servicios Zeppelin y Jupyter Notebook. Ambos son entornos de cuadernos basados en web que admiten Scala y Python. Los cuadernos son excelentes para la colaboración y el análisis exploratorio interactivo, pero no están pensados para procesos operativos ni de producción.
Los siguientes cuadernos de Jupyter Notebook se pueden cargar en un clúster de HDInsight Spark y proporcionan ejemplos para trabajar con Azure Cosmos DB for Apache Cassandra. Asegúrese de consultar el primer cuaderno 1.0-ReadMe.ipynb
para revisar la configuración del servicio de Spark para conectarse a Azure Cosmos DB for Apache Cassandra.
Descargue los cuadernos de azure-cosmos-db-cassandra-api-spark-notebooks-jupyter en su equipo.
Al iniciar Jupyter, vaya a Scala. Cree un directorio y, después, cargue los cuadernos en el directorio. El botón Cargar se encuentra en la parte superior derecha.
Recorra los cuadernos, y cada celda de ellos, de manera secuencial. Seleccione el botón Ejecutar de la parte superior de cada cuaderno para ejecutar todas las celdas, o bien Mayús+Entrar para ejecutar cada una de ellas.
En los procesos automatizados en producción, los programas de Spark se envían al clúster a través de spark-submit.
Eventos
Compilación de Intelligent Apps
17 mar, 21 - 21 mar, 10
Únase a la serie de reuniones para crear soluciones de inteligencia artificial escalables basadas en casos de uso reales con compañeros desarrolladores y expertos.
Regístrese ahoraCursos
Módulo
Uso de Apache Spark en Microsoft Fabric - Training
Apache Spark es una de las tecnologías principales para el análisis de datos a gran escala. Microsoft Fabric admite clústeres de Spark, lo que permite analizar y procesar datos a gran escala.
Certificación
Microsoft Certified: Azure Cosmos DB Developer Specialty - Certifications
Escribe consultas eficaces, crea directivas de indexación, administra y aprovisiona recursos en la API de SQL y el SDK con Microsoft Azure Cosmos DB.