read_kafka
tabelwaarde, functie
Van toepassing op: Databricks SQL Databricks Runtime 13.3 LTS en hoger
Leest gegevens uit een Apache Kafka-cluster en retourneert de gegevens in tabelvorm.
Kan gegevens lezen uit een of meer Kafka-onderwerpen. Het ondersteunt zowel batchquery's als streaming-opname.
Syntaxis
read_kafka([option_key => option_value ] [, ...])
Argumenten
Voor deze functie is aanroepen van benoemde parameters vereist.
option_key
: De naam van de optie die u wilt configureren. U moet backticks (') gebruiken voor opties die puntjes (.
) bevatten.option_value
: Een constante expressie om de optie in te stellen. Accepteert letterlijke en scalaire functies.
Retouren
Records die zijn gelezen uit een Apache Kafka-cluster met het volgende schema:
key BINARY
: De sleutel van de Kafka-record.value BINARY NOT NULL
: De waarde van de Kafka-record.topic STRING NOT NULL
: De naam van het Kafka-onderwerp waaruit de record is gelezen.partition INT NOT NULL
: De id van de Kafka-partitie waaruit de record is gelezen.offset BIGINT NOT NULL
: Het offsetnummer van de record in KafkaTopicPartition
.timestamp TIMESTAMP NOT NULL
: Een tijdstempelwaarde voor de record. In detimestampType
kolom wordt gedefinieerd waarop deze tijdstempel overeenkomt.timestampType INTEGER NOT NULL
: Het type tijdstempel dat is opgegeven in detimestamp
kolom.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Koptekstwaarden die zijn opgegeven als onderdeel van de record (indien ingeschakeld).
Voorbeelden
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opties
U vindt een gedetailleerde lijst met opties in de Apache Spark-documentatie.
Vereiste opties
Geef de onderstaande optie op om verbinding te maken met uw Kafka-cluster.
Optie |
---|
bootstrapServers Type: String Een door komma's gescheiden lijst met host-/poortparen die verwijzen naar kafka-cluster. Standaardwaarde: Geen |
Geef slechts een van de onderstaande opties op om te configureren uit welke Kafka-onderwerpen gegevens moeten worden opgehaald.
Optie |
---|
assign Type: String Een JSON-tekenreeks die de specifieke onderwerppartities bevat waaruit moet worden gebruikt. Voor de eerste en 1e partitie van topicA worden bijvoorbeeld '{"topicA":[0,1],"topicB":[2,4]}' de 0- en 1ste partitie gebruikt.Standaardwaarde: Geen |
subscribe Type: String Een door komma's gescheiden lijst met Kafka-onderwerpen waaruit moet worden gelezen. Standaardwaarde: Geen |
subscribePattern Type: String Een reguliere expressie die overeenkomt met onderwerpen waarop u zich kunt abonneren. Standaardwaarde: Geen |
Diverse opties
read_kafka
kan worden gebruikt in batchquery's en in streamingquery's. Met de onderstaande opties kunt u opgeven op welk type query ze van toepassing zijn.
Optie |
---|
endingOffsets Type: Querytype: String alleen batchDe verschuivingen die moeten worden gelezen totdat voor een batchquery "latest" de meest recente records moeten worden opgegeven, of een JSON-tekenreeks die een eindverschil voor elke TopicPartition opgeeft. In de JSON -1 kan een offset worden gebruikt om te verwijzen naar de meest recente versie. -2 (vroegst) als offset is niet toegestaan.Standaardwaarde: "latest" |
endingOffsetsByTimestamp Type: Querytype: String alleen batchEen JSON-tekenreeks die een eindtijdstempel aangeeft die moet worden gelezen voor elke TopicPartition. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden, 1970-01-01 00:00:00 UTC bijvoorbeeld1686444353000 . Zie hieronder informatie over het gedrag met tijdstempels.endingOffsetsByTimestamp heeft voorrang op endingOffsets .Standaardwaarde: Geen |
endingTimestamp Type: Querytype: String alleen batchEen tekenreekswaarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC bijvoorbeeld "1686444353000" . Als Kafka de overeenkomende offset niet retourneert, wordt de offset ingesteld op de meest recente. Zie hieronder informatie over het gedrag met tijdstempels. Opmerking: endingTimestamp heeft voorrang op endingOffsetsByTimestamp enendingOffsets .Standaardwaarde: Geen |
includeHeaders Type: Boolean Querytype: streaming en batchOf u de Kafka-headers in de rij wilt opnemen. Standaardwaarde: false |
kafka.<consumer_option> Type: String Querytype: streaming en batchAlle specifieke opties voor Kafka-consumenten kunnen worden doorgegeven met het kafka. voorvoegsel. Deze opties moeten worden omgeven door backticks wanneer deze worden opgegeven, anders krijgt u een parserfout. U vindt de opties in de Kafka-documentatie.Opmerking: U moet de volgende opties niet instellen met deze functie: key.deserializer , , , value.deserializer bootstrap.servers group.id Standaardwaarde: Geen |
maxOffsetsPerTrigger Type: Long Querytype: alleen streamingFrequentielimiet voor het maximum aantal offsets of rijen dat per triggerinterval wordt verwerkt. Het opgegeven totale aantal offsets wordt proportioneel verdeeld over TopicPartitions. Standaardwaarde: Geen |
startingOffsets Type: String Querytype: streaming en batchHet beginpunt waarop een query wordt gestart, "earliest" ofwel van de vroegste offsets, "latest" die alleen afkomstig is van de meest recente offsets, of een JSON-tekenreeks die een beginverschil voor elke TopicPartition opgeeft. In de JSON kan -2 als offset worden gebruikt om te verwijzen naar de vroegste, -1 naar de meest recente.Opmerking: voor batchquery's is de meest recente (impliciet of met behulp van -1 in JSON) niet toegestaan. Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Nieuwe gedetecteerde partities tijdens een query beginnen ten vroegste. Standaardwaarde: "latest" voor streaming, "earliest" voor batch |
startingOffsetsByTimestamp Type: String Querytype: streaming en batchEen JSON-tekenreeks die een begintijdstempel voor elke TopicPartition opgeeft. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden, 1970-01-01 00:00:00 UTC bijvoorbeeld 1686444353000 . Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, wordt het gedrag gevolgd door de waarde van de optie startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp heeft voorrang op startingOffsets .Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Nieuwe gedetecteerde partities tijdens een query beginnen ten vroegste. Standaardwaarde: Geen |
startingOffsetsByTimestampStrategy Type: String Querytype: streaming en batchDeze strategie wordt gebruikt wanneer de opgegeven begin offset per tijdstempel (globaal of per partitie) niet overeenkomt met de offset kafka geretourneerd. De beschikbare strategieƫn zijn: - "error" : mislukt de query- "latest" : wijst de meest recente offset voor deze partities toe, zodat Spark nieuwere records van deze partities in latere microbatches kan lezen.Standaardwaarde: "error" |
startingTimestamp Type: String Querytype: streaming en batchEen tekenreekswaarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC bijvoorbeeld "1686444353000" . Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, wordt het gedrag gevolgd door de waarde van de optie startingOffsetsByTimestampStrategy .startingTimestamp heeft voorrang op startingOffsetsByTimestamp en startingOffsets .Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Zojuist gedetecteerde partities tijdens een query worden vroegst gestart. Standaardwaarde: Geen |
Notitie
De geretourneerde offset voor elke partitie is de vroegste offset waarvan de tijdstempel groter is dan of gelijk is aan de opgegeven tijdstempel in de bijbehorende partitie. Het gedrag verschilt per optie als Kafka de overeenkomende offset niet retourneert. Controleer de beschrijving van elke optie.
Spark geeft de tijdstempelgegevens gewoon door aan KafkaConsumer.offsetsForTimes
en interpreteert of redeneert niet over de waarde. Raadpleeg de documentatie voor meer informatieKafkaConsumer.offsetsForTimes
. Ook kan de betekenis van tijdstempel hier variƫren afhankelijk van de Kafka-configuratie (log.message.timestamp.type
). Zie de Documentatie van Apache Kafka voor meer informatie.