Delen via


Zelfstudie: COPY INTO met Spark SQL

Databricks raadt u aan de opdracht COPY INTO te gebruiken voor incrementeel en bulksgewijs laden van gegevensbronnen die duizenden bestanden bevatten. Databricks raadt u aan autolaadprogramma's te gebruiken voor geavanceerde use cases.

In deze zelfstudie gebruikt u de COPY INTO opdracht om gegevens uit de opslag van cloudobjecten te laden in een tabel in uw Azure Databricks-werkruimte.

Vereisten

  1. Een Azure-abonnement, een Azure Databricks-werkruimte in dat abonnement en een cluster in die werkruimte. Zie quickstart: Een Spark-taak uitvoeren in Azure Databricks Workspace met behulp van Azure Portal om deze te maken. Als u deze quickstart volgt, hoeft u de instructies niet te volgen in de sectie Een Spark SQL-taak uitvoeren.
  2. Een cluster voor alle doeleinden in uw werkruimte met Databricks Runtime 11.3 LTS of hoger. Als u een cluster voor alle doeleinden wilt maken, raadpleegt u de compute-configuratieverwijzing.
  3. Bekendheid met de gebruikersinterface van de Azure Databricks-werkruimte. Zie Navigeren in de werkruimte.
  4. Bekendheid met Databricks-notebooks.
  5. Een locatie waarnaar u gegevens kunt schrijven; in deze demo wordt de DBFS-hoofdmap als voorbeeld gebruikt, maar Databricks raadt een externe opslaglocatie aan die is geconfigureerd met Unity Catalog.

Stap 1. Uw omgeving configureren en een gegevensgenerator maken

In deze zelfstudie wordt ervan uitgegaan dat u basiskennis hebt van Azure Databricks en een standaardconfiguratie voor werkruimten. Als u de opgegeven code niet kunt uitvoeren, neemt u contact op met uw werkruimtebeheerder om ervoor te zorgen dat u toegang hebt tot rekenresources en een locatie waarnaar u gegevens kunt schrijven.

Houd er rekening mee dat voor de opgegeven code een source parameter wordt gebruikt om de locatie op te geven die u configureert als uw COPY INTO gegevensbron. Zoals geschreven, verwijst deze code naar een locatie in de DBFS-hoofdmap. Als u schrijfmachtigingen hebt voor een opslaglocatie voor externe objecten, vervangt u het dbfs:/ gedeelte van de brontekenreeks door het pad naar de objectopslag. Omdat dit codeblok ook recursief wordt verwijderd om deze demo opnieuw in te stellen, moet u ervoor zorgen dat u deze niet verwijst naar productiegegevens en dat u de /user/{username}/copy-into-demo geneste map behoudt om te voorkomen dat bestaande gegevens worden overschreven of verwijderd.

  1. Maak een nieuw SQL-notebook en koppel deze aan een cluster met Databricks Runtime 11.3 LTS of hoger.

  2. Kopieer en voer de volgende code uit om de opslaglocatie en database die in deze zelfstudie worden gebruikt, opnieuw in te stellen:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Kopieer en voer de volgende code uit om enkele tabellen en functies te configureren die worden gebruikt om gegevens willekeurig te genereren:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Stap 2: De voorbeeldgegevens naar cloudopslag schrijven

Schrijven naar andere gegevensindelingen dan Delta Lake is zeldzaam in Azure Databricks. De code die hier wordt opgegeven, schrijft naar JSON en simuleert een extern systeem dat mogelijk resultaten van een ander systeem in objectopslag dumpt.

  1. Kopieer en voer de volgende code uit om een batch onbewerkte JSON-gegevens te schrijven:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Stap 3: COPY INTO gebruiken om JSON-gegevens idempotent te laden

U moet een Delta Lake-doeltabel maken voordat u deze kunt gebruiken COPY INTO. In Databricks Runtime 11.3 LTS en hoger hoeft u niets anders op te geven dan een tabelnaam in uw CREATE TABLE instructie. Voor eerdere versies van Databricks Runtime moet u een schema opgeven bij het maken van een lege tabel.

  1. Kopieer en voer de volgende code uit om de Delta-doeltabel te maken en gegevens uit uw bron te laden:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Omdat deze actie idempotent is, kunt u deze meerdere keren uitvoeren, maar gegevens worden slechts eenmaal geladen.

Stap 4: Een voorbeeld van de inhoud van de tabel bekijken

U kunt een eenvoudige SQL-query uitvoeren om de inhoud van deze tabel handmatig te controleren.

  1. Kopieer en voer de volgende code uit om een voorbeeld van uw tabel te bekijken:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Stap 5: Meer gegevens laden en voorbeeldresultaten bekijken

U kunt stappen 2-4 vaak opnieuw uitvoeren om nieuwe batches met willekeurige onbewerkte JSON-gegevens in uw bron te landen, ze idempotently te laden in Delta Lake met COPY INTOen een voorbeeld van de resultaten te bekijken. Probeer deze stappen uit te voeren op volgorde of meerdere keren om te simuleren dat meerdere batches onbewerkte gegevens worden geschreven of meerdere keren worden uitgevoerd COPY INTO zonder dat er nieuwe gegevens zijn aangekomen.

Stap 6: Zelfstudie opschonen

Wanneer u klaar bent met deze zelfstudie, kunt u de bijbehorende resources opschonen als u ze niet meer wilt behouden.

  1. Kopieer en voer de volgende code uit om de database, tabellen te verwijderen en alle gegevens te verwijderen:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Als u de rekenresource wilt stoppen, gaat u naar het tabblad Clusters en beëindigt u het cluster.

Aanvullende bronnen