Sdílet prostřednictvím


read_kinesis streamovaná funkce s hodnotou tabulky

Platí pro: zaškrtnutí označeného ano Databricks SQL zaškrtnutí označeného ano Databricks Runtime 13.3 LTS a vyšší

Vrátí tabulku se záznamy načtenými z Kinesis z jednoho nebo více datových proudů.

Syntaxe

read_kinesis ( { parameter => value } [, ...] )

Argumenty

read_kinesis vyžaduje vyvolání pojmenovaného parametru.

Jediným povinným argumentem je streamName. Všechny ostatní argumenty jsou volitelné.

Popisy argumentů jsou zde stručné. Další podrobnosti najdete v dokumentaci k Amazon Kinesis .

Existují různé možnosti připojení pro připojení a ověření pomocí AWS. awsAccessKeya awsSecretKey buď lze zadat v argumentech funkce pomocí tajné funkce, ručně nastavit v argumentech nebo nakonfigurovat jako proměnné prostředí, jak je uvedeno níže. roleArn, roleExternalIDroleSessionName lze také použít k ověřování pomocí AWS pomocí profilů instancí. Pokud žádnou z těchto možností nezadáte, použije se výchozí řetězec zprostředkovatele AWS.

Parametr Typ Popis
streamName STRING Povinný, čárkami oddělený seznam jednoho nebo více kinezních proudů.
awsAccessKey STRING Přístupový klíč AWS( pokud existuje). Je také možné zadat prostřednictvím různých možností podporovaných prostřednictvím výchozího řetězce zprostředkovatele přihlašovacích údajů AWS, včetně proměnných prostředí (AWS_ACCESS_KEY_ID) a souboru profilů přihlašovacích údajů.
awsSecretKey STRING Tajný klíč, který odpovídá přístupovém klíči. Je možné zadat buď v argumentech, nebo prostřednictvím různých možností podporovaných prostřednictvím výchozího řetězce zprostředkovatele přihlašovacích údajů AWS, včetně proměnných prostředí (AWS_SECRET_KEY nebo AWS_SECRET_ACCESS_KEY) a souboru profilů přihlašovacích údajů.
roleArn STRING Název prostředku Amazonu, který se má při přístupu k Kinesis předpokládat.
roleExternalId STRING Používá se při delegování přístupu k účtu AWS.
roleSessionName STRING Název relace role AWS.
stsEndpoint STRING Koncový bod pro vyžádání dočasných přihlašovacích údajů pro přístup.
region STRING Oblast pro zadání datových proudů Výchozí hodnotou je místně vyřešená oblast.
endpoint STRING regionální koncový bod datových proudů Kinesis. Výchozí hodnotou je místně vyřešená oblast.
initialPosition STRING Počáteční pozice pro čtení z datového proudu Jedna z těchto možností: "latest" (výchozí), "trim_horizon", "earliest", "at_timestamp".
consumerMode STRING Jedna z těchto možností: dotazování (výchozí) nebo EFO (enhanced-fan-out).
consumerName STRING Název příjemce. Všichni příjemci mají předponu databricks_. Výchozí hodnota je prázdný řetězec.
registerConsumerTimeoutInterval STRING Maximální časový limit čekání na registraci příjemce EFO Kinesis u datového proudu Kinesis před vyvolání chyby. Výchozí hodnota je 300s.
requireConsumerDeregistration BOOLEAN true pro zrušení registrace příjemce EFO při ukončení dotazu. Výchozí hodnota je false.
deregisterConsumerTimeoutInterval STRING Maximální časový limit čekání na zrušení registrace uživatele EFO Kinesis u datového proudu Kinesis před vyvolání chyby. Výchozí hodnota je 300s.
consumerRefreshInterval STRING Interval, ve kterém je příjemce kontrolován a aktualizován. Výchozí hodnota je 300s.

Následující argumenty slouží k řízení propustnosti čtení a latence kinesis:

Parametr Typ Popis
maxRecordsPerFetch INTEGER (>0) Volitelné, s výchozím nastavením 10 000 záznamů, které se mají načíst na požadavek rozhraní API na Kinesis.
maxFetchRate STRING Jak rychle se mají předem načítat data na horizontální oddíly. Hodnota mezi "1,0" a "2,0", která se měří v MB/s. Výchozí hodnota je 1.0.
minFetchPeriod STRING Maximální doba čekání mezi po sobě jdoucími pokusy o předběžné načtení. Výchozí hodnota je 400 ms.
maxFetchDuration STRING Maximální doba trvání pro uložení předem načtených nových dat do vyrovnávací paměti. Výchozí hodnota je 10s.
fetchBufferSize STRING Množství dat pro další aktivační událost. Výchozí hodnota je 20 gb.
shardsPerTask INTEGER (>0) Počet horizontálních oddílů Kinesis, ze které se mají předčítat paralelně na úlohu Sparku. Výchozí volba je 5.
shardFetchinterval STRING Jak často se má dotazovat na horizontální dělení. Výchozí hodnota je 1s.
coalesceThresholdBlockSize INTEGER (>0) Prahová hodnota, při které dochází k automatickému sluhování. Výchozí hodnota je 10 000 000.
coalesce BOOLEAN true a zkompilovat předem načtené požadavky. Výchozí hodnota je true.
coalesceBinSize INTEGER (>0) Přibližná velikost bloku po zvětšování. Výchozí hodnota je 128 000 000.
reuseKinesisClient BOOLEAN true pro opakované použití klienta Kinesis uloženého v mezipaměti. Výchozí hodnota je true s výjimkou clusteru PE.
clientRetries INTEGER (>0) Počet opakování ve scénáři opakování Výchozí volba je 5.

Návraty

Tabulka záznamů Kinesis s následujícím schématem:

Name Datový typ Vynulovatelné Standard Popis
partitionKey STRING No Klíč, který se používá k distribuci dat mezi horizontální oddíly datového proudu. Všechny datové záznamy se stejným klíčem oddílu se načtou ze stejného horizontálního oddílu.
data BINARY No Datová část kinesis s kódováním base-64.
stream STRING No Název datového proudu, ze kterého byla data načtena.
shardId STRING No Jedinečný identifikátor horizontálního oddílu, ze kterého byla data načtena.
sequenceNumber BIGINT No Jedinečný identifikátor záznamu v rámci jeho horizontálního oddílu.
approximateArrivalTimestamp TIMESTAMP No Přibližný čas vložení záznamu do datového proudu

Sloupce (stream, shardId, sequenceNumber) představují primární klíč.

Příklady

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');