Azure Databricks에서 Azure Cosmos DB for MongoDB vCore에 연결
이 문서에서는 Azure Databricks에서 Azure Cosmos DB MongoDB vCore를 연결하는 방법을 설명합니다. Python 코드를 사용하여 읽기, 필터, SQL, 집계 파이프라인 및 테이블 쓰기와 같은 기본 DML(데이터 조작 언어) 작업을 안내합니다.
필수 조건
원하는 Spark 환경 Azure Databricks를 프로비전합니다.
연결에 대한 종속성 구성
다음은 Azure Databricks에서 Azure Cosmos DB for MongoDB vCore에 연결하는 데 필요한 종속성입니다.
MongoDB용 Spark 커넥터 Spark 커넥터는 Azure Cosmos DB for MongoDB vCore에 연결하는 데 사용됩니다. Spark 및 Spark 환경의 Scala 버전과 호환되는 Maven central에 위치한 커넥터의 버전을 식별하고 사용합니다. Spark 3.2.1 이상과 maven 좌표
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
에서 사용할 수 있는 Spark 커넥터를 지원하는 환경이 권장됩니다.Azure Cosmos DB for MongoDB 연결 문자열: Azure Cosmos DB for MongoDB vCore 연결 문자열, 사용자 이름 및 암호입니다.
Azure Databricks 클러스터 프로비전
지침에 따라 Azure Databricks 클러스터를 프로비저닝할 수 있습니다. Spark 3.0을 지원하는 Databricks Runtime 버전 7.6을 선택하는 것이 좋습니다.
종속성 추가
클러스터에 MongoDB Connector for Spark 라이브러리를 추가하여 기본 MongoDB 및 Azure Cosmos DB for MongoDB 엔드포인트에 모두 연결합니다. 클러스터에서 라이브러리>새로 설치>Maven을 선택한 다음, org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
Maven 좌표를 추가합니다.
설치를 선택한 다음 설치가 완료되면 클러스터를 다시 시작합니다.
참고 항목
MongoDB Connector for Spark 라이브러리가 설치된 후 Databricks 클러스터를 다시 시작해야 합니다.
그런 다음, 마이그레이션을 위해 Scala 또는 Python Notebook을 만들 수 있습니다.
Azure Cosmos DB for MongoDB vCore에 연결하기 위한 Python Notebook 만들기
Databricks에서 Python Notebook을 만듭니다. 다음 코드를 실행하기 전에 변수에 올바른 값을 입력해야 합니다.
Azure Cosmos DB for MongoDB 연결 문자열을 사용하여 Spark 구성 업데이트
- Azure Portal의 Azure Cosmos DB MongoDB vCore 리소스의 설정 - >연결 문자열 아래에 있는 연결 문자열을 확인합니다. "mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com" 형식입니다.
- 클러스터 구성의 Databricks로 돌아가서 고급 옵션(페이지 하단) 아래에
spark.mongodb.output.uri
및spark.mongodb.input.uri
변수 모두에 대한 연결 문자열을 붙여넣습니다. 사용자 이름과 암호 필드를 적절하게 채웁니다. 이렇게 하면 클러스터에서 실행되는 모든 통합 문서가 이 구성을 사용합니다. - 또는
spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString).load()
와 같은 API를 호출할 때option
을 명시적으로 설정할 수 있습니다. 클러스터에서 변수를 구성하는 경우 옵션을 설정할 필요가 없습니다.
connectionString_vcore="mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
database="<database_name>"
collection="<collection_name>"
데이터 샘플 집합
이 랩에서는 CSV 'Citibike2019' 데이터 세트를 사용합니다. 가져오기 가능: CitiBike 여행 기록 2019. 이를 "CitiBikeDB"라는 데이터베이스와 "CitiBike2019" 컬렉션에 로드했습니다. 로드된 데이터를 가리키도록 변수 데이터베이스와 컬렉션을 설정하고 이 예에서는 변수를 사용하고 있습니다.
database="CitiBikeDB"
collection="CitiBike2019"
Azure Cosmos DB for MongoDB vCore에서 데이터 읽기
일반적인 구문은 다음과 같습니다.
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()
다음과 같이 로드된 데이터 프레임의 유효성을 검사할 수 있습니다.
df_vcore.printSchema()
display(df_vcore)
한 가지 예를 살펴보겠습니다.
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()
df_vcore.printSchema()
display(df_vcore)
출력
스키마
DataFrame
Azure Cosmos DB for MongoDB vCore에서 데이터 필터링
일반적인 구문은 다음과 같습니다.
df_v = df_vcore.filter(df_vcore[column number/column name] == [filter condition])
display(df_v)
한 가지 예를 살펴보겠습니다.
df_v = df_vcore.filter(df_vcore[2] == 1970)
display(df_v)
출력:
뷰 또는 임시 테이블을 만들고 이에 대해 SQL 쿼리를 실행합니다.
일반적인 구문은 다음과 같습니다.
df_[dataframename].createOrReplaceTempView("[View Name]")
spark.sql("SELECT * FROM [View Name]")
한 가지 예를 살펴보겠습니다.
df_vcore.createOrReplaceTempView("T_VCORE")
df_v = spark.sql(" SELECT * FROM T_VCORE WHERE birth_year == 1970 and gender == 2 ")
display(df_v)
출력:
Azure Cosmos DB for MongoDB vCore에 데이터 쓰기
일반적인 구문은 다음과 같습니다.
df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database",database).option("collection","<collection_name>").mode("append").save()
한 가지 예를 살펴보겠습니다.
df_vcore.write.format("mongo").option("spark.mongodb.output.uri", connectionString_vcore).option("database",database).option("collection","CitiBike2019").mode("append").save()
이 명령은 컬렉션에 직접 쓰기 때문에 출력이 없습니다. 읽기 명령을 사용하여 레코드가 업데이트되었는지 교차 확인할 수 있습니다.
집계 파이프라인을 실행하는 Azure Cosmos DB for MongoDB vCore 컬렉션에서 데이터 읽기
[!참고] 집계 파이프라인은 Azure Cosmos DB for MongoDB 내에서 데이터를 전처리하고 변환할 수 있는 강력한 기능입니다. 실시간 분석, 대시보드, 롤업을 통한 보고서 생성, '서버 쪽' 데이터 사후 처리를 통한 합계 및 평균에 매우 적합합니다. (참고: 관련 책 전체가 있습니다).
Azure Cosmos DB for MongoDB는 필요한 데이터만 추출, 필터링 및 처리할 수 있는 풍부한 보조/복합 인덱스도 지원합니다.
예를 들어, 전체 데이터 집합을 먼저 로드할 필요 없이 데이터베이스 내에서 특정 지역에 있는 모든 고객을 분석하여 데이터 이동을 최소화하고 대기 시간을 줄입니다.
다음은 집계 함수를 사용하는 예입니다.
pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).option("pipeline", pipeline).load()
display(df_vcore)
출력
관련 콘텐츠
다음 문서에서는 Azure Cosmos DB for MongoDB vCore에서 집계 파이프라인을 사용하는 방법을 보여 줍니다.
- Maven Central은 Spark 커넥터를 찾을 수 있는 곳입니다.
- 집계 파이프라인