Share via


read_pulsar funzione con valori di tabella di streaming

Si applica a:check marked yes Databricks SQL check marked yes Databricks Runtime 14.1 e versioni successive

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Restituisce una tabella con record letti da Pulsar.

Questa funzione con valori di tabella supporta solo lo streaming e non la query batch.

Sintassi

read_pulsar ( { option_key => option_value } [, ...] )

Argomenti

Questa funzione richiede la chiamata di parametri denominati per le chiavi di opzione.

Le opzioni serviceUrl e topic sono obbligatorie.

Le descrizioni degli argomenti sono brevi qui. Per le descrizioni estese, vedere la documentazione di Pulsar di streaming strutturato.

Opzione Type Default Descrizione
serviceUrl STRING Obbligatorio URI del servizio Pulsar.
argomento STRING Obbligatorio Argomento da cui leggere.
predefinedSubscription STRING None Nome di sottoscrizione predefinito usato dal connettore per tenere traccia dello stato dell'applicazione Spark.
subscriptionPrefix STRING None Prefisso usato dal connettore per generare una sottoscrizione casuale per tenere traccia dello stato dell'applicazione Spark.
pollTimeoutMs LONG 120000 Timeout per la lettura dei messaggi da Pulsar in millisecondi.
failOnDataLoss BOOLEAN vero Controlla se non eseguire una query quando i dati vengono persi( ad esempio, gli argomenti vengono eliminati o i messaggi vengono eliminati a causa dei criteri di conservazione).
startingOffsets STRING più recente Punto iniziale all'avvio di una query, prima, più recente o stringa JSON che specifica un offset specifico. Se più recente, il lettore legge i record più recenti dopo l'avvio dell'esecuzione. Se meno recente, il lettore legge dall'offset meno recente. L'utente può anche specificare una stringa JSON che specifica un offset specifico.
startingTime STRING None Se specificato, l'origine Pulsar leggerà i messaggi a partire dalla posizione del valore startingTime specificato.

Per l'autenticazione del client pulsar vengono usati gli argomenti seguenti:

Opzione Type Default Descrizione
pulsarClientAuthPluginClassName STRING None Nome del plug-in di autenticazione.
pulsarClientAuthParams STRING None Parametri per il plug-in di autenticazione.
pulsarClientUseKeyStoreTls STRING None Indica se usare KeyStore per l'autenticazione tls.
pulsarClientTlsTrustStoreType STRING None Tipo di file TrustStore per l'autenticazione tls.
pulsarClientTlsTrustStorePath STRING None Percorso del file TrustStore per l'autenticazione tls.
pulsarClientTlsTrustStorePassword STRING None Password trustStore per l'autenticazione tls.

Questi argomenti vengono usati per la configurazione e l'autenticazione del controllo di ammissione pulsar, la configurazione dell'amministratore pulsar è necessaria solo quando il controllo di ammissione è abilitato (quando è impostato maxBytesPerTrigger)

Opzione Type Default Descrizione
maxBytesPerTrigger bigint None Limite flessibile del numero massimo di byte da elaborare per microbatch. Se questa opzione è specificata, è necessario specificare anche admin.url.
adminUrl STRING None Configurazione del servizio PulsarHttpUrl. È necessario solo quando viene specificato maxBytesPerTrigger.
pulsar Amministrazione AuthPlugin STRING None Nome del plug-in di autenticazione.
pulsar Amministrazione AuthParams STRING None Parametri per il plug-in di autenticazione.
pulsarClientUseKeyStoreTls STRING None Indica se usare KeyStore per l'autenticazione tls.
pulsar Amministrazione TlsTrustStoreType STRING None Tipo di file TrustStore per l'autenticazione tls.
pulsar Amministrazione TlsTrustStorePath STRING None Percorso del file TrustStore per l'autenticazione tls.
pulsar Amministrazione TlsTrustStorePassword STRING None Password trustStore per l'autenticazione tls.

Valori restituiti

Tabella di record pulsar con lo schema seguente.

  • __key STRING NOT NULL: chiave del messaggio Pulsar.

  • value BINARY NOT NULL: valore del messaggio Pulsar.

    Nota: per gli argomenti con lo schema Avro o JSON, invece di caricare il contenuto in un campo valore binario, il contenuto verrà espanso per mantenere i nomi dei campi e i tipi di campo dell'argomento Pulsar.

  • __topic STRING NOT NULL: nome dell'argomento Pulsar.

  • __messageId BINARY NOT NULL: ID messaggio Pulsar.

  • __publishTime TIMESTAMP NOT NULL: tempo di pubblicazione del messaggio Pulsar.

  • __eventTime TIMESTAMP NOT NULL: ora dell'evento del messaggio Pulsar.

  • __messageProperties MAP<STRING, STRING>: proprietà del messaggio Pulsar.

Esempi

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.