Zelfstudie: COPY INTO met Spark SQL
Databricks raadt u aan de opdracht COPY INTO te gebruiken voor incrementeel en bulksgewijs laden van gegevensbronnen die duizenden bestanden bevatten. Databricks raadt u aan autolaadprogramma's te gebruiken voor geavanceerde use cases.
In deze zelfstudie gebruikt u de COPY INTO
opdracht om gegevens uit de opslag van cloudobjecten te laden in een tabel in uw Azure Databricks-werkruimte.
Vereisten
- Een Azure-abonnement, een Azure Databricks-werkruimte in dat abonnement en een cluster in die werkruimte. Zie quickstart: Een Spark-taak uitvoeren in Azure Databricks Workspace met behulp van Azure Portal om deze te maken. Als u deze quickstart volgt, hoeft u de instructies niet te volgen in de sectie Een Spark SQL-taak uitvoeren.
- Een cluster voor alle doeleinden in uw werkruimte met Databricks Runtime 11.3 LTS of hoger. Als u een cluster voor alle doeleinden wilt maken, raadpleegt u de compute-configuratieverwijzing.
- Bekendheid met de gebruikersinterface van de Azure Databricks-werkruimte. Zie Navigeren in de werkruimte.
- Bekendheid met Databricks-notebooks.
- Een locatie waarnaar u gegevens kunt schrijven; in deze demo wordt de DBFS-hoofdmap als voorbeeld gebruikt, maar Databricks raadt een externe opslaglocatie aan die is geconfigureerd met Unity Catalog.
Stap 1. Uw omgeving configureren en een gegevensgenerator maken
In deze zelfstudie wordt ervan uitgegaan dat u basiskennis hebt van Azure Databricks en een standaardconfiguratie voor werkruimten. Als u de opgegeven code niet kunt uitvoeren, neemt u contact op met uw werkruimtebeheerder om ervoor te zorgen dat u toegang hebt tot rekenresources en een locatie waarnaar u gegevens kunt schrijven.
Houd er rekening mee dat voor de opgegeven code een source
parameter wordt gebruikt om de locatie op te geven die u configureert als uw COPY INTO
gegevensbron. Zoals geschreven, verwijst deze code naar een locatie in de DBFS-hoofdmap. Als u schrijfmachtigingen hebt voor een opslaglocatie voor externe objecten, vervangt u het dbfs:/
gedeelte van de brontekenreeks door het pad naar de objectopslag. Omdat dit codeblok ook recursief wordt verwijderd om deze demo opnieuw in te stellen, moet u ervoor zorgen dat u deze niet verwijst naar productiegegevens en dat u de /user/{username}/copy-into-demo
geneste map behoudt om te voorkomen dat bestaande gegevens worden overschreven of verwijderd.
Maak een nieuw SQL-notebook en koppel deze aan een cluster met Databricks Runtime 11.3 LTS of hoger.
Kopieer en voer de volgende code uit om de opslaglocatie en database die in deze zelfstudie worden gebruikt, opnieuw in te stellen:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Kopieer en voer de volgende code uit om enkele tabellen en functies te configureren die worden gebruikt om gegevens willekeurig te genereren:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Stap 2: De voorbeeldgegevens naar cloudopslag schrijven
Schrijven naar andere gegevensindelingen dan Delta Lake is zeldzaam in Azure Databricks. De code die hier wordt opgegeven, schrijft naar JSON en simuleert een extern systeem dat mogelijk resultaten van een ander systeem in objectopslag dumpt.
Kopieer en voer de volgende code uit om een batch onbewerkte JSON-gegevens te schrijven:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Stap 3: COPY INTO gebruiken om JSON-gegevens idempotent te laden
U moet een Delta Lake-doeltabel maken voordat u deze kunt gebruiken COPY INTO
. In Databricks Runtime 11.3 LTS en hoger hoeft u niets anders op te geven dan een tabelnaam in uw CREATE TABLE
instructie. Voor eerdere versies van Databricks Runtime moet u een schema opgeven bij het maken van een lege tabel.
Kopieer en voer de volgende code uit om de Delta-doeltabel te maken en gegevens uit uw bron te laden:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Omdat deze actie idempotent is, kunt u deze meerdere keren uitvoeren, maar gegevens worden slechts eenmaal geladen.
Stap 4: Een voorbeeld van de inhoud van de tabel bekijken
U kunt een eenvoudige SQL-query uitvoeren om de inhoud van deze tabel handmatig te controleren.
Kopieer en voer de volgende code uit om een voorbeeld van uw tabel te bekijken:
-- Review updated table SELECT * FROM user_ping_target
Stap 5: Meer gegevens laden en voorbeeldresultaten bekijken
U kunt stappen 2-4 vaak opnieuw uitvoeren om nieuwe batches met willekeurige onbewerkte JSON-gegevens in uw bron te landen, ze idempotently te laden in Delta Lake met COPY INTO
en een voorbeeld van de resultaten te bekijken. Probeer deze stappen uit te voeren op volgorde of meerdere keren om te simuleren dat meerdere batches onbewerkte gegevens worden geschreven of meerdere keren worden uitgevoerd COPY INTO
zonder dat er nieuwe gegevens zijn aangekomen.
Stap 6: Zelfstudie opschonen
Wanneer u klaar bent met deze zelfstudie, kunt u de bijbehorende resources opschonen als u ze niet meer wilt behouden.
Kopieer en voer de volgende code uit om de database, tabellen te verwijderen en alle gegevens te verwijderen:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Als u de rekenresource wilt stoppen, gaat u naar het tabblad Clusters en beëindigt u het cluster.