Consulta de datos de Cosmos DB con Spark

Completado

Después de agregar un servicio vinculado para la base de datos de Azure Cosmos DB habilitada para el almacén analítico, puede usarlo para consultar los datos mediante un grupo de Spark en el área de trabajo de Azure Synapse Analytics.

Carga de datos analíticos de Azure Cosmos DB en un dataframe

Para la exploración inicial o el análisis rápido de los datos de un servicio vinculado de Azure Cosmos DB, suele ser más fácil cargar datos de un contenedor en un dataframe mediante un lenguaje compatible con Spark, como PySpark (una implementación específica de Python para Spark) o Scala (un lenguaje basado en Java que se suele usar en Spark).

Por ejemplo, el código de PySpark siguiente se podría usar para cargar un dataframe denominado df a partir de los datos en el contenedor my-container conectado a través del servicio vinculado my_linked_service y mostrar las 10 primeras filas de datos:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Supongamos que el contenedor my-container se usa para almacenar elementos similares al ejemplo siguiente:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

La salida del código de PySpark sería similar a la tabla siguiente:

_rid _ts productID ProductName id _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

Los datos se cargan desde el almacén analítico del contenedor, no desde el almacén operativo. Esto garantiza que no haya ninguna sobrecarga de consultas en el almacén operativo. Los campos del almacén de datos analíticos incluyen los campos definidos por la aplicación (en este caso productID y productName) y los campos de metadatos que se crearon automáticamente.

Después de cargar el dataframe, puede usar sus métodos nativos para explorar los datos. Por ejemplo, el código siguiente crea un dataframe que contiene solo las columnas productID y productName, ordenadas por productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

La salida de este código tendría un aspecto similar a esta tabla:

productID ProductName
125 Thingumy
123 Widget
124 Wotsit
... ...

Escritura de un dataframe en un contenedor de Cosmos DB

En la mayoría de los escenarios de HTAP, debe usar el servicio vinculado para leer datos en Spark desde el almacén analítico. Sin embargo, puede escribir el contenido de un dataframe en el contenedor, tal como en el ejemplo siguiente:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Nota

Escribir un dataframe en un contenedor actualiza el almacén operativo y puede tener un impacto en su rendimiento. A continuación, los cambios se sincronizan con el almacén analítico.

Uso de Spark SQL para consultar datos analíticos de Azure Cosmos DB

Spark SQL es una API de Spark que proporciona sintaxis de lenguaje SQL y semántica de base de datos relacional en un grupo de Spark. Puede usar Spark SQL para definir metadatos para tablas que se pueden consultar mediante SQL.

Por ejemplo, el código siguiente crea una tabla denominada Products en función del contenedor hipotético que se utilizó en los ejemplos anteriores:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Sugerencia

La palabra clave %%sql al comienzo del código es un valor mágico que indica al grupo de Spark ejecutar el código como SQL en lugar del lenguaje predeterminado (que suele establecerse en PySpark).

Con este enfoque, puede crear una base de datos lógica en el grupo de Spark que, a continuación, puede usar para consultar los datos analíticos en Azure Cosmos DB para admitir cargas de trabajo de informes y análisis de datos sin afectar al almacén operativo de la cuenta de Azure Cosmos DB.