이 문서에서는 Azure Databricks에서 Azure DocumentDB에 연결하여 Python 및 Spark를 사용하여 일반적인 데이터 작업을 수행하는 방법을 보여 줍니다. 필요한 종속성을 구성하고, 연결을 설정하고, MongoDB Spark 커넥터를 사용하여 읽기, 쓰기, 필터링 및 집계 작업을 실행합니다.
필수 조건
Azure 구독
- Azure 구독이 없는 경우 체험 계정 만들기
기존 Azure DocumentDB 클러스터
- 클러스터가 없는 경우 새 클러스터를 만듭니다.
Azure Databricks의 Spark 환경
- Spark 3.2.1 이상과 호환되는 MongoDB Spark 커넥터(Maven 좌표
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1에서 사용 가능)
- Spark 3.2.1 이상과 호환되는 MongoDB Spark 커넥터(Maven 좌표
Azure Databricks 작업 영역 구성
Azure DocumentDB에 연결하도록 Azure Databricks 작업 영역을 구성합니다. MongoDB Connector for Spark 라이브러리를 컴퓨팅에 추가하여 Azure DocumentDB에 연결할 수 있도록 합니다.
Azure Databricks 작업 영역으로 이동합니다.
기본 컴퓨트를 구성하거나 새로운 컴퓨트 리소스를 만들어 Notebook을 실행하십시오.
Spark 3.0 이상을 지원하는 Databricks 런타임을 선택합니다.
컴퓨팅 리소스에서 라이브러리>새로 설치>Maven을 선택합니다.
Maven 좌표를 추가합니다.
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1설치를 선택합니다.
설치가 완료되면 컴퓨팅을 다시 시작합니다.
연결 설정 구성
모든 읽기 및 쓰기 작업에 Azure DocumentDB 연결 문자열을 사용하도록 Spark를 구성합니다.
Azure Portal에서 Azure DocumentDB 리소스로 이동합니다.
설정>연결 문자열의 연결 문자열을 복사합니다. 다음과 같은 형식이 있습니다.
mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.comAzure Databricks에서 컴퓨팅 구성으로 이동하고 페이지 아래쪽에 있는 고급 옵션을 선택합니다.
다음 Spark 구성 변수를 추가합니다.
-
spark.mongodb.output.uri- 연결 문자열 붙여넣기 -
spark.mongodb.input.uri- 연결 문자열 붙여넣기
-
구성을 저장합니다.
또는 데이터를 읽거나 쓸 때 .option() 메서드를 사용하여 코드에서 직접 연결 문자열을 설정할 수 있습니다.
Python Notebook 만들기
새 Python Notebook을 만들어 데이터 작업을 실행합니다.
Azure Databricks 작업 영역에서 새 Python Notebook을 만듭니다.
Notebook의 시작 부분에서 연결 변수를 정의합니다.
connectionString = "mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000" database = "<database_name>" collection = "<collection_name>"자리 표시자 값을 실제 데이터베이스 이름 및 컬렉션 이름으로 바꿉 있습니다.
컬렉션에서 데이터 읽기
Azure DocumentDB 컬렉션에서 Spark DataFrame으로 데이터를 읽습니다.
다음 코드를 사용하여 컬렉션에서 데이터를 로드합니다.
df = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).load()로드된 데이터를 확인합니다.
df.printSchema() display(df)결과를 관찰합니다. 이 코드는 지정된 컬렉션의 모든 문서를 포함하는 DataFrame을 만들고 스키마와 데이터를 표시합니다.
데이터 필터링
필터를 적용하여 컬렉션에서 데이터의 특정 하위 집합을 검색합니다.
DataFrame
filter()메서드를 사용하여 조건을 적용합니다.df_filtered = df.filter(df["birth_year"] == 1970) display(df_filtered)열 인덱스 번호를 사용하십시오.
df_filtered = df.filter(df[2] == 1970) display(df_filtered)결과를 관찰합니다. 이 방법은 필터 조건과 일치하는 문서만 반환합니다.
SQL을 사용하여 데이터 쿼리
임시 뷰를 만들고 친숙한 SQL 기반 분석을 위해 데이터에 대해 SQL 쿼리를 실행합니다.
DataFrame에서 임시 보기를 만듭니다.
df.createOrReplaceTempView("T")뷰에 대해 SQL 쿼리를 실행합니다.
df_result = spark.sql("SELECT * FROM T WHERE birth_year == 1970 AND gender == 2") display(df_result)결과를 관찰합니다. 이 방법을 사용하면 복잡한 쿼리 및 조인에 표준 SQL 구문을 사용할 수 있습니다.
컬렉션에 데이터 쓰기
Azure DocumentDB 컬렉션에 DataFrames를 다시 작성하여 새 데이터 또는 수정된 데이터를 저장합니다.
다음 코드를 사용하여 컬렉션에 데이터를 씁니다.
df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database", database).option("collection", "CitiBike2019").mode("append").save()쓰기 작업은 출력 없이 완료됩니다. 컬렉션에서 데이터를 읽어 쓰기 작업이 성공적으로 완료되었는지 확인합니다.
df_verify = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", "CitiBike2019").load() display(df_verify)팁 (조언)
또는 요구 사항에 따라 다른 쓰기 모드
appendoverwriteignore를 사용합니다.
집계 파이프라인 실행
집계 파이프라인을 실행하여 Azure DocumentDB 내에서 직접 서버 쪽 데이터 처리 및 분석을 수행합니다. 집계 파이프라인을 사용하면 데이터베이스에서 데이터를 이동하지 않고도 강력한 데이터 변환, 그룹화 및 계산을 수행할 수 있습니다. 실시간 분석, 대시보드 및 보고서 생성에 적합합니다.
집계 파이프라인을 JSON 문자열로 정의합니다.
pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"파이프라인을 실행하고 결과를 로드합니다.
df_aggregated = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).option("pipeline", pipeline).load() display(df_aggregated)
관련 콘텐츠
- Maven Central - MongoDB Spark 커넥터 버전
- 실제 MongoDB 집계 - 집계 파이프라인 가이드
- 방화벽 설정 구성