Spark를 사용하여 Cosmos DB 데이터 쿼리

완료됨

분석 저장소 지원 Azure Cosmos DB 데이터베이스에 대한 연결된 서비스를 추가한 후에는 이 서비스를 사용하여 Azure Synapse Analytics 작업 영역에서 Spark 풀을 사용해 데이터를 쿼리할 수 있습니다.

데이터 프레임에 Azure Cosmos DB 분석 데이터 로드

Azure Cosmos DB 연결된 서비스에서 데이터를 초기 탐색하거나 빠르게 분석하는 경우 PySpark(Python의 Spark 관련 구현) 또는 Scala(Spark에서 대개 사용되는 Java 기반 언어)와 같은 Spark 지원 언어를 사용하여 컨테이너에서 데이터 프레임으로 데이터를 로드하는 것이 가장 쉬운 경우가 많습니다.

예를 들어 다음 PySpark 코드를 사용하여 my_linked_service 연결된 서비스를 사용하여 연결된 my-container의 데이터에서 df라는 데이터 프레임을 로드하고 처음 10개의 데이터 행을 표시할 수 있습니다.

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

my-container 컨테이너가 다음 예제와 유사한 항목을 저장하는 데 사용된다고 가정하겠습니다.

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

PySpark 코드의 출력은 다음 표와 유사합니다.

_rid _ts productID productName id _etag
mjMaAL...== 1655414791 123 위젯 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

데이터는 운영 저장소에 쿼리 오버헤드가 없도록 운영 저장소가 아닌 컨테이너의 분석 저장소에서 로드됩니다. 분석 데이터 저장소의 필드에는 애플리케이션 정의 필드(이 경우 productIDproductName)와 자동으로 생성된 메타데이터 필드가 포함됩니다.

데이터 프레임을 로드한 후 네이티브 메서드를 사용하여 데이터를 탐색할 수 있습니다. 예를 들어 다음 코드는 productName에 따라 정렬된 productIDproductName 열만 포함하는 새 데이터 프레임을 만듭니다.

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

이 코드의 출력은 다음 표와 비슷합니다.

productID productName
125 Thingumy
123 위젯
124 Wotsit
... ...

Cosmos DB 컨테이너에 데이터 프레임 작성

대부분의 HTAP 시나리오에서는 연결된 서비스를 사용하여 분석 저장소에서 Spark로 데이터를 읽어야 합니다. 그러나 다음 예제와 같이 데이터 프레임의 내용을 컨테이너에 쓸 수 있습니다.

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

참고

컨테이너에 데이터 프레임을 쓰면 운영 저장소가 업데이트되고 성능에 영향을 미칠 수 있습니다. 그런 다음 변경 내용이 분석 저장소에 동기화됩니다.

Spark SQL을 사용하여 Azure Cosmos DB 분석 데이터 쿼리

Spark SQL은 Spark 풀에서 SQL 언어 구문 및 관계형 데이터베이스 의미 체계를 제공하는 Spark API입니다. Spark SQL을 사용하여 SQL로 쿼리할 수 있는 테이블에 대한 메타데이터를 정의할 수 있습니다.

예를 들어 다음 코드는 이전 예제에서 사용된 가상의 컨테이너를 기반으로 Products라는 테이블을 만듭니다.

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

코드의 시작 부분에 있는 %%sql 키워드는 기본 언어(일반적으로 PySpark로 설정됨)가 아닌 SQL로 코드를 실행하도록 Spark 풀에 지시하는 매직입니다.

이 접근 방식을 사용하면 Spark 풀에서 논리 데이터베이스를 만든 후에 Azure Cosmos DB의 분석 데이터를 쿼리하여 Azure Cosmos DB 계정의 운영 저장소에 영향을 주지 않고 데이터 분석 및 보고 워크로드를 지원하는 데 이 데이터베이스를 사용할 수 있습니다.