Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Şunlar için geçerlidir:
Databricks SQL
Databricks Runtime 13.3 LTS ve üzeri
Kinesis'ten bir veya daha fazla akıştan okunan kayıtları içeren bir tablo döndürür.
Söz dizimi
read_kinesis ( { parameter => value } [, ...] )
Argümanlar
read_kinesis
adlandırılmış parametre çağırma gerektirir.
Gereken tek bağımsız değişkendir streamName. Diğer tüm argümanlar isteğe bağlıdır.
Bağımsız değişkenlerin açıklamaları burada kısadır. Diğer ayrıntılar için Amazon Kinesis belgelerine bakın.
AWS ile bağlantı kurmanın ve kimlik doğrulaması yapmanın birden çok yolu vardır.
Önerilen yaklaşım, bir Databricks hizmeti kimlik bilgisi oluşturmak ve bunu serviceCredential seçeneği kullanarak belirtmektir.
Alternatif olarak, awsAccessKey ve awsSecretKey kullanarak kimlik doğrulaması yapabilirsiniz.
Bu seçenekler secret işlevini kullanarak işlev bağımsız değişkenlerinde belirtilebilir, bağımsız değişkenlerin içinde el ile ayarlanabilir veya aşağıda gösterildiği gibi ortam değişkenleri olarak yapılandırılabilir.
roleArn, örnek roleExternalIDroleSessionName profillerini kullanarak AWS ile kimlik doğrulaması yapmak için de kullanılabilir.
Bunların hiçbiri belirtilmezse, varsayılan AWS sağlayıcı zincirini kullanır.
| Parametre | Tür | Açıklama |
|---|---|---|
streamName |
STRING |
Gerekli: Bir veya daha fazla Kinesis akışının virgülle ayrılmış listesi. |
serviceCredential |
STRING |
Databricks hizmeti kimlik bilgilerinizin adı. |
awsAccessKey |
STRING |
Varsa AWS Erişim anahtarı. Ortam değişkenleri (AWS_ACCESS_KEY_ID) ve bir kimlik bilgisi profili dosyası da dahil olmak üzere AWS varsayılan kimlik bilgisi sağlayıcısı zinciri aracılığıyla desteklenen çeşitli seçenekler aracılığıyla da belirtilebilir. |
awsSecretKey |
STRING |
Erişim anahtarına karşılık gelen gizli anahtar. Bağımsız değişkenlerde veya ortam değişkenleri (AWS_SECRET_KEY veya AWS_SECRET_ACCESS_KEY) ve kimlik bilgileri profil dosyası dahil olmak üzere AWS varsayılan kimlik bilgisi sağlayıcısı zinciri aracılığıyla desteklenen çeşitli seçenekler aracılığıyla belirtilebilir. |
roleArn |
STRING |
Kinesis'e erişim sağlamak için varsayılan rolün Amazon kaynak adı. |
roleExternalId |
STRING |
AWS hesabına erişim yetkisi verildiğinde kullanılır. |
roleSessionName |
STRING |
AWS rol oturumu adı. |
stsEndpoint |
STRING |
Geçici erişim kimlik bilgileri istemek için bir uç nokta. |
region |
STRING |
Belirtilecek akışların bölgesi. Varsayılan, yerel olarak çözümlenen bölgedir. |
endpoint |
STRING |
Kinesis veri akışları için bölgesel uç nokta. Varsayılan, yerel olarak çözümlenen bölgedir. |
initialPosition |
STRING |
Veri akışında okuma için başlangıç konumu. Bunlardan biri: 'latest' (varsayılan), 'trim_horizon', 'earliest', 'at_timestamp'. |
consumerMode |
STRING |
Bunlardan biri: 'polling' (varsayılan), veya 'EFO' (geliştirilmiş fan-out). |
consumerName |
STRING |
Tüketicinin adı. Tüm tüketicilere 'databricks_' ön eki eklenir. Varsayılan değer boş bir dizedir. |
registerConsumerTimeoutInterval |
STRING |
Hata oluşturmadan önce Kinesis EFO tüketicisinin Kinesis akışına kaydedilmesini beklemek için maksimum zaman aşımı. Varsayılan değer '300s'dir. |
requireConsumerDeregistration |
BOOLEAN |
true sorgu sonlandığında EFO tüketicisinin kaydını silmek için. Varsayılan false değeridir. |
deregisterConsumerTimeoutInterval |
STRING |
Kinesis EFO tüketicisinin Kinesis akışından kaydının silinmesini beklerken hata vermeden önceki maksimum zaman aşımı süresi. Varsayılan değer '300s'dir. |
consumerRefreshInterval |
STRING |
Tüketicinin denetlendiği ve yenilendiği aralık. Varsayılan değer '300s'dir. |
Kinesis için okuma aktarım hızını ve gecikme süresini denetlemek için aşağıdaki bağımsız değişkenler kullanılır:
| Parametre | Tür | Açıklama |
|---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
İsteğe bağlı olarak, Kinesis'e yapılan API isteği başına varsayılan olarak 10.000 kayıt okunabilir. |
maxFetchRate |
STRING |
Parça başına verilerin ne kadar hızlı bir şekilde alınacağı. MB/sn cinsinden ölçülen '1.0' ile '2.0' arasında bir değer. Varsayılan değer :'1.0'. |
minFetchPeriod |
STRING |
Ardışık ön getirme denemeleri arasındaki maksimum bekleme süresi. Varsayılan değer :'400ms'dir. |
maxFetchDuration |
STRING |
Önden getirilen yeni verileri arabelleğe almak için maksimum süre. Varsayılan değer :'10s'. |
fetchBufferSize |
STRING |
Sonraki tetikleyici için veri miktarı. Varsayılan değer '20 gb'tır. |
shardsPerTask |
INTEGER (>0) |
Her spark görevi için paralel olarak önceden getirilecek Kinesis parçalarının sayısı. Varsayılan 5'dir. |
shardFetchinterval |
STRING |
Yeniden parçalara ayırma için ne sıklıkta sorgulama yapılmalı? Varsayılan değer '1s'dir. |
coalesceThresholdBlockSize |
INTEGER (>0) |
Otomatik birleştirmenin gerçekleştiği eşik. Varsayılan değer 10.000.000'dir. |
coalesce |
BOOLEAN |
true önceden oluşturulmuş istekleri birleştirmeye yöneliktir. Varsayılan değer: true. |
coalesceBinSize |
INTEGER (>0) |
Birleştirmeden sonraki yaklaşık blok boyutu. Varsayılan değer 128.000.000'dir. |
reuseKinesisClient |
BOOLEAN |
true önbellekte depolanan Kinesis istemcisini yeniden kullanmak için. Varsayılan değer pe kümesi dışındadır true . |
clientRetries |
INTEGER (>0) |
Yeniden deneme senaryosundaki deneme sayısı. Varsayılan 5'dir. |
İadeler
Aşağıdaki şemaya sahip bir Kinesis kayıtları tablosu:
| Veri Akışı Adı | Veri türü | Boş Olabilir | Standart | Açıklama |
|---|---|---|---|---|
partitionKey |
STRING |
Hayır | Bir akışın parçaları arasında veri dağıtmak için kullanılan anahtar. Aynı bölüm anahtarına sahip tüm veri kayıtları aynı parçadan okunur. | |
data |
BINARY |
Hayır | Kinesis veri yükü, base-64 kodlanmış. | |
stream |
STRING |
Hayır | Verilerin okunduğu akışın adı. | |
shardId |
STRING |
Hayır | Verilerin okunduğu parçanın benzersiz tanımlayıcısı. | |
sequenceNumber |
BIGINT |
Hayır | Kayıt dilimi içindeki benzersiz tanımlayıcı. | |
approximateArrivalTimestamp |
TIMESTAMP |
Hayır | Veri kaydının akışa eklendiği yaklaşık süre. |
Sütunlar (stream, shardId, sequenceNumber) birincil anahtarı oluşturur.
Örnekler
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret('test-databricks', 'awsAccessKey'),
awsSecretKey => secret('test-databricks', 'awsSecretKey'),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');