Delen via


read_kinesis functie met tabelwaarde streamen

Van toepassing op:vinkje als ja aan Databricks SQL vinkje als ja aan Databricks Runtime 13.3 LTS en hoger

Retourneert een tabel met records die zijn gelezen uit Kinesis uit een of meer streams.

Syntaxis

read_kinesis ( { parameter => value } [, ...] )

Argumenten

read_kinesis vereist de aanroep met benoemde parameters.

Het enige vereiste argument is streamName. Alle andere argumenten zijn optioneel.

De beschrijvingen van de argumenten zijn hier kort. Zie de Amazon Kinesis-documentatie voor meer informatie.

Er zijn meerdere manieren om verbinding te maken en te verifiëren met AWS. De aanbevolen methode is om een Databricks-servicereferentie te maken en op te geven met behulp van de serviceCredential optie. U kunt ook verifiëren met awsAccessKey en awsSecretKey. Deze opties kunnen worden opgegeven in de functieargumenten met behulp van de secret functie, handmatig in de argumenten worden ingesteld of geconfigureerd als omgevingsvariabelen zoals hieronder wordt aangegeven. roleArn roleExternalID, roleSessionNamekan ook worden gebruikt voor verificatie met AWS met behulp van exemplaarprofielen. Als geen van deze is opgegeven, wordt de standaardketen van de AWS-provider gebruikt.

Kenmerk Typologie Beschrijving
streamName STRING Vereiste, door komma's gescheiden lijst van een of meer kinesisstromen.
serviceCredential STRING De naam van uw Databricks-servicereferentie.
awsAccessKey STRING De AWS-toegangssleutel, indien van toepassing. Kan ook worden opgegeven via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_ACCESS_KEY_ID) en een bestand met referentieprofielen.
awsSecretKey STRING De geheime sleutel die overeenkomt met de toegangssleutel. Kan worden opgegeven in de argumenten of via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_SECRET_KEY of AWS_SECRET_ACCESS_KEY) en een bestand met referentiesprofielen.
roleArn STRING Amazon-resource-naam van de rol die je moet aannemen bij het openen van Kinesis.
roleExternalId STRING Wordt gebruikt bij het delegeren van toegang tot het AWS-account.
roleSessionName STRING Naam van AWS-rolsessie.
stsEndpoint STRING Een eindpunt voor het aanvragen van referenties voor tijdelijke toegang.
region STRING Regio voor de streams die moeten worden opgegeven. De standaardwaarde is de lokaal opgeloste regio.
endpoint STRING regionaal eindpunt voor Kinesis-gegevensstromen. De standaardwaarde is de lokaal opgeloste regio.
initialPosition STRING Beginpositie voor lezen vanuit de stroom. Een van de volgende: 'laatste' (standaard), 'trim_horizon', 'earliest', 'at_timestamp'.
consumerMode STRING Een van: 'polling' (standaard) of 'EFO' (enhanced-fan-out).
consumerName STRING De naam van de consument. Alle consumentennamen worden voorafgegaan door 'databricks_'. De standaardwaarde is een lege tekenreeks.
registerConsumerTimeoutInterval STRING de maximale wachttijd om te wachten totdat de Kinesis EFO-consument is geregistreerd bij de Kinesis-stream voordat er een fout optreedt. De standaardwaarde is '300s'.
requireConsumerDeregistration BOOLEAN true om de EFO-consument bij het stoppen van query's uit te schrijven. Standaard is false.
deregisterConsumerTimeoutInterval STRING De maximale wachttijd om te wachten totdat de Kinesis EFO-consument is afgemeld bij de Kinesis-stream voordat er een fout wordt gegenereerd. De standaardwaarde is '300s'.
consumerRefreshInterval STRING Het interval waarmee de consument wordt gecontroleerd en vernieuwd. De standaardwaarde is '300s'.

De volgende argumenten worden gebruikt voor het beheren van de leesdoorvoer en latentie voor Kinesis:

Kenmerk Typologie Beschrijving
maxRecordsPerFetch INTEGER (>0) Optioneel, met een standaardwaarde van 10.000 records die per API-aanvraag naar Kinesis moeten worden gelezen.
maxFetchRate STRING Hoe snel u gegevens per shard kunt vooraf fetcheren. Een waarde tussen '1,0' en '2.0' die wordt gemeten in MB/s. De standaardwaarde is '1.0'.
minFetchPeriod STRING De maximale wachttijd tussen opeenvolgende prefetch-acties. De standaardwaarde is 400 ms.
maxFetchDuration STRING De maximale duur voor het bufferen van vooraf geladen nieuwe gegevens. De standaardwaarde is '10s'.
fetchBufferSize STRING De hoeveelheid gegevens voor de volgende trigger. De standaardwaarde is '20 gb'.
shardsPerTask INTEGER (>0) Het aantal Kinesis-shards dat parallel per Spark-taak moet worden voorgehaald. De standaard is 5.
shardFetchinterval STRING Hoe vaak moet er worden gecontroleerd op resharding. De standaardwaarde is '1s'.
coalesceThresholdBlockSize INTEGER (>0) De drempel waarop automatische samenvoeging plaatsvindt. De standaardwaarde is 10.000.000.
coalesce BOOLEAN true om voorgesorteerde aanvragen samen te voegen. De standaardwaarde is true.
coalesceBinSize INTEGER (>0) De geschatte blokgrootte na het samenvoegen. De standaardwaarde is 128.000.000.
reuseKinesisClient BOOLEAN true om de Kinesis-client die in de cache is opgeslagen, opnieuw te gebruiken. De standaardwaarde is true behalve op een PE-cluster.
clientRetries INTEGER (>0) Het aantal herhalingen in het herhalingsscenario. De standaard is 5.

Retouren

Een tabel met Kinesis-records met het volgende schema:

Naam Gegevenstype Null-waarde toegestaan Standaard Beschrijving
partitionKey STRING Nee Een sleutel die wordt gebruikt voor het distribueren van gegevens tussen de shards van een stream. Alle gegevensrecords met dezelfde partitiesleutel worden gelezen uit dezelfde shard.
data BINARY Nee De gegevenslading van de kinesis-gegevens, Base64 gecodeerd.
stream STRING Nee De naam van de stream waaruit de gegevens zijn gelezen.
shardId STRING Nee Een unieke id voor de shard waaruit de gegevens zijn gelezen.
sequenceNumber BIGINT Nee De unieke identificatie van het record binnen zijn shard.
approximateArrivalTimestamp TIMESTAMP Nee De geschatte tijd dat de record in de stream is ingevoegd.

De kolommen (stream, shardId, sequenceNumber) vormen een primaire sleutel.

Voorbeelden

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret('test-databricks', 'awsAccessKey'),
        awsSecretKey => secret('test-databricks', 'awsSecretKey'),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');