Självstudie: COPY INTO med Spark SQL

Databricks rekommenderar att du använder kommandot COPY INTO för inkrementell och massinläsning av datakällor som innehåller tusentals filer.

I den här självstudien använder du kommandot COPY INTO för att läsa in JSON-data från en Unity Catalog-volym till en Delta-tabell i din Azure Databricks arbetsyta. Du använder Wanderbricks-exempeldatauppsättningen som datakälla. Mer avancerade användningsfall för inmatning finns i Vad är automatisk inläsning?.

Krav

Steg 1: Konfigurera din miljö

Koden i den här självstudien använder en Unity Catalog-volym för att lagra JSON-källfiler. Ersätt <catalog> med en katalog där du har CREATE SCHEMA och CREATE VOLUME behörigheter. Kontakta arbetsyteadministratören om du inte kan köra koden.

Skapa en notebook-fil och koppla den till en beräkningsresurs. Kör sedan följande kod för att konfigurera ett schema och en volym för denna handledning.

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

Steg 2: Skriva exempeldata till volymen som JSON

Kommandot COPY INTO läser in data från filbaserade källor. Läs från Wanderbricks-exempeltabellenbookings och skriv ett parti poster som JSON-filer till din volym, för att simulera data som anländer från ett externt system.

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

Att skriva filer till en volym kräver Python. I ett verkligt arbetsflöde skulle dessa data komma från ett externt system.

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Steg 3: Använd COPY INTO för att läsa in JSON-data utan att resultatet förändras vid upprepade inläsningar.

Skapa en Delta-måltabell innan du använder COPY INTO. Du behöver inte ange något annat än ett tabellnamn i instruktionen CREATE TABLE . Eftersom den här åtgärden är idempotent läser Databricks bara in data en gång, även om du kör koden flera gånger.

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

Steg 4: Förhandsgranska innehållet i tabellen

Kontrollera att tabellen innehåller 20 rader från den första batchen med Wanderbricks-bokningsdata och att schemat har härledts korrekt från JSON-källfilerna.

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

Steg 5: Läs in mer data och förhandsgranska resultat

Du kan simulera ytterligare data som kommer från ett externt system genom att skriva en ytterligare batch med dataposter och köra COPY INTO igen. Kör följande kod för att skriva en andra batch med data.

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

Att skriva filer till en volym kräver Python. I ett verkligt arbetsflöde skulle dessa data komma från ett externt system.

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Kör sedan COPY INTO kommandot från steg 3 och granska tabellen igen för att bekräfta de nya posterna. Endast de nya filerna läses in.

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

Steg 6: Rensa handledning

När du är klar med den här handledningen kan du rensa de associerade resurserna om du inte längre vill behålla dem. Släpp schemat, tabellerna och volymen och ta bort alla data.

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

Ytterligare resurser