教程: COPY INTO 使用 Spark SQL

Databricks 建议对包含数千个文件的数据源使用 COPY INTO 命令进行增量和批量数据加载。

本教程使用 COPY INTO 命令将 Unity 目录卷中的 JSON 数据加载到Azure Databricks工作区中的 Delta 表中。 使用 Wanderbricks 示例数据集作为数据源。 有关更高级的引入用例,请参阅什么是自动加载程序?

要求

步骤 1:配置环境

本教程中的代码使用 Unity 目录卷来存储 JSON 源文件。 将 <catalog> 替换为你拥有 CREATE SCHEMACREATE VOLUME 权限的目录。 如果无法运行代码,请联系工作区管理员。

创建笔记本 并将其附加到计算资源。 然后运行以下代码,为本教程设置架构和卷。

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

步骤 2:以 JSON 形式将示例数据写入卷

COPY INTO 命令从基于文件的源加载数据。 从 Wanderbricksbookings 示例表中读取数据,并将一批记录作为 JSON 文件写入您的存储卷,模拟外部系统的数据到达。

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

将文件写入卷需要使用Python。 在实际工作流中,此数据将来自外部系统。

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

步骤 3:使用 COPY INTO 以幂等方式加载 JSON 数据

使用 COPY INTO前创建目标 Delta 表。 CREATE TABLE 语句中只需提供表名,不需要提供任何其他内容。 由于此操作是幂等的,因此即使多次运行代码,Databricks 也仅加载一次数据。

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

步骤 4:预览表的内容

验证表是否包含来自第一批 Wanderbricks 预订数据的 20 行,以及架构是否已从 JSON 源文件正确推断。

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

步骤 5:加载更多数据和预览结果

可以通过编写另一批记录并再次运行 COPY INTO 来模拟从外部系统到达的其他数据。 运行以下代码以编写第二批数据。

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

将文件写入卷需要使用Python。 在实际工作流中,此数据将来自外部系统。

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

然后再次运行 COPY INTO 步骤 3 中的命令并预览表以确认新记录。 仅加载新文件。

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

步骤 6:整理教程

完成本教程后,如果不再需要保留关联资源,则可以清理这些资源。 删除架构、表和卷,并删除所有数据。

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

其他资源