COPY INTO를 사용하여 데이터 로드 시작

COPY INTO SQL 명령을 사용하면 파일 위치에서 Delta 테이블로 데이터를 로드할 수 있습니다. 이는 재시도 가능하며 멱등성인 작업입니다. 이미 로드된 원본 위치의 파일은 건너뜁니다.

COPY INTO 에서는 다음과 같은 기능을 제공합니다.

  • S3, ADLS Gen2, ABFS, GCS 및 Unity 카탈로그 볼륨을 포함하여 클라우드 스토리지에서 쉽게 구성할 수 있는 파일 또는 디렉터리 필터입니다.
  • CSV, JSON, XML, Avro, ORC, Parquet, 텍스트 및 이진 파일 등 여러 소스 파일 형식 지원
  • 기본적으로 정확히 한 번(idempotent) 파일 처리
  • 대상 테이블 스키마 유추, 매핑, 병합 및 진화

참고 항목

더 확장 가능하고 강력한 파일 수집 환경을 위해 Databricks는 SQL 사용자가 스트리밍 테이블을 활용하는 것이 좋습니다. Databricks SQL에서 스트리밍 테이블을 사용하여 데이터 로드를 참조하세요.

Warning

COPY INTO 는 삭제 벡터에 대한 작업 영역 설정을 적용합니다. 사용하도록 설정된 경우 SQL 웨어하우스에서 실행되거나 Databricks Runtime 14.0 이상을 실행하는 컴퓨팅에서 대상 테이블에서 COPY INTO 삭제 벡터를 사용할 수 있습니다. 사용하도록 설정되면 삭제 벡터는 Databricks Runtime 11.3 LTS 이하의 테이블에 대한 쿼리를 차단합니다. 삭제 벡터란?자동 사용 삭제 벡터를 참조하세요.

요구 사항

계정 관리자는 수집을 위한 데이터 액세스 구성의 단계를 따라 클라우드 개체 스토리지의 데이터에 대한 액세스를 구성해야 사용자가 데이터를 COPY INTO로드할 수 있습니다.

예: 스키마 없는 Delta Lake 테이블에 데이터 로드

참고 항목

이 기능은 Databricks Runtime 11.3 LTS 이상에서 사용할 수 있습니다.

다음으로 설정 mergeSchematrue 하여 명령 중에 COPY INTO 스키마가 나중에 유추되도록 빈 자리 표시자 델타 테이블을 만들 수 있습니다.COPY_OPTIONS

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

위의 SQL 문은 멱등적이며 Delta 테이블에 정확히 한 번 데이터를 수집하도록 실행되도록 예약할 수 있습니다.

참고 항목

빈 Delta 테이블은 COPY INTO 외부에서 사용할 수 없습니다. INSERT INTOMERGE INTO는 스키마가 없는 Delta 테이블에 데이터를 쓰는 데 지원되지 않습니다. 데이터가 COPY INTO를 사용하여 테이블에 삽입되면 테이블을 쿼리할 수 있게 됩니다.

COPY INTO에 대한 대상 테이블 만들기를 참조하세요.

예: 스키마 설정 및 Delta Lake 테이블에 데이터 로드

다음 예에서는 Delta 테이블을 만든 다음 COPY INTO SQL 명령을 사용하여 Databricks 데이터 세트에서 테이블로 샘플 데이터를 로드하는 방법을 보여 줍니다. Azure Databricks 클러스터에 연결된 Notebook에서 예제 Python, R, Scala 또는 SQL 코드를 실행할 수 있습니다. Databricks SQLSQL 웨어하우스와 연결된 쿼리에서 SQL 코드를 실행할 수도 있습니다.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

정리하려면 테이블을 삭제하는 다음 코드를 실행합니다.

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

참조

추가 리소스