read_pulsar
functie met streamingtabelwaarde
Van toepassing op: Databricks SQL Databricks Runtime 14.1 en hoger
Belangrijk
Deze functie is beschikbaar als openbare preview.
Retourneert een tabel met records die zijn gelezen uit Pulsar.
Deze tabelwaardefunctie ondersteunt alleen streaming en geen batchquery.
Syntaxis
read_pulsar ( { option_key => option_value } [, ...] )
Argumenten
Voor deze functie is aanroepen van benoemde parameters vereist voor de optiesleutels.
De opties serviceUrl
en topic
zijn verplicht.
De beschrijvingen van de argumenten zijn hier kort. Zie de documentatie voor gestructureerd streamen van Pulsar voor uitgebreide beschrijvingen.
Optie | Type | Standaard | Beschrijving |
---|---|---|---|
serviceUrl | STRING | Verplicht | De URI van de Pulsar-service. |
onderwerp | STRING | Verplicht | Het onderwerp waaruit u wilt lezen. |
predefinedSubscription | STRING | Geen | De vooraf gedefinieerde abonnementsnaam die door de connector wordt gebruikt om de voortgang van spark-toepassingen bij te houden. |
subscriptionPrefix | STRING | Geen | Een voorvoegsel dat door de connector wordt gebruikt om een willekeurig abonnement te genereren om de voortgang van spark-toepassingen bij te houden. |
pollTimeoutMs | LANGE | 120.000 | De time-out voor het lezen van berichten van Pulsar in milliseconden. |
failOnDataLoss | BOOLEAN | true | Hiermee bepaalt u of een query mislukt wanneer gegevens verloren gaan (bijvoorbeeld onderwerpen worden verwijderd of berichten worden verwijderd vanwege bewaarbeleid). |
startingOffsets | STRING | nieuwste | Het beginpunt waarop een query wordt gestart, ofwel de vroegste, meest recente of een JSON-tekenreeks die een specifieke offset aangeeft. Als de laatste is, leest de lezer de nieuwste records nadat deze is gestart. Als deze vroegst is, leest de lezer van de vroegste verschuiving. De gebruiker kan ook een JSON-tekenreeks opgeven die een specifieke offset aangeeft. |
startingTime | STRING | Geen | Wanneer dit is opgegeven, leest de Pulsar-bron berichten vanaf de positie van de opgegeven startTime. |
De volgende argumenten worden gebruikt voor verificatie van de pulsar-client:
Optie | Type | Standaard | Beschrijving |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | Geen | Naam van de verificatieinvoegtoepassing. |
pulsarClientAuthParams | STRING | Geen | Parameters voor de verificatieinvoegtoepassing. |
pulsarClientUseKeyStoreTls | STRING | Geen | Of keystore moet worden gebruikt voor tls-verificatie. |
pulsarClientTlsTrustStoreType | STRING | Geen | TrustStore-bestandstype voor tls-verificatie. |
pulsarClientTlsTrustStorePath | STRING | Geen | TrustStore-bestandspad voor tls-verificatie. |
pulsarClientTlsTrustStorePassword | STRING | Geen | TrustStore-wachtwoord voor tls-verificatie. |
Deze argumenten worden gebruikt voor configuratie en verificatie van pulsar-toegangsbeheer, pulsar-beheerconfiguratie is alleen vereist wanneer toegangsbeheer is ingeschakeld (wanneer maxBytesPerTrigger is ingesteld)
Optie | Type | Standaard | Beschrijving |
---|---|---|---|
maxBytesPerTrigger | BIGINT | Geen | Een zachte limiet van het maximum aantal bytes dat we per microbatch willen verwerken. Als dit is opgegeven, moet admin.url ook worden opgegeven. |
adminUrl | STRING | Geen | De Pulsar-serviceHttpUrl-configuratie. Alleen nodig wanneer maxBytesPerTrigger is opgegeven. |
pulsar Beheer AuthPlugin | STRING | Geen | Naam van de verificatieinvoegtoepassing. |
pulsar Beheer AuthParams | STRING | Geen | Parameters voor de verificatieinvoegtoepassing. |
pulsarClientUseKeyStoreTls | STRING | Geen | Of keystore moet worden gebruikt voor tls-verificatie. |
pulsar Beheer TlsTrustStoreType | STRING | Geen | TrustStore-bestandstype voor tls-verificatie. |
pulsar Beheer TlsTrustStorePath | STRING | Geen | TrustStore-bestandspad voor tls-verificatie. |
pulsar Beheer TlsTrustStorePassword | STRING | Geen | TrustStore-wachtwoord voor tls-verificatie. |
Retourneert
Een tabel met pulsar-records met het volgende schema.
__key STRING NOT NULL
: Pulsar message key.value BINARY NOT NULL
: Pulsar berichtwaarde.Opmerking: Voor onderwerpen met het Avro- of JSON-schema wordt de inhoud uitgebreid om de veldnamen en veldtypen van het Pulsar-onderwerp te behouden in plaats van inhoud in een binair waardeveld te laden.
__topic STRING NOT NULL
: Naam van Pulsar-onderwerp.__messageId BINARY NOT NULL
: Pulsar bericht-id.__publishTime TIMESTAMP NOT NULL
: Publicatietijd van Pulsar-bericht.__eventTime TIMESTAMP NOT NULL
: Pulsar bericht gebeurtenis tijd.__messageProperties MAP<STRING, STRING>
: Pulsar berichteigenschappen.
Voorbeelden
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.