자습서: Spark를 사용하여 Azure Cosmos DB for NoSQL에 연결
적용 대상: NoSQL
이 자습서에서는 Azure Cosmos DB Spark 커넥터를 사용하여 Cosmos DB for NoSQL 계정에서 데이터를 읽거나 씁니다. 이 자습서에서는 Azure Databricks 및 Jupyter Notebook을 사용하여 Spark에서 API for NoSQL과 통합하는 방법을 설명합니다. 이 자습서에서는 Python과 Scala에 중점을 두지만 Spark에서 지원하는 모든 언어나 인터페이스를 사용할 수 있습니다.
이 자습서에서는 다음을 하는 방법을 알아볼 수 있습니다.
- Spark 및 Jupyter Notebook을 사용하여 API for NoSQL 계정에 연결합니다.
- 데이터베이스 및 컨테이너 리소스를 만듭니다.
- 컨테이너에 데이터를 수집합니다.
- 컨테이너의 데이터를 쿼리합니다.
- 컨테이너의 항목에 대한 일반적인 작업을 수행합니다.
필수 조건
- 기존 Azure Cosmos DB API for NoSQL 계정.
- 기존 Azure 구독이 있는 경우 새 계정을 만듭니다.
- Azure 구독이 없으신가요? 신용 카드 없이 Azure Cosmos DB를 무료로 사용해 볼 수 있습니다.
- 기존 Azure Databricks 작업 영역
Spark 및 Jupyter를 사용하여 연결
기존 Azure Databricks 작업 영역을 사용하여 Apache Spark 3.4.x를 사용하여 Azure Cosmos DB for NoSQL 계정에 연결할 준비가 된 컴퓨팅 클러스터를 만듭니다.
Azure Databricks 작업 영역을 엽니다.
작업 영역 인터페이스에서 새 클러스터를 만듭니다. 최소한 다음 설정을 사용하여 클러스터를 구성합니다.
버전 값 런타임 버전 13.3 LTS(Scala 2.12, Spark 3.4.1) 작업 영역 인터페이스를 사용하여 의 그룹 ID로 Maven Central에서 Maven
com.azure.cosmos.spark
패키지를 검색합니다. 접두사azure-cosmos-spark_3-4
이 붙은 아티팩트 ID를 사용하여 Spark 3.4용 패키지를 클러스터에 설치합니다.마지막으로 새 Notebook을 만듭니다.
팁
기본적으로 Notebook은 최근 만들어진 클러스터에 연결됩니다.
Notebook 내에서 NoSQL 계정 엔드포인트, 데이터베이스 이름 및 컨테이너 이름에 대한 OLTP(온라인 트랜잭션 처리) 구성 설정을 지정합니다.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
데이터베이스 및 컨테이너 만들기
카탈로그 API를 사용하여 데이터베이스 및 컨테이너와 같은 계정 리소스를 관리합니다. 그런 다음 OLTP를 사용하여 컨테이너 리소스 내의 데이터를 관리할 수 있습니다.
Spark를 사용하여 API for NoSQL 리소스를 관리하도록 카탈로그 API를 구성합니다.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
CREATE DATABASE IF NOT EXISTS
를 사용하여cosmicworks
라는 새 데이터베이스를 만듭니다.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
CREATE TABLE IF NOT EXISTS
를 사용하여products
라는 새 컨테이너를 만듭니다. 파티션 키 경로를/category
로 설정하고 초당 최대1000
RU(요청 단위) 처리량을 사용하여 자동 크기 조정 처리량을 사용하도록 설정해야 합니다.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
계층적 파티션 키 구성을 사용하여
employees
라는 다른 컨테이너를 만듭니다. 파티션 키 경로 집합으로/organization
,/department
및/team
을 사용합니다. 특정 순서를 따릅니다. 또한 처리량을 수동 분량400
RU로 설정합니다.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Notebook 셀을 실행하여 데이터베이스 및 컨테이너가 API for NoSQL 계정 내에서 생성되는지 확인합니다.
데이터 수집
샘플 데이터 세트를 만듭니다. 그런 다음 OLTP를 사용하여 해당 데이터를 API for NoSQL 컨테이너에 수집합니다.
샘플 데이터 세트를 만듭니다.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
spark.createDataFrame
및 이전에 저장된 OLTP 구성을 사용하여 대상 컨테이너에 샘플 데이터를 추가합니다.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
쿼리 데이터
OLTP 데이터를 데이터 프레임에 로드하여 데이터에 대한 일반적인 쿼리를 수행합니다. 다양한 구문을 사용하여 데이터를 필터링하거나 쿼리할 수 있습니다.
OLTP 데이터를 데이터 프레임 개체에 로드하려면
spark.read
를 사용합니다. 이 자습서의 앞부분에서 사용한 것과 동일한 구성을 사용합니다. 또한spark.cosmos.read.inferSchema.enabled
를true
로 설정하여 Spark 커넥터가 기존 항목을 샘플링하여 스키마를 유추할 수 있도록 합니다.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
printSchema
를 사용하여 데이터 프레임에 로드된 데이터의 스키마를 렌더링합니다.# Render schema df.printSchema()
// Render schema df.printSchema()
quantity
열이20
보다 작은 데이터 행을 렌더링합니다.where
및show
함수를 사용하여 이 쿼리를 수행합니다.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
clearance
열이true
인 첫 번째 데이터 행을 렌더링합니다.filter
함수를 사용하여 이 쿼리를 수행합니다.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
필터 또는 잘림 없이 5개의 데이터 행을 렌더링합니다.
show
함수를 사용하여 렌더링되는 행의 모양과 수를 사용자 지정합니다.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
다음 원시 NoSQL 쿼리 문자열을 사용하여 데이터를 쿼리합니다.
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
일반 작업 수행
Spark에서 API for NoSQL 데이터로 작업할 때 부분 업데이트를 수행하거나 데이터를 원시 JSON으로 작업할 수 있습니다.
항목의 부분 업데이트를 수행하려면 다음을 수행합니다.
기존
config
구성 변수를 복사하고 새 복사본의 속성을 수정합니다. 특히 쓰기 전략을ItemPatch
로 구성합니다. 그런 다음 대량 지원을 사용하지 않도록 설정합니다. 열과 매핑된 작업을 설정합니다. 마지막으로 기본 작업 유형을Set
로 설정합니다.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
이 패치 작업의 일부로서 대상으로 지정할 항목 파티션 키 및 고유 식별자에 대한 변수를 만듭니다.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
패치 개체 집합을 만들어 대상 항목을 지정하고 수정해야 하는 필드를 지정합니다.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
패치 개체 집합을 사용하여 데이터 프레임을 만듭니다. 패치 작업을 수행하려면
write
를 사용합니다.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
쿼리를 실행하여 패치 작업의 결과를 검토합니다. 이제 다른 변경 내용 없이 항목의 이름을
Yamba New Surfboard
로 지정해야 합니다.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
원시 JSON 데이터로 작업하려면 다음을 수행합니다.
기존
config
구성 변수를 복사하고 새 복사본의 속성을 수정합니다. 특히 대상 컨테이너를employees
로 변경합니다. 그런 다음 원시 JSON 데이터를 사용하도록contacts
열/필드를 구성합니다.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
컨테이너에 수집할 직원 집합을 만듭니다.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
데이터 프레임을 만들고
write
를 사용하여 직원 데이터를 수집합니다.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
show
를 사용하여 데이터 프레임에서 데이터를 렌더링합니다. 출력에서contacts
열이 원시 JSON인지 확인합니다.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
관련 콘텐츠
- Apache Spark
- Azure Cosmos DB Catalog API
- 구성 매개 변수 참조
- Azure Cosmos DB Spark 커넥터 샘플
- Spark 2.4에서 Spark 3으로 마이그레이션*
- 버전 호환성:
- 릴리스 정보:
- 다운로드 링크: