read_kinesis tablo değerli akış işlevi

Şunlar için geçerlidir:onay işareti evet olarak işaretlenmiş Databricks SQL onay işareti evet olarak işaretlenmiş Databricks Runtime 13.3 LTS ve üzeri

Kinesis'ten bir veya daha fazla akıştan okunan kayıtları içeren bir tablo döndürür.

Söz dizimi

read_kinesis ( { parameter => value } [, ...] )

Argümanlar

read_kinesis adlandırılmış parametre çağırma gerektirir.

Gereken tek bağımsız değişkendir streamName. Diğer tüm argümanlar isteğe bağlıdır.

Bağımsız değişkenlerin açıklamaları burada kısadır. Diğer ayrıntılar için Amazon Kinesis belgelerine bakın.

AWS ile bağlantı kurmanın ve kimlik doğrulaması yapmanın birden çok yolu vardır. Önerilen yaklaşım, bir Databricks hizmeti kimlik bilgisi oluşturmak ve bunu serviceCredential seçeneği kullanarak belirtmektir. Alternatif olarak, awsAccessKey ve awsSecretKey kullanarak kimlik doğrulaması yapabilirsiniz. Bu seçenekler secret işlevini kullanarak işlev bağımsız değişkenlerinde belirtilebilir, bağımsız değişkenlerin içinde el ile ayarlanabilir veya aşağıda gösterildiği gibi ortam değişkenleri olarak yapılandırılabilir. roleArn, örnek roleExternalIDroleSessionName profillerini kullanarak AWS ile kimlik doğrulaması yapmak için de kullanılabilir. Bunların hiçbiri belirtilmezse, varsayılan AWS sağlayıcı zincirini kullanır.

Parametre Tür Açıklama
streamName STRING Gerekli: Bir veya daha fazla Kinesis akışının virgülle ayrılmış listesi.
serviceCredential STRING Databricks hizmeti kimlik bilgilerinizin adı.
awsAccessKey STRING Varsa AWS Erişim anahtarı. Ortam değişkenleri (AWS_ACCESS_KEY_ID) ve bir kimlik bilgisi profili dosyası da dahil olmak üzere AWS varsayılan kimlik bilgisi sağlayıcısı zinciri aracılığıyla desteklenen çeşitli seçenekler aracılığıyla da belirtilebilir.
awsSecretKey STRING Erişim anahtarına karşılık gelen gizli anahtar. Bağımsız değişkenlerde veya ortam değişkenleri (AWS_SECRET_KEY veya AWS_SECRET_ACCESS_KEY) ve kimlik bilgileri profil dosyası dahil olmak üzere AWS varsayılan kimlik bilgisi sağlayıcısı zinciri aracılığıyla desteklenen çeşitli seçenekler aracılığıyla belirtilebilir.
roleArn STRING Kinesis'e erişim sağlamak için varsayılan rolün Amazon kaynak adı.
roleExternalId STRING AWS hesabına erişim yetkisi verildiğinde kullanılır.
roleSessionName STRING AWS rol oturumu adı.
stsEndpoint STRING Geçici erişim kimlik bilgileri istemek için bir uç nokta.
region STRING Belirtilecek akışların bölgesi. Varsayılan, yerel olarak çözümlenen bölgedir.
endpoint STRING Kinesis veri akışları için bölgesel uç nokta. Varsayılan, yerel olarak çözümlenen bölgedir.
initialPosition STRING Veri akışında okuma için başlangıç konumu. Bunlardan biri: 'latest' (varsayılan), 'trim_horizon', 'earliest', 'at_timestamp'.
consumerMode STRING Bunlardan biri: 'polling' (varsayılan), veya 'EFO' (geliştirilmiş fan-out).
consumerName STRING Tüketicinin adı. Tüm tüketicilere 'databricks_' ön eki eklenir. Varsayılan değer boş bir dizedir.
registerConsumerTimeoutInterval STRING Hata oluşturmadan önce Kinesis EFO tüketicisinin Kinesis akışına kaydedilmesini beklemek için maksimum zaman aşımı. Varsayılan değer '300s'dir.
requireConsumerDeregistration BOOLEAN true sorgu sonlandığında EFO tüketicisinin kaydını silmek için. Varsayılan false değeridir.
deregisterConsumerTimeoutInterval STRING Kinesis EFO tüketicisinin Kinesis akışından kaydının silinmesini beklerken hata vermeden önceki maksimum zaman aşımı süresi. Varsayılan değer '300s'dir.
consumerRefreshInterval STRING Tüketicinin denetlendiği ve yenilendiği aralık. Varsayılan değer '300s'dir.

Kinesis için okuma aktarım hızını ve gecikme süresini denetlemek için aşağıdaki bağımsız değişkenler kullanılır:

Parametre Tür Açıklama
maxRecordsPerFetch INTEGER (>0) İsteğe bağlı olarak, Kinesis'e yapılan API isteği başına varsayılan olarak 10.000 kayıt okunabilir.
maxFetchRate STRING Parça başına verilerin ne kadar hızlı bir şekilde alınacağı. MB/sn cinsinden ölçülen '1.0' ile '2.0' arasında bir değer. Varsayılan değer :'1.0'.
minFetchPeriod STRING Ardışık ön getirme denemeleri arasındaki maksimum bekleme süresi. Varsayılan değer :'400ms'dir.
maxFetchDuration STRING Önden getirilen yeni verileri arabelleğe almak için maksimum süre. Varsayılan değer :'10s'.
fetchBufferSize STRING Sonraki tetikleyici için veri miktarı. Varsayılan değer '20 gb'tır.
shardsPerTask INTEGER (>0) Her spark görevi için paralel olarak önceden getirilecek Kinesis parçalarının sayısı. Varsayılan 5'dir.
shardFetchinterval STRING Yeniden parçalara ayırma için ne sıklıkta sorgulama yapılmalı? Varsayılan değer '1s'dir.
coalesceThresholdBlockSize INTEGER (>0) Otomatik birleştirmenin gerçekleştiği eşik. Varsayılan değer 10.000.000'dir.
coalesce BOOLEAN true önceden oluşturulmuş istekleri birleştirmeye yöneliktir. Varsayılan değer: true.
coalesceBinSize INTEGER (>0) Birleştirmeden sonraki yaklaşık blok boyutu. Varsayılan değer 128.000.000'dir.
reuseKinesisClient BOOLEAN true önbellekte depolanan Kinesis istemcisini yeniden kullanmak için. Varsayılan değer pe kümesi dışındadır true .
clientRetries INTEGER (>0) Yeniden deneme senaryosundaki deneme sayısı. Varsayılan 5'dir.

İadeler

Aşağıdaki şemaya sahip bir Kinesis kayıtları tablosu:

Veri Akışı Adı Veri türü Boş Olabilir Standart Açıklama
partitionKey STRING Hayır Bir akışın parçaları arasında veri dağıtmak için kullanılan anahtar. Aynı bölüm anahtarına sahip tüm veri kayıtları aynı parçadan okunur.
data BINARY Hayır Kinesis veri yükü, base-64 kodlanmış.
stream STRING Hayır Verilerin okunduğu akışın adı.
shardId STRING Hayır Verilerin okunduğu parçanın benzersiz tanımlayıcısı.
sequenceNumber BIGINT Hayır Kayıt dilimi içindeki benzersiz tanımlayıcı.
approximateArrivalTimestamp TIMESTAMP Hayır Veri kaydının akışa eklendiği yaklaşık süre.

Sütunlar (stream, shardId, sequenceNumber) birincil anahtarı oluşturur.

Örnekler

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret('test-databricks', 'awsAccessKey'),
        awsSecretKey => secret('test-databricks', 'awsSecretKey'),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');