Query's uitvoeren op streaminggegevens
U kunt Azure Databricks gebruiken om query's uit te voeren op streaminggegevensbronnen met behulp van Structured Streaming. Azure Databricks biedt uitgebreide ondersteuning voor streamingworkloads in Python en Scala en biedt ondersteuning voor de meeste structured streaming-functionaliteit met SQL.
In de volgende voorbeelden ziet u hoe u een geheugensink gebruikt voor handmatige inspectie van streaminggegevens tijdens interactieve ontwikkeling in notebooks. Vanwege de limieten voor rijuitvoer in de gebruikersinterface van het notebook, ziet u mogelijk niet alle gegevens die worden gelezen door streamingquery's. In productieworkloads moet u alleen streamingquery's activeren door ze naar een doeltabel of extern systeem te schrijven.
Notitie
SQL-ondersteuning voor interactieve query's op streaminggegevens is beperkt tot notebooks die worden uitgevoerd op alle rekendoeleinden. U kunt SQL ook gebruiken wanneer u streamingtabellen declareert in Databricks SQL- of Delta Live Tables. Zie Gegevens laden met behulp van streamingtabellen in Databricks SQL en wat is Delta Live Tables?
Query's uitvoeren op gegevens uit streamingsystemen
Azure Databricks biedt streaminggegevenslezers voor de volgende streamingsystemen:
- Kafka
- Kinesis
- PubSub
- Pulsar
U moet configuratiegegevens opgeven wanneer u query's initialiseert op basis van deze systemen, die variƫren, afhankelijk van uw geconfigureerde omgeving en het systeem waaruit u wilt lezen. Zie Streaminggegevensbronnen configureren.
Veelvoorkomende werkbelastingen waarbij streamingsystemen betrokken zijn, omvatten gegevensopname naar lakehouse en stroomverwerking naar sinkgegevens naar externe systemen. Zie Streaming op Azure Databricks voor meer informatie over streamingworkloads.
In de volgende voorbeelden ziet u een interactieve streaming die vanuit Kafka wordt gelezen:
Python
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
Een query uitvoeren op een tabel als streaming-leesbewerking
Azure Databricks maakt standaard alle tabellen met Delta Lake. Wanneer u een streamingquery uitvoert voor een Delta-tabel, worden met de query automatisch nieuwe records opgehaald wanneer een versie van de tabel wordt doorgevoerd. Standaard verwachten streamingquery's dat brontabellen alleen toegevoegde records bevatten. Als u wilt werken met streaminggegevens die updates en verwijderingen bevatten, raadt Databricks aan om Delta Live Tables en APPLY CHANGES INTO
. Zie de APPLY CHANGES API's: Vereenvoudig het vastleggen van wijzigingsgegevens met Delta Live Tables.
In de volgende voorbeelden ziet u hoe u interactieve streaming uitvoert die uit een tabel wordt gelezen:
Python
display(spark.readStream.table("table_name"))
SQL
SELECT * FROM STREAM table_name
Query's uitvoeren op gegevens in de opslag van cloudobjecten met automatisch laden
U kunt gegevens streamen vanuit cloudobjectopslag met behulp van Auto Loader, de Azure Databricks-cloudgegevensconnector. U kunt de connector gebruiken met bestanden die zijn opgeslagen in Unity Catalog-volumes of andere opslaglocaties voor cloudobjecten. Databricks raadt het gebruik van volumes aan om de toegang tot gegevens in cloudobjectopslag te beheren. Zie Verbinding maken met gegevensbronnen.
Azure Databricks optimaliseert deze connector voor streamingopname van gegevens in cloudobjectopslag die is opgeslagen in populaire gestructureerde, semi-gestructureerde en ongestructureerde indelingen. Databricks raadt u aan opgenomen gegevens op te slaan in een bijna onbewerkte indeling om de doorvoer te maximaliseren en potentiƫle gegevensverlies te minimaliseren vanwege beschadigde records of schemawijzigingen.
Zie Gegevens opnemen in een Databricks Lakehouse voor meer aanbevelingen voor het opnemen van gegevens uit cloudobjectopslag.
In de volgende voorbeelden ziet u een interactieve streaming die wordt gelezen uit een map met JSON-bestanden in een volume:
Python
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SQL
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')