Partilhar via


read_kinesis Função com valor de tabela de streaming

Aplica-se a: Marque Sim Databricks SQL Marque Sim Databricks Runtime 13.3 LTS e superior

Retorna uma tabela com registros lidos do Kinesis de um ou mais fluxos.

Sintaxe

read_kinesis ( { parameter => value } [, ...] )

Argumentos

read_kinesis requer invocação de parâmetro nomeado.

O único argumento necessário é streamName. Todos os outros argumentos são opcionais.

As descrições dos argumentos são breves aqui. Para obter mais detalhes, consulte a documentação do Amazon Kinesis .

Há várias opções de conexão para se conectar e autenticar com a AWS. awsAccessKey, e awsSecretKey pode ser especificado nos argumentos da função usando a função secreta, definida manualmente nos argumentos ou configurada como variáveis de ambiente, conforme indicado abaixo. roleArn, roleExternalID, roleSessionName também pode ser usado para autenticar com a AWS usando perfis de instância. Se nenhum deles for especificado, ele usará a cadeia de provedores padrão da AWS.

Parâmetro Tipo Description
streamName STRING Lista obrigatória separada por vírgulas de um ou mais fluxos de cinese.
awsAccessKey STRING A chave de acesso da AWS, se houver. Também pode ser especificado por meio das várias opções suportadas pela cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_ACCESS_KEY_ID) e um arquivo de perfis de credenciais.
awsSecretKey STRING A chave secreta que corresponde à chave de acesso. Pode ser especificado nos argumentos ou por meio das várias opções suportadas pela cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_SECRET_KEY ou AWS_SECRET_ACCESS_KEY) e um arquivo de perfis de credenciais.
roleArn STRING Nome do recurso da Amazon da função a ser assumida ao acessar o Kinesis.
roleExternalId STRING Usado ao delegar acesso à conta da AWS.
roleSessionName STRING Nome da sessão de função da AWS.
stsEndpoint STRING Um ponto de extremidade para solicitar credenciais de acesso temporárias.
region STRING Região para os fluxos a serem especificados. O padrão é a região resolvida localmente.
endpoint STRING ponto de extremidade regional para fluxos de dados do Kinesis. O padrão é a região resolvida localmente.
initialPosition STRING Posição inicial para leitura no fluxo. Um dos seguintes: «mais recente» (por defeito), «trim_horizon», «mais antigo», «at_timestamp».
consumerMode STRING Um de: 'polling' (padrão), ou 'EFO' (enhanced-fan-out).
consumerName STRING O nome do consumidor. Todos os consumidores são precedidos de «databricks_». O padrão é uma cadeia de caracteres vazia.
registerConsumerTimeoutInterval STRING o tempo limite máximo para aguardar que o consumidor do Kinesis EFO seja registrado no fluxo do Kinesis antes de lançar um erro. O padrão é '300s'.
requireConsumerDeregistration BOOLEAN true para cancelar o registro do consumidor EFO no encerramento da consulta. A predefinição é false.
deregisterConsumerTimeoutInterval STRING O tempo limite máximo para aguardar que o consumidor do Kinesis EFO seja cancelado no fluxo do Kinesis antes de lançar um erro. O padrão é '300s'.
consumerRefreshInterval STRING O intervalo em que o consumidor é verificado e atualizado. O padrão é '300s'.

Os argumentos a seguir são usados para controlar a taxa de transferência de leitura e a latência do Kinesis:

Parâmetro Tipo Description
maxRecordsPerFetch INTEGER (>0) Opcional, com um padrão de 10.000 registros a serem lidos por solicitação de API para o Kinesis.
maxFetchRate STRING Quão rápido para pré-buscar dados por fragmento. Um valor entre '1,0' e '2,0' medido em MB/s. O padrão é '1.0'.
minFetchPeriod STRING O tempo máximo de espera entre tentativas consecutivas de pré-busca. O padrão é '400ms'.
maxFetchDuration STRING A duração máxima para armazenar em buffer novos dados pré-obtidos. O padrão é '10s'.
fetchBufferSize STRING A quantidade de dados para o próximo gatilho. O padrão é '20gb'.
shardsPerTask INTEGER (>0) O número de fragmentos Kinesis a serem pré-buscados em paralelo por tarefa de faísca. A predefinição é 5.
shardFetchinterval STRING Com que frequência sondar para reharding. O padrão é '1s'.
coalesceThresholdBlockSize INTEGER (>0) O limiar a partir do qual ocorre a coalescência automática. O padrão é 10.000.000.
coalesce BOOLEAN true para aglutinar pedidos pré-buscados. A predefinição é true.
coalesceBinSize INTEGER (>0) O tamanho aproximado do bloco após a coalescência. O padrão é 128.000.000.
reuseKinesisClient BOOLEAN true para reutilizar o cliente Kinesis armazenado no cache. O padrão é true exceto em um cluster PE.
clientRetries INTEGER (>0) O número de novas tentativas no cenário de novas tentativas. A predefinição é 5.

Devoluções

Uma tabela de registros do Kinesis com o seguinte esquema:

Name Tipo de dados Pode ser nulo Standard Description
partitionKey STRING Não Uma chave que é usada para distribuir dados entre os fragmentos de um fluxo. Todos os registos de dados com a mesma chave de partição serão lidos a partir do mesmo fragmento.
data BINARY Não A carga útil de dados kinesis, codificada em base-64.
stream STRING Não O nome do fluxo de onde os dados foram lidos.
shardId STRING Não Um identificador exclusivo para o fragmento de onde os dados foram lidos.
sequenceNumber BIGINT Não O identificador exclusivo do registro em seu fragmento.
approximateArrivalTimestamp TIMESTAMP Não A hora aproximada em que o registro foi inserido no fluxo.

As colunas (stream, shardId, sequenceNumber) constituem uma chave primária.

Exemplos

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');