Databricks 建议对包含数千个文件的数据源使用 COPY INTO 命令进行增量和批量数据加载。
本教程使用 COPY INTO 命令将 Unity 目录卷中的 JSON 数据加载到Azure Databricks工作区中的 Delta 表中。 使用 Wanderbricks 示例数据集作为数据源。 有关更高级的引入用例,请参阅什么是自动加载程序?
要求
- 访问计算资源。 请参阅 计算。
- 启用了 Unity Catalog 的工作区,具有在该目录中创建架构和卷的权限。 请参阅 使用 Unity 目录连接到云对象存储。
步骤 1:配置环境
本教程中的代码使用 Unity 目录卷来存储 JSON 源文件。 将 <catalog> 替换为你拥有 CREATE SCHEMA 和 CREATE VOLUME 权限的目录。 如果无法运行代码,请联系工作区管理员。
创建笔记本 并将其附加到计算资源。 然后运行以下代码,为本教程设置架构和卷。
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
步骤 2:以 JSON 形式将示例数据写入卷
该 COPY INTO 命令从基于文件的源加载数据。 从 Wanderbricksbookings 示例表中读取数据,并将一批记录作为 JSON 文件写入您的存储卷,模拟外部系统的数据到达。
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
将文件写入卷需要使用Python。 在实际工作流中,此数据将来自外部系统。
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
步骤 3:使用 COPY INTO 以幂等方式加载 JSON 数据
使用 COPY INTO前创建目标 Delta 表。
CREATE TABLE 语句中只需提供表名,不需要提供任何其他内容。 由于此操作是幂等的,因此即使多次运行代码,Databricks 也仅加载一次数据。
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
步骤 4:预览表的内容
验证表是否包含来自第一批 Wanderbricks 预订数据的 20 行,以及架构是否已从 JSON 源文件正确推断。
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
步骤 5:加载更多数据和预览结果
可以通过编写另一批记录并再次运行 COPY INTO 来模拟从外部系统到达的其他数据。 运行以下代码以编写第二批数据。
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
将文件写入卷需要使用Python。 在实际工作流中,此数据将来自外部系统。
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
然后再次运行 COPY INTO 步骤 3 中的命令并预览表以确认新记录。 仅加载新文件。
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
步骤 6:整理教程
完成本教程后,如果不再需要保留关联资源,则可以清理这些资源。 删除架构、表和卷,并删除所有数据。
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;