Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Van toepassing op: Databricks SQL
Databricks Runtime 13.3 LTS en hoger
Retourneert een tabel met records die zijn gelezen uit Kinesis uit een of meer streams.
Syntaxis
read_kinesis ( { parameter => value } [, ...] )
Argumenten
read_kinesis
vereist de aanroep met benoemde parameters.
Het enige vereiste argument is streamName
. Alle andere argumenten zijn optioneel.
De beschrijvingen van de argumenten zijn hier kort. Zie de Amazon Kinesis-documentatie voor meer informatie.
Er zijn meerdere manieren om verbinding te maken en te verifiëren met AWS.
De aanbevolen methode is om een Databricks-servicereferentie te maken en op te geven met behulp van de serviceCredential
optie.
U kunt ook verifiëren met awsAccessKey
en awsSecretKey
.
Deze opties kunnen worden opgegeven in de functieargumenten met behulp van de secret
functie, handmatig in de argumenten worden ingesteld of geconfigureerd als omgevingsvariabelen zoals hieronder wordt aangegeven.
roleArn
roleExternalID
, roleSessionName
kan ook worden gebruikt voor verificatie met AWS met behulp van exemplaarprofielen.
Als geen van deze is opgegeven, wordt de standaardketen van de AWS-provider gebruikt.
Kenmerk | Typologie | Beschrijving |
---|---|---|
streamName |
STRING |
Vereiste, door komma's gescheiden lijst van een of meer kinesisstromen. |
serviceCredential |
STRING |
De naam van uw Databricks-servicereferentie. |
awsAccessKey |
STRING |
De AWS-toegangssleutel, indien van toepassing. Kan ook worden opgegeven via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_ACCESS_KEY_ID ) en een bestand met referentieprofielen. |
awsSecretKey |
STRING |
De geheime sleutel die overeenkomt met de toegangssleutel. Kan worden opgegeven in de argumenten of via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_SECRET_KEY of AWS_SECRET_ACCESS_KEY ) en een bestand met referentiesprofielen. |
roleArn |
STRING |
Amazon-resource-naam van de rol die je moet aannemen bij het openen van Kinesis. |
roleExternalId |
STRING |
Wordt gebruikt bij het delegeren van toegang tot het AWS-account. |
roleSessionName |
STRING |
Naam van AWS-rolsessie. |
stsEndpoint |
STRING |
Een eindpunt voor het aanvragen van referenties voor tijdelijke toegang. |
region |
STRING |
Regio voor de streams die moeten worden opgegeven. De standaardwaarde is de lokaal opgeloste regio. |
endpoint |
STRING |
regionaal eindpunt voor Kinesis-gegevensstromen. De standaardwaarde is de lokaal opgeloste regio. |
initialPosition |
STRING |
Beginpositie voor lezen vanuit de stroom. Een van de volgende: 'laatste' (standaard), 'trim_horizon', 'earliest', 'at_timestamp'. |
consumerMode |
STRING |
Een van: 'polling' (standaard) of 'EFO' (enhanced-fan-out). |
consumerName |
STRING |
De naam van de consument. Alle consumentennamen worden voorafgegaan door 'databricks_'. De standaardwaarde is een lege tekenreeks. |
registerConsumerTimeoutInterval |
STRING |
de maximale wachttijd om te wachten totdat de Kinesis EFO-consument is geregistreerd bij de Kinesis-stream voordat er een fout optreedt. De standaardwaarde is '300s'. |
requireConsumerDeregistration |
BOOLEAN |
true om de EFO-consument bij het stoppen van query's uit te schrijven. Standaard is false . |
deregisterConsumerTimeoutInterval |
STRING |
De maximale wachttijd om te wachten totdat de Kinesis EFO-consument is afgemeld bij de Kinesis-stream voordat er een fout wordt gegenereerd. De standaardwaarde is '300s'. |
consumerRefreshInterval |
STRING |
Het interval waarmee de consument wordt gecontroleerd en vernieuwd. De standaardwaarde is '300s'. |
De volgende argumenten worden gebruikt voor het beheren van de leesdoorvoer en latentie voor Kinesis:
Kenmerk | Typologie | Beschrijving |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Optioneel, met een standaardwaarde van 10.000 records die per API-aanvraag naar Kinesis moeten worden gelezen. |
maxFetchRate |
STRING |
Hoe snel u gegevens per shard kunt vooraf fetcheren. Een waarde tussen '1,0' en '2.0' die wordt gemeten in MB/s. De standaardwaarde is '1.0'. |
minFetchPeriod |
STRING |
De maximale wachttijd tussen opeenvolgende prefetch-acties. De standaardwaarde is 400 ms. |
maxFetchDuration |
STRING |
De maximale duur voor het bufferen van vooraf geladen nieuwe gegevens. De standaardwaarde is '10s'. |
fetchBufferSize |
STRING |
De hoeveelheid gegevens voor de volgende trigger. De standaardwaarde is '20 gb'. |
shardsPerTask |
INTEGER (>0) |
Het aantal Kinesis-shards dat parallel per Spark-taak moet worden voorgehaald. De standaard is 5. |
shardFetchinterval |
STRING |
Hoe vaak moet er worden gecontroleerd op resharding. De standaardwaarde is '1s'. |
coalesceThresholdBlockSize |
INTEGER (>0) |
De drempel waarop automatische samenvoeging plaatsvindt. De standaardwaarde is 10.000.000. |
coalesce |
BOOLEAN |
true om voorgesorteerde aanvragen samen te voegen. De standaardwaarde is true . |
coalesceBinSize |
INTEGER (>0) |
De geschatte blokgrootte na het samenvoegen. De standaardwaarde is 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true om de Kinesis-client die in de cache is opgeslagen, opnieuw te gebruiken. De standaardwaarde is true behalve op een PE-cluster. |
clientRetries |
INTEGER (>0) |
Het aantal herhalingen in het herhalingsscenario. De standaard is 5. |
Retouren
Een tabel met Kinesis-records met het volgende schema:
Naam | Gegevenstype | Null-waarde toegestaan | Standaard | Beschrijving |
---|---|---|---|---|
partitionKey |
STRING |
Nee | Een sleutel die wordt gebruikt voor het distribueren van gegevens tussen de shards van een stream. Alle gegevensrecords met dezelfde partitiesleutel worden gelezen uit dezelfde shard. | |
data |
BINARY |
Nee | De gegevenslading van de kinesis-gegevens, Base64 gecodeerd. | |
stream |
STRING |
Nee | De naam van de stream waaruit de gegevens zijn gelezen. | |
shardId |
STRING |
Nee | Een unieke id voor de shard waaruit de gegevens zijn gelezen. | |
sequenceNumber |
BIGINT |
Nee | De unieke identificatie van het record binnen zijn shard. | |
approximateArrivalTimestamp |
TIMESTAMP |
Nee | De geschatte tijd dat de record in de stream is ingevoegd. |
De kolommen (stream, shardId, sequenceNumber)
vormen een primaire sleutel.
Voorbeelden
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret('test-databricks', 'awsAccessKey'),
awsSecretKey => secret('test-databricks', 'awsSecretKey'),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');