將資料作為輸入串流處理至串流分析中
串流分析與 Azure 資料流做了絕佳的整合,並透過三種資源將其作為輸入:
這些輸入來源可存在於與串流分析作業相同的 Azure 訂用帳戶中,也可存在於不同的訂用帳戶。
壓縮
串流分析支援所有輸入來源的壓縮。 支援的壓縮類型包括:無、GZip 及 Deflate。 參考資料無法使用壓縮支援。 如果輸入資料是已壓縮的 Avro 資料,串流分析會以透明方式處理。 您不需要使用 Avro 序列化來指定壓縮類型。
建立、編輯或測試輸入
您可使用 Azure 入口網站、Visual Studio 和 Visual Studio Code 來新增及檢視或編輯串流作業上的現有輸入。 您也可以從 Azure 入口網站、Visual Studio 和 Visual Studio Code 以透過範例資料來測試輸入連線並測試查詢。 當撰寫查詢時,您會在 FROM 子句中列出輸入。 您可以從入口網站的 [查詢] 頁面取得可用輸入的清單。 如果您想要使用多個輸入,請將其 JOIN
或撰寫多個 SELECT
查詢。
注意
強烈建議您使用適用於 Visual Studio Code 的串流分析工具,以獲得最佳本機開發體驗。 適用於 Visual Studio 2019 的串流分析工具 (2.6.3000.0 版) 有已知的功能差距,且未來將不會改進。
來自事件中樞的串流資料
Azure 事件中樞是具高延展性的發佈訂閱事件擷取器。 事件中樞每秒可收集數百萬個事件,因此其可供處理和分析連線的裝置和應用程式所產生大量資料。 事件中樞和串流分析搭配在一起,可提供即時分析所需的端對端解決方案。 事件中樞可讓您即時將事件饋送至 Azure,而串流分析作業則可即時處理這些事件。 例如,您可以將網頁點擊、感應器讀數或線上記錄事件傳送至事件中樞。 然後,您可以建立串流分析作業,將事件中樞當作輸入資料來即時篩選、彙總和相互關聯。
EventEnqueuedUtcTime
是事件在事件中樞的抵達時間戳記,也是事件從事件中樞到達串流分析的預設時間戳記。 若要使用事件裝載中的時間戳記,將資料當作資料流處理,您必須使用 TIMESTAMP BY 關鍵字。
事件中樞取用者群組
您應該將每個事件中樞輸入設定為有自己的取用者群組。 當作業包含自我聯結或有多個輸入時,某些輸入可能由下游的多個讀取器所讀取。 這種情況會影響單一取用者群組中的讀取器數目。 若要避免超出每個分割區的每個取用者群組最多有 5 個讀取器的事件中樞限制,最好為每個串流分析作業指定取用者群組。 另外也限制標準層事件中樞最多有 20 個取用者群組。 如需詳細資訊,請參閱對 Azure 串流分析輸入進行疑難排解。
從事件中樞建立輸入
下表說明 Azure 入口網站的 [新的輸入] 頁面中用來從事件中樞串流處理資料輸入的每個屬性:
屬性 | 說明 |
---|---|
輸入別名 | 在作業查詢中用來參考這個輸入的易記名稱。 |
訂用帳戶 | 選擇事件中樞資源所在的 Azure 訂用帳戶。 |
事件中樞命名空間 | 事件中樞命名空間是事件中樞的容器。 當您建立事件中樞時,也會建立此命名空間。 |
事件中樞名稱 | 作為輸入的事件中樞名稱。 |
事件中樞取用者群組 (建議使用) | 建議讓每一個串流分析作業使用不同的取用者群組。 此字串可識別要用來從事件中樞擷取資料的取用者群組。 若未指定取用者群組,串流分析作業會使用 $Default 取用者群組。 |
驗證模式 | 指定您想要用來連線至事件中樞的驗證類型。 您可以使用連接字串或受控識別向事件中樞進行驗證。 針對 [受控識別] 選項,您可以將系統指派的受控識別建立至串流分析作業或使用者指派的受控識別,以向事件中樞進行驗證。 當您使用受控識別時,受控識別必須是 Azure 事件中樞資料接收者或 Azure 事件中樞資料擁有者角色的成員。 |
事件中樞原則名稱 | 支援存取事件中樞的共用存取原則。 每一個共用存取原則都會有名稱、權限 (由您設定) 和存取金鑰。 此選項會自動填入,除非您選取手動提供事件中樞設定的選項。 |
分割區索引鍵 | 這是選擇性欄位,只有在作業設定為使用相容性層級 1.2 或更高版本時,才能使用。 如果輸入是依屬性分割,則可在此新增此屬性的名稱。 如果查詢在此屬性上包含 PARTITION BY 或 GROUP BY 子句,這可用於提升查詢的效能。 如果此作業使用相容性層級 1.2 或更高版本,則此欄位預設為 PartitionId. |
事件序列化格式 | 傳入資料流的序列化格式 (JSON、CSV、Avro、Parquet 或其他 (Protobuf、XML、專用...))。 確認 JSON 格式與規格一致,並且不包含以 0 開頭的十進位數字。 |
編碼方式 | UTF-8 是目前唯一支援的編碼格式。 |
事件壓縮類型 | 用來讀取內送資料流的壓縮類型,例如無 (預設值)、Gzip 或 Deflate。 |
結構描述登錄 (預覽) | 您可以選取結構描述登錄,其中包含從事件中樞接收的事件資料結構描述。 |
如果您的資料來自事件中樞資料流輸入,您將可在串流分析查詢中存取下列中繼資料欄位:
屬性 | 說明 |
---|---|
EventProcessedUtcTime | 串流分析處理事件的日期和時間。 |
EventEnqueuedUtcTime | 事件中樞接收事件的日期和時間。 |
PartitionId | 輸入配接器的以零起始的資料分割識別碼。 |
例如,您可以使用這些欄位來撰寫類似下列範例的查詢:
SELECT
EventProcessedUtcTime,
EventEnqueuedUtcTime,
PartitionId
FROM Input
注意
將事件中樞作為 IoT 中樞路由的端點時,您可以使用 GetMetadataPropertyValue 函式存取 IoT 中樞中繼資料。
來自 IoT 中樞的串流資料
Azure IoT 中樞是可高度調整的發佈/訂閱事件擷取器,且已針對 IoT 案例最佳化。
在串流分析中,來自 IoT 中樞之事件的預設時間戳記是事件抵達 IoT 中樞的時間戳記,也就是 EventEnqueuedUtcTime
。 若要使用事件裝載中的時間戳記,將資料當作資料流處理,您必須使用 TIMESTAMP BY 關鍵字。
IoT 中樞取用者群組
您應將每個串流分析 IoT 中樞輸入設定為有其本身的取用者群組。 當作業包含自我聯結或有多個輸入時,某些輸入可能由下游的多個讀取器所讀取。 這種情況會影響單一取用者群組中的讀取器數目。 若要避免超出每個分割區的每個取用者群組最多有 5 個讀取器的 Azure IoT 中樞限制,最好為每個串流分析作業指定取用者群組。
將 IoT 中樞設定為資料流輸入
下表說明在 Azure 入口網站中將 IoT 中樞設定為資料流輸入時,[新的輸入] 頁面中的每個屬性。
屬性 | 說明 |
---|---|
輸入別名 | 在作業查詢中用來參考這個輸入的易記名稱。 |
訂用帳戶 | 選擇 IoT 中樞資源所在的訂用帳戶。 |
IoT 中樞 | 要作為輸入的 IoT 中樞的名稱。 |
取用者群組 | 建議讓每一個串流分析作業使用不同的取用者群組。 用來從 IoT 中樞擷取資料的取用者群組。 串流分析會使用 $Default 取用者群組,除非您另有指定。 |
共用存取原則名稱 | 支援存取 IoT 中樞的共用存取原則。 每一個共用存取原則都會有名稱、權限 (由您設定) 和存取金鑰。 |
共用存取原則金鑰 | 用來授與 IoT 中樞存取權的共用存取金鑰。 此選項會自動填入,除非您選取手動提供 IoT 中樞設定的選項。 |
端點 | IoT 中樞的端點。 |
分割區索引鍵 | 這是選擇性欄位,只有在作業設定為使用相容性層級 1.2 或更高版本時,才能使用。 如果輸入是依屬性分割,則可在此新增此屬性的名稱。 如果查詢在此屬性上包含 PARTITION BY 或 GROUP BY 子句,這可用於提升查詢的效能。 如果此作業使用相容性層級 1.2 或更高版本,則此欄位預設為 "PartitionId"。 |
事件序列化格式 | 傳入資料流的序列化格式 (JSON、CSV、Avro、Parquet 或其他 (Protobuf、XML、專用...))。 確認 JSON 格式與規格一致,並且不包含以 0 開頭的十進位數字。 |
編碼方式 | UTF-8 是目前唯一支援的編碼格式。 |
事件壓縮類型 | 用來讀取內送資料流的壓縮類型,例如無 (預設值)、Gzip 或 Deflate。 |
當您使用來自 IoT 中樞的串流資料時,您將可在串流分析查詢中存取下列中繼資料欄位:
屬性 | 說明 |
---|---|
EventProcessedUtcTime | 處理事件的日期與時間。 |
EventEnqueuedUtcTime | IoT 中樞接收到事件的日期和時間。 |
PartitionId | 輸入配接器的以零起始的資料分割識別碼。 |
IoTHub.MessageId | IoT 中樞內用於雙向通訊相互關聯的識別碼。 |
IoTHub.CorrelationId | IoT 中樞內用於訊息回應和回饋的識別碼。 |
IoTHub.ConnectionDeviceId | 用來傳送此訊息的驗證識別碼。 IoT 中樞會將服務繫結訊息上的此值加上戳記。 |
IoTHub.ConnectionDeviceGenerationId | 用來傳送此訊息之已驗證裝置的產生識別碼。 IoT 中樞會將 Servicebound 訊息標上此值。 |
IoTHub.EnqueuedTime | IoT 中樞接收到訊息的時間。 |
來自 Blob 儲存體或 Data Lake Storage Gen2 的串流資料
對於需要在雲端中儲存大量非結構化資料的案例中,Azure Blob 儲存體或 Azure Data Lake Storage Gen2 提供了具有成本效益且可調整的解決方案。 Blob 儲存體或 Azure Data Lake Storage Gen2 中的資料會被視為待用資料。 不過,串流分析可將此資料當作資料流來處理。
記錄處理是使用串流分析處理此類輸入時的常用案例。 在此案例中,從系統擷取遙測資料檔案之後,必須加以剖析和處理,才能得到有意義的資料。
在串流分析中,Blob 儲存體或 Azure Data Lake Storage Gen2 事件的預設時間戳記是最後修改的時間戳記,也就是 BlobLastModifiedUtcTime
。 如果在 13:00 將 blob 上傳至儲存體帳戶,然後在 13:01 使用 [現在] 選項啟動 Azure 串流分析作業,系統便不會選取該 blob,因為其修改時間落在作業執行期間以外。
如果在 13:00 將 blob 上傳至儲存體帳戶容器,然後在 13:00 (含) 以前使用 [自訂時間] 啟動 Azure 串流分析作業,則會選取該 blob,因為其修改時間落在作業執行期間以內。
如果在 13:00 使用 [現在] 來啟動串流分析作業,然後在 13:01 將 blob 上傳至儲存體帳戶容器,則串流分析會選取該 blob。 指派給每個 blob 的時間戳記只會依據 BlobLastModifiedTime
。 blob 所在的資料夾與指派的時間戳記無關。 例如,如果 Blob 2019/10-01/00/b1.txt
的 BlobLastModifiedTime
為 2019-11-11
,則指派給此 Blob 的時間戳記為 2019-11-11
。
若要使用事件裝載中的時間戳記,將資料當作資料流處理,您必須使用 TIMESTAMP BY 關鍵字。 如果 Blob 檔案可用,串流分析作業會每秒從 Azure Blob 儲存體或 Azure Data Lake Storage Gen2 輸入中提取資料。 如果 Blob 檔案無法使用,則會執行時間延遲上限為 90 秒的指數輪詢。
注意
串流分析不支援將內容加入現有的 blob 檔案。 串流分析只會檢視每個檔案一次,在作業讀取資料之後,不會處理檔案中發生的任何變更。 最佳做法是一次上傳 blob 檔案的所有資料,然後將其他較新的事件新增到不同的新 blob 檔案。
如果持續新增多個 blob,且串流分析在新增 blob 時對其進行處理,則在極少數的情況下,部分 blob 可能會因為 BlobLastModifiedTime
的細微性而被跳過。 您可在上傳每個 blob 之間至少隔兩秒來減輕此案例。 如果此選項不可行,則可使用事件中樞來串流大量事件。
將 Blob 儲存體設定為資料流輸入
下表說明在 Azure 入口網站中將 Blob 儲存體設定為資料流輸入時,[新的輸入] 頁面中的每個屬性。
屬性 | 說明 |
---|---|
輸入別名 | 在作業查詢中用來參考這個輸入的易記名稱。 |
訂用帳戶 | 選擇儲存體資源所在的訂閱。 |
儲存體帳戶 | Blob 檔案所在的儲存體帳戶名稱。 |
儲存體帳戶金鑰 | 與儲存體帳戶相關聯的密碼金鑰。 此選項會自動填入,除非您選取手動提供該設定的選項。 |
容器 | 容器會針對 Blob 提供邏輯分組。 您可以選擇 [使用現有的] 容器,或選擇 [新建] 建立新容器。 |
驗證模式 | 指定您要用來連線至儲存體帳戶的驗證類型。 您可以使用連接字串或受控識別來向儲存體帳戶進行驗證。 針對 [受控識別] 選項,您可以將系統指派的受控識別建立至串流分析作業或使用者指派的受控識別,以向儲存體帳戶進行驗證。 當您使用受控識別時,受控識別必須是儲存體帳戶上適當角色的成員。 |
路徑模式 (選用) | 用來在指定的容器中找出 blob 的檔案路徑。 如果您想要從容器的根目錄讀取 blob,請勿設定路徑模式。 在該路徑內,您可以指定下列三個變數的一個或多個執行個體:{date} 、{time} 或 {partition} 範例 1: cluster1/logs/{date}/{time}/{partition} 範例 2: cluster1/logs/{date} * 字元不是路徑前置詞允許的值。 僅允許有效的 Azure blob 字元。 請勿包含容器名稱或檔案名稱。 |
日期格式 (選用) | 在路徑中使用日期變數時,用來組織檔案的日期格式。 範例: YYYY/MM/DD 當 blob 輸入在其路徑中有 {date} 或 {time} 時,則會以遞增的時間順序來查看資料夾。 |
時間格式 (選用) | 在路徑中使用時間變數時,用來組織檔案的時間格式。 目前唯一支援的值為 HH (表示小時)。 |
分割區索引鍵 | 這是選擇性欄位,只有在作業設定為使用相容性層級 1.2 或更高版本時,才能使用。 如果輸入是依屬性分割,則可在此新增此屬性的名稱。 如果查詢在此屬性上包含 PARTITION BY 或 GROUP BY 子句,這可用於提升查詢的效能。 如果此作業使用相容性層級 1.2 或更高版本,則此欄位預設為 "PartitionId"。 |
輸入資料分割區的計數 | 只有當 {partition} 存在於路徑模式中時,才會顯示此欄位。 這個屬性的值是 >=1 的整數。 每當 pathPattern 中出現 {partition} 時,將會使用介於 0 和這個欄位的值 -1 之間的數字。 |
事件序列化格式 | 傳入資料流的序列化格式 (JSON、CSV、Avro、Parquet 或其他 (Protobuf、XML、專用...))。 確認 JSON 格式與規格一致,並且不包含以 0 開頭的十進位數字。 |
編碼方式 | 對於 CSV 和 JSON 而言,UTF-8 是目前唯一支援的編碼格式。 |
壓縮 | 用來讀取內送資料流的壓縮類型,例如無 (預設值)、Gzip 或 Deflate。 |
當您的資料來自 Blob 儲存體來源時,您可以在串流分析查詢中存取下列中繼資料欄位:
屬性 | 說明 |
---|---|
BlobName | 事件來源的輸入 Blob 名稱。 |
EventProcessedUtcTime | 串流分析處理事件的日期和時間。 |
BlobLastModifiedUtcTime | 上次修改 Blob 的時間與日期。 |
PartitionId | 輸入配接器的以零起始的資料分割識別碼。 |
例如,您可以使用這些欄位來撰寫類似下列範例的查詢:
SELECT
BlobName,
EventProcessedUtcTime,
BlobLastModifiedUtcTime
FROM Input
從 Apache Kafka 串流資料
串流分析可讓您直接連線至 Apache Kafka 叢集以內嵌資料。 解決方案是低程式碼,完全由 Microsoft 的串流分析小組管理,使其符合商務合規性標準。 Kafka 輸入具有回溯相容性,並支援從 0.10 版開始最新用戶端版本的所有版本。 使用者可以根據設定,連線至虛擬網路內的 Kafka 叢集以及具有公用端點的 Kafka 叢集。 設定依賴於現有的 Kafka 設定慣例。 支援的壓縮類型為 無、Gzip、Snappy、LZ4 和 Zstd。
如需詳細資訊,請參閱將 Kafka 的資料串流至串流分析(預覽)。