Databricks는 수천 개의 파일을 포함하는 데이터 원본에 대한 증분 및 대량 데이터 로드에 COPY INTO 명령을 사용하는 것이 좋습니다.
이 자습서에서는 COPY INTO 명령을 사용하여 Unity 카탈로그 볼륨의 JSON 데이터를 Azure Databricks 작업 영역의 델타 테이블로 로드합니다.
Wanderbricks 샘플 데이터 세트를 데이터 원본으로 사용합니다. 고급 인제스천 사용 사례에 대해서는 자동 로더란 무엇입니까?을 참조하세요.
요구 사항
- 컴퓨팅 리소스에 대한 액세스. Compute를 참조하세요.
- 카탈로그에 스키마 및 볼륨을 만들 수 있는 권한이 있는 Unity 카탈로그 사용 작업 영역입니다. Unity 카탈로그를 사용하여 클라우드 개체 스토리지에 연결을 참조하세요.
1단계: 환경 구성
이 자습서의 코드는 Unity 카탈로그 볼륨을 사용하여 JSON 원본 파일을 저장합니다. 사용 권한이 있는 카탈로그로 <catalog>을(를) CREATE SCHEMA 및 CREATE VOLUME 대체하십시오. 코드를 실행할 수 없는 경우 작업 영역 관리자에게 문의하세요.
Notebook을 만들고 컴퓨팅 리소스에 연결합니다. 그런 다음, 다음 코드를 실행하여 이 자습서에 대한 스키마 및 볼륨을 설정합니다.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
2단계: 샘플 데이터를 볼륨에 JSON으로 쓰기
이 COPY INTO 명령은 파일 기반 원본에서 데이터를 로드합니다.
Wanderbricksbookings 샘플 테이블에서 데이터를 읽어와 레코드 묶음을 JSON 파일로 변환하여 볼륨에 저장함으로써 외부 시스템에서 도착하는 데이터를 시뮬레이션합니다.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
볼륨에 파일을 쓰려면 Python 필요합니다. 실제 워크플로에서 이 데이터는 외부 시스템에서 도착합니다.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
3단계: COPY INTO를 사용하여 JSON 데이터를 항등적으로 로드하세요.
를 사용하기 COPY INTO전에 대상 델타 테이블을 만듭니다. 문에 테이블 이름 CREATE TABLE 외에는 아무것도 제공할 필요가 없습니다. 이 작업은 idempotent이므로 코드를 여러 번 실행하더라도 Databricks는 데이터를 한 번만 로드합니다.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
4단계: 테이블 내용 미리 보기
테이블에 Wanderbricks 예약 데이터의 첫 번째 일괄 처리에서 20개의 행이 포함되어 있고 스키마가 JSON 원본 파일에서 올바르게 유추되었는지 확인합니다.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
5단계: 더 많은 데이터 로드 및 결과 미리 보기
외부 시스템에서 도착하는 추가 데이터를 시뮬레이션하려면, 다른 레코드 일괄 처리를 작성하고 COPY INTO을 다시 실행하면 됩니다. 다음 코드를 실행하여 두 번째 데이터 일괄 처리를 작성합니다.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
볼륨에 파일을 쓰려면 Python 필요합니다. 실제 워크플로에서 이 데이터는 외부 시스템에서 도착합니다.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
그런 다음 3단계에서 명령을 다시 실행하고 COPY INTO 테이블을 미리 보기하여 새 레코드를 확인합니다. 새 파일만 로드됩니다.
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
6단계: 자습서 정리
이 자습서를 완료하면 더 이상 유지하지 않으려면 연결된 리소스를 정리할 수 있습니다. 스키마, 테이블 및 볼륨을 삭제하고 모든 데이터를 제거합니다.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;