Condividi tramite


Esercitazione: COPY INTO con Spark SQL

Databricks consiglia di usare il comando COPY INTO per il caricamento incrementale e bulk dei dati per le origini dati che contengono migliaia di file. Databricks consiglia di usare Auto Loader per i casi d'uso avanzati.

In questa esercitazione si usa il comando COPY INTO per caricare i dati dall'archiviazione di oggetti cloud in una tabella nell'area di lavoro di Azure Databricks.

Requisiti

Passaggio 1. Configurare l'ambiente e creare un generatore di dati

Questa esercitazione presuppone una conoscenza di base di Azure Databricks e di una configurazione predefinita dell'area di lavoro. Se non è possibile eseguire il codice specificato, contattare l'amministratore dell'area di lavoro per assicurarsi di avere accesso alle risorse di calcolo e a una posizione in cui è possibile scrivere i dati.

Si noti che il codice fornito utilizza il parametro source per specificare la posizione che configurerai come origine dati COPY INTO. Come scritto, questo codice punta a una posizione nella radice DBFS. Se disponi dei permessi di scrittura su un'ubicazione di archiviazione di oggetti esterni, sostituisci parte dbfs:/ della stringa di origine con il percorso del tuo archivio oggetti. Poiché questo blocco di codice esegue anche una cancellazione ricorsiva per reimpostare questa dimostrazione, assicuratevi di non puntare ai dati di produzione e di conservare la directory annidata /user/{username}/copy-into-demo per evitare la sovrascrittura o l'eliminazione di dati esistenti.

  1. Creare un nuovo notebook e collegarlo a una risorsa di calcolo.

  2. Copiare ed eseguire il codice seguente per reimpostare il percorso di archiviazione e il database usati in questa esercitazione:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copiare ed eseguire il codice seguente per configurare alcune tabelle e funzioni che verranno usate per generare dati in modo casuale:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Passaggio 2: Scrivere i dati di esempio nell'archiviazione cloud

La scrittura in formati di dati diversi da Delta Lake è rara in Azure Databricks. Il codice fornito qui scrive in JSON, simulando un sistema esterno che potrebbe eseguire il dump dei risultati da un altro sistema nello storage di oggetti.

  1. Copiare ed eseguire il codice seguente per scrivere un batch di dati JSON non elaborati:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Passaggio 3: Utilizzare COPY INTO per caricare i dati JSON in modo idempotente

È necessario creare una tabella Delta Lake di destinazione prima di poter usare COPY INTO. Non è necessario specificare elementi diversi da un nome di tabella nell'istruzione CREATE TABLE .

  1. Copiare ed eseguire il codice seguente per creare la tabella Delta di destinazione e caricare i dati dall'origine:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Poiché questa azione è idempotente, è possibile eseguirla più volte, ma i dati verranno caricati una sola volta.

Passaggio 4: Visualizzare in anteprima il contenuto della tabella

È possibile eseguire una semplice query SQL per esaminare manualmente il contenuto di questa tabella.

  1. Copiare ed eseguire il codice seguente per visualizzare in anteprima la tabella:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Passaggio 5: Caricare più dati e visualizzare in anteprima i risultati

È possibile eseguire nuovamente i passaggi da 2 a 4 più volte per ottenere nuovi batch di dati JSON non elaborati casuali nella tua origine, caricarli idempotentemente in Delta Lake con COPY INTOe visualizzare in anteprima i risultati. Provare a eseguire questi passaggi fuori ordine o più volte per simulare la scrittura di più batch di dati grezzi o l'esecuzione multipla di COPY INTO senza che siano arrivati nuovi dati.

Passaggio 6: Fase di completamento del tutorial

Al termine di questa esercitazione, è possibile pulire le risorse associate se non si vogliono più mantenerle.

Copiare ed eseguire il codice seguente per eliminare il database, le tabelle e rimuovere tutti i dati:

%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)

Risorse aggiuntive