Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Databricks rekommenderar att du använder kommandot COPY INTO för inkrementell och massinläsning av datakällor som innehåller tusentals filer.
I den här självstudien använder du kommandot COPY INTO för att läsa in JSON-data från en Unity Catalog-volym till en Delta-tabell i din Azure Databricks arbetsyta. Du använder Wanderbricks-exempeldatauppsättningen som datakälla. Mer avancerade användningsfall för inmatning finns i Vad är automatisk inläsning?.
Krav
- Åtkomst till en beräkningsresurs. Se Beräkning.
- En Unity Catalog-aktiverad arbetsyta med behörighet att skapa scheman och volymer i en katalog. Se Ansluta till molnobjektlagring med Unity Catalog.
Steg 1: Konfigurera din miljö
Koden i den här självstudien använder en Unity Catalog-volym för att lagra JSON-källfiler. Ersätt <catalog> med en katalog där du har CREATE SCHEMA och CREATE VOLUME behörigheter. Kontakta arbetsyteadministratören om du inte kan köra koden.
Skapa en notebook-fil och koppla den till en beräkningsresurs. Kör sedan följande kod för att konfigurera ett schema och en volym för denna handledning.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Steg 2: Skriva exempeldata till volymen som JSON
Kommandot COPY INTO läser in data från filbaserade källor. Läs från Wanderbricks-exempeltabellenbookings och skriv ett parti poster som JSON-filer till din volym, för att simulera data som anländer från ett externt system.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
Att skriva filer till en volym kräver Python. I ett verkligt arbetsflöde skulle dessa data komma från ett externt system.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Steg 3: Använd COPY INTO för att läsa in JSON-data utan att resultatet förändras vid upprepade inläsningar.
Skapa en Delta-måltabell innan du använder COPY INTO. Du behöver inte ange något annat än ett tabellnamn i instruktionen CREATE TABLE . Eftersom den här åtgärden är idempotent läser Databricks bara in data en gång, även om du kör koden flera gånger.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
Steg 4: Förhandsgranska innehållet i tabellen
Kontrollera att tabellen innehåller 20 rader från den första batchen med Wanderbricks-bokningsdata och att schemat har härledts korrekt från JSON-källfilerna.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Steg 5: Läs in mer data och förhandsgranska resultat
Du kan simulera ytterligare data som kommer från ett externt system genom att skriva en ytterligare batch med dataposter och köra COPY INTO igen. Kör följande kod för att skriva en andra batch med data.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
Att skriva filer till en volym kräver Python. I ett verkligt arbetsflöde skulle dessa data komma från ett externt system.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Kör sedan COPY INTO kommandot från steg 3 och granska tabellen igen för att bekräfta de nya posterna. Endast de nya filerna läses in.
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Steg 6: Rensa handledning
När du är klar med den här handledningen kan du rensa de associerade resurserna om du inte längre vill behålla dem. Släpp schemat, tabellerna och volymen och ta bort alla data.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;