Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
U kunt gegevens laden uit elke gegevensbron die wordt ondersteund door Apache Spark in Azure Databricks met behulp van pijplijnen. U kunt gegevenssets, tabellen en weergaven, definiëren in Lakeflow Spark-declaratieve pijplijnen voor elke query die een Spark DataFrame retourneert, inclusief het streamen van DataFrames en Pandas voor Spark DataFrames. Voor gegevensopnametaken raadt Databricks het gebruik van streamingtabellen aan voor de meeste gebruiksvoorbeelden. Streamingtabellen zijn handig voor het opnemen van gegevens uit cloudobjectopslag met behulp van automatisch laden of berichtenbussen zoals Kafka.
Niet alle gegevensbronnen hebben SQL-ondersteuning voor opname. U kunt echter SQL- en Python-bronnen in dezelfde pijplijn combineren om waar nodig Python te gebruiken. Zie Python-afhankelijkheden voor pijplijnen beheren voor meer informatie over het werken met bibliotheken die niet zijn verpakt in Lakeflow Spark-declaratieve pijplijnen. Voor algemene informatie over gegevensinvoer in Azure Databricks, zie Standard-connectors in Lakeflow Connect.
In de volgende voorbeelden ziet u enkele veelvoorkomende patronen voor het laden van gegevens.
Laden vanuit een bestaande tabel
Laad gegevens uit een bestaande tabel in Azure Databricks. U kunt de gegevens transformeren met behulp van een query of de tabel laden voor verdere verwerking in uw pijplijn.
Python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
bestanden laden vanuit cloudobjectopslag
Databricks raadt het gebruik van autolaadprogramma's aan in pijplijnen voor de meeste gegevensopnametaken uit de opslag van cloudobjecten of uit bestanden in een Unity Catalog-volume. Automatische laadprogramma's en pijplijnen zijn ontworpen om steeds groeiende gegevens incrementeel en idempotent te laden wanneer ze binnenkomen in de cloudopslag. Zie Wat is automatisch laden? en gegevens laden uit objectopslag.
In het volgende voorbeeld worden gegevens uit de cloudopslag gelezen met behulp van automatisch laden.
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
In de volgende voorbeelden wordt autolader gebruikt om gegevenssets te maken op basis van CSV-bestanden in een Unity Catalog-volume.
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Opmerking
- Als u Automatisch laden gebruikt met bestandsmeldingen en een volledige vernieuwing uitvoert voor uw pijplijn of streamingtabel, moet u uw resources handmatig opschonen. U kunt CloudFilesResourceManager in een notebook gebruiken om opschoning uit te voeren.
- Als u bestanden wilt laden met Auto Loader in een Unity Catalog-ingeschakelde pijplijn, moet u externe locatiesgebruiken. Zie Unity Catalog gebruiken met pijplijnen voor meer informatie over het gebruik van Unity Catalog met pijplijnen.
Verifiëren bij cloudopslag
Auto Loader maakt gebruik van externe locaties van Unity Catalog om te verifiëren bij cloudopslag. U moet een externe locatie configureren voor het opslagpad waaruit u wilt lezen en de READ FILES bevoegdheid verlenen aan de actieve gebruiker.
Als u wilt opnemen vanuit Azure Data Lake Storage, configureert u een externe locatie die wordt ondersteund door een opslagreferentie die verwijst naar een opslagcontainer. Zie Verbinding maken met cloudobjectopslag met behulp van Unity Catalog voor meer informatie.
Gegevens laden vanuit een berichtenbus
U kunt pijplijnen configureren voor het opnemen van gegevens uit berichtenbussen. Databricks raadt het gebruik van streaming-tabellen aan met continue uitvoering en verbeterde automatische schaalaanpassing om de meest efficiënte opname te bieden voor laden met lage latentie van berichtbussen. Zie Het clustergebruik van Lakeflow Spark-declaratieve pijplijnen optimaliseren met automatisch schalen voor meer informatie.
Met de volgende code wordt bijvoorbeeld een streamingtabel geconfigureerd voor het opnemen van gegevens uit Kafka met behulp van de functie read_kafka .
Python
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Voor het opnemen van gegevens uit andere bronnen van berichtenbussen, zie:
- Kinesis: read_kinesis
- Pub/Sub-onderwerp: read_pubsub
- Pulsar: read_pulsar
Gegevens uit Azure Event Hubs laden
Azure Event Hubs is een service voor gegevensstreaming die een compatibele Apache Kafka-interface biedt. U kunt de Structured Streaming Kafka-connector, die is opgenomen in de Lakeflow Spark Declarative Pipelines-runtime, gebruiken om berichten uit Azure Event Hubs te laden. Zie Azure Event Hubs gebruiken als pijplijngegevensbron voor meer informatie over het laden en verwerken van berichten van Azure Event Hubs.
Gegevens laden van externe systemen
Lakeflow Spark-declaratieve pijplijnen ondersteunen het laden van gegevens uit elke gegevensbron die wordt ondersteund door Azure Databricks. Zie Verbinding maken met gegevensbronnen en externe services. U kunt ook externe gegevens laden met Lakehouse Federation voor ondersteunde gegevensbronnen. Omdat Lakehouse Federation Databricks Runtime 13.3 LTS of hoger vereist, moet u uw pijplijn configureren voor het gebruik van het preview-kanaal.
Sommige gegevensbronnen hebben geen equivalente SQL-ondersteuning. Als u Lakehouse Federation niet kunt gebruiken met een van deze gegevensbronnen, kunt u Python gebruiken om gegevens uit de bron op te nemen. U kunt Python- en SQL-bronbestanden toevoegen aan dezelfde pijplijn. Het volgende voorbeeld declareert een gerealiseerde weergave voor toegang tot de huidige status van gegevens in een externe PostgreSQL-tabel.
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Kleine of statische gegevenssets laden vanuit cloudobjectopslag
U kunt kleine of statische gegevenssets laden met behulp van apache Spark-belastingsyntaxis. Lakeflow Spark-declaratieve pijplijnen ondersteunt alle bestandsindelingen die worden ondersteund door Apache Spark in Azure Databricks. Zie Opties voor gegevensindeling voor een volledige lijst.
In de volgende voorbeelden ziet u hoe u JSON laadt om een tabel te maken.
Python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Opmerking
De read_files SQL-functie is gebruikelijk voor alle SQL-omgevingen in Azure Databricks. Het is het aanbevolen patroon voor directe bestandstoegang met behulp van SQL in pijplijnen. Zie Opties voor meer informatie.
Gegevens laden uit een aangepaste Python-gegevensbron
Met aangepaste Python-gegevensbronnen kunt u gegevens laden in aangepaste indelingen. U kunt code schrijven van en schrijven naar een specifieke externe gegevensbron of uw bestaande Python code gebruiken om gegevens van uw eigen interne systemen te lezen. Zie Aangepaste gegevensbronnen van PySpark voor meer informatie over het ontwikkelen van Python-gegevensbronnen.
In het volgende voorbeeld wordt een aangepaste gegevensbron geregistreerd met de indelingsnaam my_custom_datasource en gelezen in zowel batch- als streamingmodi.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Een streamingtabel configureren om wijzigingen in een bronstreamingtabel te negeren
Standaard zijn voor streamingtabellen bronnen die alleen toevoegingen toestaan vereist. Wanneer voor uw bronstreamingtabel updates of verwijderingen zijn vereist (bijvoorbeeld voor AVG-verwerking 'recht om vergeten te worden'), gebruikt u de skipChangeCommits flag om deze wijzigingen te negeren. Deze vlag werkt alleen met spark.readStream het gebruik van de option() functie en kan niet worden gebruikt wanneer de bronstreamingtabel het doel is van een create_auto_cdc_flow() -functie. Zie Wijzigingen in delta-brontabellen verwerken voor meer informatie.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Veilig toegang krijgen tot opslaggeheimen binnen een pijplijn
U kunt Azure Databricks-geheimen gebruiken om referenties op te slaan, zoals toegangssleutels of wachtwoorden. Als u het geheim in uw pijplijn wilt configureren, gebruikt u een Spark-eigenschap in de clusterconfiguratie voor pijplijninstellingen. Zie Klassieke rekenkracht configureren voor pijplijnen.
In het volgende voorbeeld wordt een geheim gebruikt voor het opslaan van een toegangssleutel die is vereist voor het lezen van invoergegevens uit een Azure Data Lake Storage-opslagaccount met behulp van automatisch laden. U kunt dezelfde methode gebruiken om elk geheim te configureren dat is vereist voor uw pijplijn, bijvoorbeeld AWS-sleutels voor toegang tot S3 of het wachtwoord voor een Apache Hive-metastore.
Zie Verbinding maken met Azure Data Lake Storage en Blob Storage voor meer informatie over het werken met Azure Data Lake Storage.
Opmerking
U moet het spark.hadoop. voorvoegsel toevoegen aan de spark_conf-configuratiesleutel waarmee de geheime waarde wordt ingesteld.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Vervang in dit codevoorbeeld de volgende waarden.
| Tijdelijke aanduiding | Vervangen door |
|---|---|
<container-name> |
De naam van de Azure opslagaccount-container. |
<storage-account-name> |
De naam van het ADLS-opslagaccount. |
<path> |
Het pad voor pijplijnuitvoergegevens en metagegevens. |
<scope-name> |
De naam van de Azure Databricks secrets scope. |
<secret-name> |
De naam van de sleutel die de toegangssleutel voor het Azure-opslagaccount bevat. |
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Vervang in dit codevoorbeeld de volgende waarden.
| Tijdelijke aanduiding | Vervangen door |
|---|---|
<container-name> |
De naam van de Azure opslagaccountcontainer waarin de invoergegevens worden opgeslagen. |
<storage-account-name> |
De naam van het ADLS-opslagaccount. |
<path-to-input-dataset> |
Het pad naar de invoergegevensset. |