閱讀英文

共用方式為


read_kinesis 串流數據表值函式

適用於:已勾選「是」 Databricks SQL 已勾選「是」 Databricks Runtime 13.3 LTS 和更高版本

返回包含從一個或多個數據流中讀取的 Kinesis 記錄的表。

語法

read_kinesis ( { parameter => value } [, ...] )

引數

read_kinesis 需要具名參數調用

唯一必要的自變數是 streamName。 所有其他參數都是可選的。

這裡簡短說明參數。 如需詳細資訊,請參閱 Amazon Kinesis 檔。

有各種連線選項可連線及向 AWS 進行驗證。 awsAccessKeyawsSecretKey 可以使用 秘密函式在函式自變數中指定,或在自變數中手動設定為環境變數,如下所示。 roleArnroleExternalIDroleSessionName 也可以用來使用實例配置檔向 AWS 進行驗證。 如果未指定這些專案,則會使用預設的 AWS 提供者鏈結。

參數 類型 描述
streamName STRING 必需的逗號分隔的一或多個Kinesis資料流清單。
awsAccessKey STRING 如果有的話,AWS 存取密鑰。 也可以透過 AWS 預設認證提供者鏈結所支援的各種選項來指定,包括環境變數 (AWS_ACCESS_KEY_ID) 和認證設定檔檔案。
awsSecretKey STRING 對應至存取金鑰的秘密金鑰。 可以在自變數中指定,或透過 AWS 預設認證提供者鏈結支援的各種選項來指定,包括環境變數(AWS_SECRET_KEYAWS_SECRET_ACCESS_KEY),以及認證設定檔檔案。
roleArn STRING 存取 Kinesis 時要擔任之角色的 Amazon 資源名稱。
roleExternalId STRING 在委派 AWS 帳戶的存取權時使用。
roleSessionName STRING AWS 角色會話名稱。
stsEndpoint STRING 要求暫時存取認證的端點。
region STRING 要指定的數據流區域。 預設值為本機解析的區域。
endpoint STRING Kinesis 數據流的區域端點。 預設值為本機解析的區域。
initialPosition STRING 從數據流中讀取的起始位置。 其中一個:『latest』(預設值)、『trim_horizon』、『最早』、『at_timestamp』。
consumerMode STRING 其中一個:「polling」(預設值),或「EFO」(增強型扇出)。
consumerName STRING 消費者的名稱。 所有使用者的名稱都會加上「databricks_」。 預設值是空字串。
registerConsumerTimeoutInterval STRING Kinesis EFO 消費者在 Kinesis 資料流中註冊前,等待的最長逾時時間,在超過此時間後系統將擲回錯誤。 預設值為 『300s』。
requireConsumerDeregistration BOOLEAN true 在查詢終止時取消註冊 EFO 使用者。 預設值為 false
deregisterConsumerTimeoutInterval STRING Kinesis EFO 消費者在 Kinesis 數據流中取消註冊前,可以等待的最大逾時時間(在此時間過後將會擲回錯誤)。 預設值為 『300s』。
consumerRefreshInterval STRING 檢查和重新整理使用者的間隔時間。 預設值為 『300s』。

下列自變數用於控制 Kinesis 的讀取輸送量和延遲:

參數 類型 描述
maxRecordsPerFetch INTEGER (>0) 選擇性,預設每個 API 將從 Kinesis 讀取 10,000 筆記錄。
maxFetchRate STRING 如何設定每個分區預取數據的速度。 介於 '1.0' 和 '2.0' 之間的值,以 MB/秒為單位。 預設值為 『1.0』。
minFetchPeriod STRING 連續預先擷取嘗試之間的等候時間上限。 默認值為 『400ms』。
maxFetchDuration STRING 緩衝預取新數據的最長時間。 預設值為 『10s』。
fetchBufferSize STRING 下一個觸發程序的數據量。 默認值為 『20gb』。
shardsPerTask INTEGER (>0) 每個 Spark 工作平行擷取的 Kinesis 分區數目。 預設值為 5。
shardFetchinterval STRING 輪詢重新分區的頻率。 預設值為 『1s』。
coalesceThresholdBlockSize INTEGER (>0) 自動聯合的臨界值。 默認值為 10,000,000。
coalesce BOOLEAN true 合併預擷取的請求。 預設值為 true
coalesceBinSize INTEGER (>0) 聯合后的近似區塊大小。 默認值為 128,000,000。
reuseKinesisClient BOOLEAN true 以重複使用儲存在快取中的 Kinesis 用戶端。 預設值為 true,但在 PE 叢集上則不適用。
clientRetries INTEGER (>0) 重試情境中的重試次數。 預設值為 5。

退貨

具有下列架構的 Kinesis 紀錄資料表:

名稱 資料類型 可空 標準 描述
partitionKey STRING 分片鍵,用來在數據流片段之間分配數據。 具有相同數據分割索引鍵的所有數據記錄都會從相同的分區讀取。
data BINARY Kinesis 資料有效載荷,經 base-64 編碼。
stream STRING 從中讀取數據的數據流名稱。
shardId STRING 讀取資料所在分區的唯一識別碼。
sequenceNumber BIGINT 其分片內記錄的唯一識別碼。
approximateArrivalTimestamp TIMESTAMP 插入記錄至資料流的大約時間。

欄位 (stream, shardId, sequenceNumber) 作為主鍵。

範例

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');