read_pulsar functie met streamingtabelwaarde

Van toepassing op:check marked yes Databricks SQL check marked yes Databricks Runtime 14.1 en hoger

Belangrijk

Deze functie is beschikbaar als openbare preview.

Retourneert een tabel met records die zijn gelezen uit Pulsar.

Deze tabelwaardefunctie ondersteunt alleen streaming en geen batchquery.

Syntaxis

read_pulsar ( { option_key => option_value } [, ...] )

Argumenten

Voor deze functie is aanroepen van benoemde parameters vereist voor de optiesleutels.

De opties serviceUrl en topic zijn verplicht.

De beschrijvingen van de argumenten zijn hier kort. Zie de documentatie voor gestructureerd streamen van Pulsar voor uitgebreide beschrijvingen.

Optie Type Standaard Beschrijving
serviceUrl STRING Verplicht De URI van de Pulsar-service.
onderwerp STRING Verplicht Het onderwerp waaruit u wilt lezen.
predefinedSubscription STRING Geen De vooraf gedefinieerde abonnementsnaam die door de connector wordt gebruikt om de voortgang van spark-toepassingen bij te houden.
subscriptionPrefix STRING Geen Een voorvoegsel dat door de connector wordt gebruikt om een willekeurig abonnement te genereren om de voortgang van spark-toepassingen bij te houden.
pollTimeoutMs LANGE 120.000 De time-out voor het lezen van berichten van Pulsar in milliseconden.
failOnDataLoss BOOLEAN true Hiermee bepaalt u of een query mislukt wanneer gegevens verloren gaan (bijvoorbeeld onderwerpen worden verwijderd of berichten worden verwijderd vanwege bewaarbeleid).
startingOffsets STRING nieuwste Het beginpunt waarop een query wordt gestart, ofwel de vroegste, meest recente of een JSON-tekenreeks die een specifieke offset aangeeft. Als de laatste is, leest de lezer de nieuwste records nadat deze is gestart. Als deze vroegst is, leest de lezer van de vroegste verschuiving. De gebruiker kan ook een JSON-tekenreeks opgeven die een specifieke offset aangeeft.
startingTime STRING Geen Wanneer dit is opgegeven, leest de Pulsar-bron berichten vanaf de positie van de opgegeven startTime.

De volgende argumenten worden gebruikt voor verificatie van de pulsar-client:

Optie Type Standaard Beschrijving
pulsarClientAuthPluginClassName STRING Geen Naam van de verificatieinvoegtoepassing.
pulsarClientAuthParams STRING Geen Parameters voor de verificatieinvoegtoepassing.
pulsarClientUseKeyStoreTls STRING Geen Of keystore moet worden gebruikt voor tls-verificatie.
pulsarClientTlsTrustStoreType STRING Geen TrustStore-bestandstype voor tls-verificatie.
pulsarClientTlsTrustStorePath STRING Geen TrustStore-bestandspad voor tls-verificatie.
pulsarClientTlsTrustStorePassword STRING Geen TrustStore-wachtwoord voor tls-verificatie.

Deze argumenten worden gebruikt voor configuratie en verificatie van pulsar-toegangsbeheer, pulsar-beheerconfiguratie is alleen vereist wanneer toegangsbeheer is ingeschakeld (wanneer maxBytesPerTrigger is ingesteld)

Optie Type Standaard Beschrijving
maxBytesPerTrigger BIGINT Geen Een zachte limiet van het maximum aantal bytes dat we per microbatch willen verwerken. Als dit is opgegeven, moet admin.url ook worden opgegeven.
adminUrl STRING Geen De Pulsar-serviceHttpUrl-configuratie. Alleen nodig wanneer maxBytesPerTrigger is opgegeven.
pulsar Beheer AuthPlugin STRING Geen Naam van de verificatieinvoegtoepassing.
pulsar Beheer AuthParams STRING Geen Parameters voor de verificatieinvoegtoepassing.
pulsarClientUseKeyStoreTls STRING Geen Of keystore moet worden gebruikt voor tls-verificatie.
pulsar Beheer TlsTrustStoreType STRING Geen TrustStore-bestandstype voor tls-verificatie.
pulsar Beheer TlsTrustStorePath STRING Geen TrustStore-bestandspad voor tls-verificatie.
pulsar Beheer TlsTrustStorePassword STRING Geen TrustStore-wachtwoord voor tls-verificatie.

Retourneert

Een tabel met pulsar-records met het volgende schema.

  • __key STRING NOT NULL: Pulsar message key.

  • value BINARY NOT NULL: Pulsar berichtwaarde.

    Opmerking: Voor onderwerpen met het Avro- of JSON-schema wordt de inhoud uitgebreid om de veldnamen en veldtypen van het Pulsar-onderwerp te behouden in plaats van inhoud in een binair waardeveld te laden.

  • __topic STRING NOT NULL: Naam van Pulsar-onderwerp.

  • __messageId BINARY NOT NULL: Pulsar bericht-id.

  • __publishTime TIMESTAMP NOT NULL: Publicatietijd van Pulsar-bericht.

  • __eventTime TIMESTAMP NOT NULL: Pulsar bericht gebeurtenis tijd.

  • __messageProperties MAP<STRING, STRING>: Pulsar berichteigenschappen.

Voorbeelden

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.