read_kafka tabelwaarde, functie

Van toepassing op:vinkje als ja aan Databricks SQL vinkje als ja aan Databricks Runtime 13.3 LTS en hoger

Leest gegevens uit een Apache Kafka-cluster en retourneert de gegevens in tabelvorm.

Kan gegevens lezen uit een of meer Kafka-onderwerpen. Het ondersteunt zowel batchquery's als streaming-opname.

Syntaxis

read_kafka([option_key => option_value ] [, ...])

Argumenten

Voor deze functie is aanroepen van benoemde parameters vereist.

  • option_key: De naam van de optie die u wilt configureren. U moet backticks (') gebruiken voor opties die puntjes (.) bevatten.
  • option_value: Een constante expressie om de optie in te stellen. Accepteert letterlijke en scalaire functies.

Retouren

Records die zijn gelezen uit een Apache Kafka-cluster met het volgende schema:

  • key BINARY: De sleutel van de Kafka-record.
  • value BINARY NOT NULL: De waarde van de Kafka-record.
  • topic STRING NOT NULL: De naam van het Kafka-onderwerp waaruit de record is gelezen.
  • partition INT NOT NULL: De id van de Kafka-partitie waaruit de record is gelezen.
  • offset BIGINT NOT NULL: Het offsetnummer van de record in Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Een tijdstempelwaarde voor de record. In de timestampType kolom wordt gedefinieerd waarop deze tijdstempel overeenkomt.
  • timestampType INTEGER NOT NULL: Het type tijdstempel dat is opgegeven in de timestamp kolom.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Koptekstwaarden die zijn opgegeven als onderdeel van de record (indien ingeschakeld).

Voorbeelden

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opties

U vindt een gedetailleerde lijst met opties in de Apache Spark-documentatie.

Vereiste opties

Geef de onderstaande optie op om verbinding te maken met uw Kafka-cluster.

Optie
bootstrapServers

Type: String

Een door komma's gescheiden lijst met host-/poortparen die verwijzen naar kafka-cluster.

Standaardwaarde: Geen

Geef slechts een van de onderstaande opties op om te configureren uit welke Kafka-onderwerpen gegevens moeten worden opgehaald.

Optie
assign

Type: String

Een JSON-tekenreeks die de specifieke onderwerppartities bevat waaruit moet worden gebruikt. Voor de eerste en 1e partitie van topicA worden bijvoorbeeld '{"topicA":[0,1],"topicB":[2,4]}'de 0- en 1ste partitie gebruikt.

Standaardwaarde: Geen
subscribe

Type: String

Een door komma's gescheiden lijst met Kafka-onderwerpen waaruit moet worden gelezen.

Standaardwaarde: Geen
subscribePattern

Type: String

Een reguliere expressie die overeenkomt met onderwerpen waarop u zich kunt abonneren.

Standaardwaarde: Geen

Diverse opties

read_kafka kan worden gebruikt in batchquery's en in streamingquery's. Met de onderstaande opties kunt u opgeven op welk type query ze van toepassing zijn.

Optie
endingOffsets

Type: Querytype: String alleen batch

De verschuivingen die moeten worden gelezen totdat voor een batchquery "latest" de meest recente records moeten worden opgegeven, of een JSON-tekenreeks die een eindverschil voor elke TopicPartition opgeeft. In de JSON -1 kan een offset worden gebruikt om te verwijzen naar de meest recente versie. -2 (vroegst) als offset is niet toegestaan.

Standaardwaarde: "latest"
endingOffsetsByTimestamp

Type: Querytype: String alleen batch

Een JSON-tekenreeks die een eindtijdstempel aangeeft die moet worden gelezen voor elke TopicPartition. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden, 1970-01-01 00:00:00 UTCbijvoorbeeld
1686444353000. Zie hieronder informatie over het gedrag met tijdstempels.
endingOffsetsByTimestamp heeft voorrang op endingOffsets.

Standaardwaarde: Geen
endingTimestamp

Type: Querytype: String alleen batch

Een tekenreekswaarde van de tijdstempel in milliseconden sinds
1970-01-01 00:00:00 UTCbijvoorbeeld "1686444353000". Als Kafka de overeenkomende offset niet retourneert, wordt de offset ingesteld op de meest recente. Zie hieronder informatie over het gedrag met tijdstempels. Opmerking: endingTimestamp heeft voorrang op endingOffsetsByTimestamp en
endingOffsets.

Standaardwaarde: Geen
includeHeaders

Type: Boolean Querytype: streaming en batch

Of u de Kafka-headers in de rij wilt opnemen.

Standaardwaarde: false
kafka.<consumer_option>

Type: String Querytype: streaming en batch

Alle specifieke opties voor Kafka-consumenten kunnen worden doorgegeven met het kafka. voorvoegsel. Deze opties moeten worden omgeven door backticks wanneer deze worden opgegeven, anders krijgt u een parserfout. U vindt de opties in de Kafka-documentatie.

Opmerking: U moet de volgende opties niet instellen met deze functie:
key.deserializer, , , value.deserializerbootstrap.serversgroup.id

Standaardwaarde: Geen
maxOffsetsPerTrigger

Type: Long Querytype: alleen streaming

Frequentielimiet voor het maximum aantal offsets of rijen dat per triggerinterval wordt verwerkt. Het opgegeven totale aantal offsets wordt proportioneel verdeeld over TopicPartitions.

Standaardwaarde: Geen
startingOffsets

Type: String Querytype: streaming en batch

Het beginpunt waarop een query wordt gestart, "earliest" ofwel van de vroegste offsets, "latest" die alleen afkomstig is van de meest recente offsets, of een JSON-tekenreeks die een beginverschil voor elke TopicPartition opgeeft. In de JSON kan -2 als offset worden gebruikt om te verwijzen naar de vroegste, -1 naar de meest recente.

Opmerking: voor batchquery's is de meest recente (impliciet of met behulp van -1 in JSON) niet toegestaan. Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Nieuwe gedetecteerde partities tijdens een query beginnen ten vroegste.

Standaardwaarde: "latest" voor streaming, "earliest" voor batch
startingOffsetsByTimestamp

Type: String Querytype: streaming en batch

Een JSON-tekenreeks die een begintijdstempel voor elke TopicPartition opgeeft. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden, 1970-01-01 00:00:00 UTCbijvoorbeeld 1686444353000. Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, wordt het gedrag gevolgd door de waarde van de optie startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp heeft voorrang op startingOffsets.

Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Nieuwe gedetecteerde partities tijdens een query beginnen ten vroegste.

Standaardwaarde: Geen
startingOffsetsByTimestampStrategy

Type: String Querytype: streaming en batch

Deze strategie wordt gebruikt wanneer de opgegeven begin offset per tijdstempel (globaal of per partitie) niet overeenkomt met de offset kafka geretourneerd. De beschikbare strategieƫn zijn:

* "error": mislukt de query
* "latest": wijst de meest recente offset voor deze partities toe, zodat Spark nieuwere records van deze partities in latere microbatches kan lezen.

Standaardwaarde: "error"
startingTimestamp

Type: String Querytype: streaming en batch

Een tekenreekswaarde van de tijdstempel in milliseconden sinds
1970-01-01 00:00:00 UTCbijvoorbeeld "1686444353000". Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, wordt het gedrag gevolgd door de waarde van de optie startingOffsetsByTimestampStrategy.
startingTimestamp heeft voorrang op startingOffsetsByTimestamp en startingOffsets.

Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Zojuist gedetecteerde partities tijdens een query worden vroegst gestart.

Standaardwaarde: Geen

Notitie

De geretourneerde offset voor elke partitie is de vroegste offset waarvan de tijdstempel groter is dan of gelijk is aan de opgegeven tijdstempel in de bijbehorende partitie. Het gedrag verschilt per optie als Kafka de overeenkomende offset niet retourneert. Controleer de beschrijving van elke optie.

Spark geeft de tijdstempelgegevens gewoon door aan KafkaConsumer.offsetsForTimesen interpreteert of redeneert niet over de waarde. Raadpleeg de documentatie voor meer informatieKafkaConsumer.offsetsForTimes. Ook kan de betekenis van tijdstempel hier variƫren afhankelijk van de Kafka-configuratie (log.message.timestamp.type). Zie de Documentatie van Apache Kafka voor meer informatie.