教學課程:從 Azure 串流分析作業執行 Azure Functions

在本教學課程中,您會建立 Azure 串流分析作業,以從 Azure 事件中樞 讀取事件、對事件數據執行查詢,然後叫用 Azure 函式,以寫入 Azure Cache for Redis 實例。

顯示解決方案中 Azure 服務之間關聯性的螢幕快照。

注意

  • 您可以將 Functions 設定為串流分析作業的其中一個接收(輸出),以從 Azure 串流分析執行 Azure Functions。 Functions 是事件驅動的隨選計算體驗,可讓您實作由 Azure 或第三方服務中發生的事件所觸發的程式代碼。 Functions 回應觸發程序的這種能力使其成為串流分析作業的自然輸出。
  • 串流分析會透過 HTTP 觸發程序叫用 Functions。 Functions 輸出配接器可讓使用者將 Functions 連線到串流分析,以便根據串流分析查詢來觸發事件。
  • 不支援從在多租使用者叢集中執行的串流分析作業,將 連線 至虛擬網路內的 Azure Functions。

在本教學課程中,您會了解如何:

  • 建立 Azure 事件中樞 實例
  • 建立 Azure Cache for Redis 執行個體
  • 建立 Azure 函式
  • 建立串流分析作業
  • 將事件中樞設定為輸入,並將函式設定為輸出
  • 執行串流分析作業
  • 檢查 Azure Cache for Redis 以取得結果

如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

必要條件

開始之前,請確定您已完成下列步驟:

  • 如果您沒有 Azure 訂用帳戶,請建立 免費帳戶
  • 從 Microsoft 下載中心下載電話事件產生器應用程式TelcoGenerator.zip,或從 GitHub 取得原始程式碼。

登入 Azure

登入 Azure 入口網站

建立事件中樞

您需要先將一些範例數據傳送至事件中樞,串流分析才能分析詐騙通話數據流。 在本教學課程中,您會使用 Azure 事件中樞 將數據傳送至 Azure。

使用下列步驟建立事件中樞,並將呼叫數據傳送至該事件中樞:

  1. 登入 Azure 入口網站

  2. 選取 左側功能表上的 [所有服務 ]、選取 [物聯網]、將滑鼠停留 在 [事件中樞] 上,然後選取 [+ [新增] 按鈕。

    顯示事件中樞建立頁面的螢幕快照。

  3. 在 [ 建立命名空間] 頁面上,遵循下列步驟:

    1. 選取您想要在其中建立事件中樞的 Azure 訂用帳戶。

    2. 針對 [ 資源群組],選取 [ 新建 ],然後輸入資源群組的名稱。 事件中樞命名空間會在此資源群組中建立。

    3. 針對 [命名空間名稱],輸入事件中樞命名空間的唯一名稱。

    4. 針對 [ 位置],選取您要在其中建立命名空間的區域。

    5. 針對 [ 定價層],選取 [ 標準]。

    6. 選取頁面底部的 [檢閱 + 建立] 。

      顯示 [建立命名空間] 頁面的螢幕快照。

    7. 在命名空間建立精靈的 [ 檢閱 + 建立 ] 頁面上,選取 頁面底部的 [建立 ],然後檢閱所有設定。

  4. 成功部署命名空間之後,請選取 [移至資源 ] 以流覽至 [事件中 樞命名空間] 頁面。

  5. 在 [事件中 樞命名空間] 頁面上,選取 命令行上的 [+事件中樞 ]。

    顯示 [事件中樞命名空間] 頁面上 [新增事件中樞] 按鈕的螢幕快照。

  6. 在 [建立事件中樞] 頁面上,輸入事件中樞的名稱。 將 [ 數據分割計數 ] 設定為 2。 使用其餘設定中的預設選項,然後選取 [ 檢閱 + 建立]。

    顯示 [建立事件中樞] 頁面的螢幕快照。

  7. [檢閱 + 建立] 頁面上,選取頁面底部的 [建立]。 然後等候部署成功。

授與事件中樞的存取權,並取得 連接字串

應用程式必須先有允許存取的原則,應用程式才能將數據傳送至 Azure 事件中樞。 存取原則會產生包含授權資訊的 連接字串。

  1. 在 [事件中 樞命名空間] 頁面上,選取 左側功能表上的 [共用存取原則 ]。

  2. 從原則清單中選取 [RootManageSharedAccessKey ]。

  3. 然後,選取 連線 字串 - 主鍵旁的 [複製] 按鈕。

  4. 將 連接字串 貼到文字編輯器中。 在下一節中,您需要此 連接字串。

    連接字串 如下所示:

    Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    請注意,連接字串 包含以分號分隔的多個索引鍵/值組:EndpointSharedAccessKeyNameSharedAccessKey

啟動事件產生器應用程式

開始 TelcoGenerator 應用程式之前,您應該將其設定為將數據傳送至您稍早建立的 Azure 事件中樞。

  1. 擷取TelcoGenerator.zip檔案的內容

  2. TelcoGenerator\TelcoGenerator\telcodatagen.exe.config在您選擇的文本編輯器中開啟檔案 有一個.config以上的檔案,因此請務必開啟正確的檔案。

  3. <appSettings>使用下列詳細資料更新組態檔中的專案:

    • 將 EventHubName 索引鍵的值設定為 連接字串 結尾的 EntityPath
    • 設定 Microsoft.ServiceBus.連線 的值namespace 連接字串 的 ionString 索引鍵。 如果您使用事件中樞 連接字串,而不是命名空間,請移除EntityPath結尾的值 (;EntityPath=myeventhub)。 別忘 了移除 EntityPath 值前面的分號。
  4. 儲存檔案。

  5. 接下來開啟命令視窗,並變更為您解壓縮 TelcoGenerator 應用程式的資料夾。 輸入下列命令:

    .\telcodatagen.exe 1000 0.2 2
    

    此指令會採用下列參數:

    • 每小時的通話數據記錄數目。
    • 詐騙機率的百分比,也就是應用程式應該模擬詐騙電話的頻率。 值 0.2 表示大約 20% 的通話記錄看起來是詐騙的。
    • 以小時為單位的持續時間,這是應用程式應該執行的時數。 您也可以在命令行結束程式 (Ctrl+C) 以隨時停止應用程式。

    幾秒鐘后,應用程式就會開始在螢幕上顯示通話記錄,因為它會將記錄傳送至事件中樞。 通話資料包含下列欄位:

    錄製 [定義]
    CallrecTime 呼叫開始時間的時間戳。
    SwitchNum 用來連接通話的電話交換器。 在此範例中,參數是代表來源國家/地區的字串串(美國、中國、英國、德國或澳大利亞)。
    CallingNum 來電者的電話號碼。
    CallingIMSI 國際移動訂閱者身份(IMSI)。 這是呼叫端的唯一標識碼。
    CalledNum 通話收件者的電話號碼。
    CalledIMSI 國際行動使用者身分識別(IMSI)。 這是通話收件者的唯一標識符。

建立串流分析作業

既然您已擁有呼叫事件的數據流,您可以建立串流分析作業,以從事件中樞讀取數據。

  1. 若要建立串流分析作業,請流覽至 Azure 入口網站
  2. 選取 [建立資源 ],然後搜尋 串流分析作業。 選取 [ 串流分析作業 ] 圖格,然後選取 [ 建立]。
  3. 在 [新增串流分析作業] 頁面上,遵循下列步驟:
    1. 針對 [ 用帳戶],選取包含事件中樞命名空間的訂用帳戶。

    2. 針對 [ 資源群組],選取您稍早建立的資源群組。

    3. 在 [實例詳細數據]段中,針對 [名稱] 輸入串流分析作業的唯一名稱。

    4. 針對 [ 區域],選取您要在其中建立串流分析作業的區域。 建議您將作業和事件中樞放在相同區域中,以獲得最佳效能,因此您不需要支付在區域之間傳輸數據的費用。

    5. 若尚未選取 [裝載環境<],請選取 [雲端]。 串流分析作業可以部署到雲端或邊緣裝置。 雲端 可讓您部署至 Azure 雲端,而 Edge 可讓您部署至 IoT Edge 裝置。

    6. 針對 [串流單位],選取 [1]。 串流單位代表執行作業所需的計算資源。 根據預設,此值會設定為1。 若要瞭解如何調整串流單位,請參閱 瞭解和調整串流單位 一文。

    7. 選取頁面底部的 [檢閱 + 建立] 。

      顯示 [建立 Azure 串流分析作業] 頁面的螢幕快照。

  4. 在 [ 檢閱 + 建立] 頁面上,檢閱設定,然後選取 [建立 ] 以建立串流分析作業。
  5. 部署作業之後,選取 [移至資源 ] 以流覽至 [ 串流分析作業 ] 頁面。

設定作業輸入

下一個步驟是定義作業的輸入來源,以使用您在上一節中建立的事件中樞來讀取數據。

  1. 在 [ 串流分析作業 ] 頁面上,於 左側功能表中的 [作業拓撲 ] 區段中,選取 [ 輸入]。

  2. 在 [ 輸入] 頁面上,選取 [+ 新增輸入 ] 和 [事件中樞]。

    顯示串流分析作業之 [輸入] 頁面的螢幕快照。

  3. 在 [事件中 ] 頁面上,遵循下列步驟:

    1. 針對 [ 輸入別名],輸入 CallStream。 輸入別名是用來識別輸入的易記名稱。 輸入別名只能包含英數位元、連字元和底線,且長度必須為 3-63 個字元。

    2. 針對 [ 訂用帳戶],選取您建立事件中樞的 Azure 訂用帳戶。 事件中樞可以和串流分析作業位於相同或不同的訂用帳戶中。

    3. 針對 事件中樞命名空間,選取您在上一節中建立的事件中樞命名空間。 您目前訂用帳戶中可用的所有命名空間都會列在下拉式清單中。

    4. 針對 [ 事件中樞名稱],選取您在上一節中建立的事件中樞。 所選命名空間中可用的所有事件中樞都會列在下拉式清單中。

    5. 針對 事件中樞取用者群組,保留已選取 [ 建立新 ] 選項,以便在事件中樞上建立新的取用者群組。 建議讓每一個串流分析作業使用不同的取用者群組。 若未指定取用者群組,串流分析作業會使用 $Default 取用者群組。 當作業包含自我聯結或有多個輸入時,某些輸入稍後可能會由多個讀取器讀取。 這種情況會影響單一取用者群組中的讀取器數目。

    6. 針對 [驗證模式],選取 [連接字串]。 使用此選項更容易測試教學課程。

    7. 針對 [ 事件中樞原則名稱],選取 [ 使用現有的],然後選取您稍早建立的原則。

    8. 選取頁面底部的 [儲存]

      顯示輸入事件中樞設定頁面的螢幕快照。

建立 Azure Cache for Redis 執行個體

  1. 使用建立快取中所述 的步驟,在 Azure Cache for Redis 中建立快取

  2. 建立快取之後,請在 [設定] 底下,選取 [存取密鑰]。 記下主要 連接字串

    顯示 [存取金鑰] 選單項選取項目的螢幕快照。

在 Azure Functions 中建立函式,以將數據寫入 Azure Cache for Redis

  1. 請參閱 Functions 檔的<建立函式應用程式>一節。 此範例建置於:

  2. 遵循本教學課程,在 Visual Studio Code建立預設 HttpTrigger 函式應用程式。 使用下列資訊:語言: C#、運行時間: .NET 6 (在函式 v4 下),範本: HTTP trigger

  3. 在位於專案資料夾的終端機中執行下列命令,以安裝 Redis 用戶端連結庫:

    dotnet add package StackExchange.Redis --version 2.2.88
    
  4. RedisConnectionStringValues 的 區段中local.settings.json新增 和 RedisDatabaseIndex 專案,填入目的地伺服器的 連接字串:

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "RedisConnectionString": "Your Redis Connection String",
            "RedisDatabaseIndex":"0"
        }
    }
    

    Redis 資料庫索引是從 0 到 15 的數位,可識別實例上的資料庫。

  5. 將整個函式 (專案中的 .cs 檔案) 取代為下列程式碼片段。 自行更新命名空間、類別名稱和函式名稱:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    
    using StackExchange.Redis;
    
    namespace Company.Function
    {
        public static class HttpTrigger1{
            [FunctionName("HttpTrigger1")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
                ILogger log)
            {
                // Extract the body from the request
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
    
                dynamic data = JsonConvert.DeserializeObject(requestBody);
    
                // Reject if too large, as per the doc
                if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
    
                string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
                int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
    
                using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
                {
                    // Connection refers to a property that returns a ConnectionMultiplexer
                    IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
    
                    // Parse items and send to binding
                    for (var i = 0; i < data.Count; i++)
                    {
                        string key = data[i].Time + " - " + data[i].CallingNum1;
    
                        db.StringSet(key, data[i].ToString());
                        log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
    
                        // Simple get of data types from the cache
                        string value = db.StringGet(key);
                        log.LogInformation($"Database got: {key} => {value}");
    
                    }
                }
                return new OkResult(); // 200
            }
        }
    }
    

    當串流分析從函式收到「HTTP 要求實體太大」例外狀況時,它會減少傳送至函式的批次大小。 下列程式代碼可確保串流分析不會傳送過大的批次。 請確定函式中使用的批次計數和大小值上限與串流分析入口網站中輸入的值一致。

  6. 函式現在可以發佈至 Azure。

  7. 開啟 Azure 入口網站 上的函式,並設定 和的應用程式設定RedisConnectionStringRedisDatabaseIndex

使用函式作為輸出更新串流分析作業

  1. 在 Azure 入口網站 上開啟串流分析作業。

  2. 流覽至您的函式,然後選取 [概觀>輸出>新增]。 若要新增輸出,請選取 [接收] 選項的 [Azure 函式 ]。 Functions 輸出配接器具有下列屬性:

    屬性名稱 說明
    輸出別名 您在作業查詢中用來參考輸出的使用者易記名稱。
    匯入選項 您可以使用目前訂用帳戶中的 函式,或在函式位於另一個訂用帳戶時手動提供設定。
    函數應用程式 Functions 應用程式的名稱。
    函式 Functions 應用程式中函式的名稱(run.csx 函式的名稱)。
    批次大小上限 設定每個輸出批次的大小上限,以位元組為單位傳送至函式。 根據預設,此值會設定為 262,144 個字節(256 KB)。
    最大批次計數 指定傳送至函式的每個批次中事件數目上限。 預設值是 100。 這個屬性為選擇性。
    機碼 可讓您從另一個訂用帳戶使用函式。 提供金鑰值來存取您的函式。 這個屬性為選擇性。
  3. 提供輸出別名的名稱。 在本教學課程中,它會命名為 saop1,但您可以使用您選擇的任何名稱。 填寫其他詳細數據。

  4. 開啟串流分析作業,並將查詢更新為下列內容。

    重要

    下列範例腳本假設您使用 CallStream 作為輸入名稱,並將 saop1 用於輸出名稱。 如果您使用不同的名稱,別忘了更新查詢。

     SELECT
             System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
             CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
         INTO saop1
         FROM CallStream CS1 TIMESTAMP BY CallRecTime
            JOIN CallStream CS2 TIMESTAMP BY CallRecTime
             ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
         WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. 在命令行中執行下列命令,以啟動telcodatagen.exe應用程式。 命令會使用 格式 telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]

    telcodatagen.exe 1000 0.2 2
    
  6. 啟動串流分析作業。

  7. 在 Azure 函式的 [ 監視] 頁面上,您會看到已叫用函式。

    顯示 Azure Functions 的 [監視] 頁面與函式調用的螢幕快照。

  8. Azure Cache for Redis 頁面上,選取 左側功能表上的 [計量 ]、新增 快取寫入 計量,並將持續時間設定為 最後一小時。 您會看到類似下圖的圖表。

    此螢幕快照顯示 Azure Cache for Redis 的 [計量] 頁面。

檢查 Azure Cache for Redis 以取得結果

從 Azure Functions 記錄取得金鑰

首先,取得插入至 Azure Cache for Redis 之記錄的索引鍵。 在程序代碼中,金鑰會在 Azure 函式中計算,如下列代碼段所示:

string key = data[i].Time + " - " + data[i].CallingNum1;

db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
  1. 流覽至 Azure 入口網站,並尋找您的 Azure Functions 應用程式。

  2. 選取左側功能表上的 [函式]

  3. 從函式清單中選取 [HTTPTrigger1 ]。

  4. 選取 左側功能表上的 [監視 ]。

  5. 切換至 [ 記錄] 索引標籤。

  6. 記下資訊訊息中的索引鍵,如下列螢幕快照所示。 您可以使用此金鑰來尋找 Azure Cache for Redis 中的值。

    顯示 Azure 函式 [監視記錄] 頁面的螢幕快照。

使用金鑰在 Azure Cache for Redis 中尋找記錄

  1. 流覽至 Azure 入口網站,並尋找您的 Azure Cache for Redis。 選取 [主控台]

  2. 使用 Azure Cache for Redis 命令 來確認您的數據位於 Azure Cache for Redis 中。 (此命令採用 Get {key}格式。使用您從 Azure 函式 [監視記錄] 複製的金鑰(在上一節中)。

    取得“KEY-FROM-THE-PREVIOUS-SECTION”

    此指令應該列印指定索引鍵的值:

    顯示 Redis 快取主控台的螢幕快照,其中顯示 Get 命令的輸出。

錯誤處理和重試

如果在將事件傳送至 Azure Functions 時發生失敗,串流分析會重試大部分的作業。 所有 HTTP 例外狀況都會重試,直到成功為止,但 HTTP 錯誤 413 除外(實體太大)。 實體太大錯誤會被視為受重試或卸除原則約束的數據錯誤。

注意

從串流分析到 Azure Functions 的 HTTP 要求逾時會設定為 100 秒。 如果您的 Azure Functions 應用程式需要 100 秒以上的時間來處理批次,串流分析就會發生錯誤,並會重新處理批次。

重試逾時可能會導致寫入輸出接收的重複事件。 當串流分析重試失敗的批次時,它會重試批次中的所有事件。 例如,請考慮從串流分析傳送至 Azure Functions 的 20 個事件批次。 假設 Azure Functions 需要 100 秒的時間來處理該批次中的前 10 個事件。 100 秒之後,串流分析會暫停要求,因為該要求尚未收到來自 Azure Functions 的正面回應,而且會針對相同的批次傳送另一個要求。 批次中的前 10 個事件會由 Azure Functions 再次處理,這會導致重複。

已知問題

在 Azure 入口網站 中,當您嘗試將 [最大批次大小/ 最大批次計數] 值重設為空白時,值會在儲存時變更回先前輸入的值。 在此案例中,手動輸入這些欄位的預設值。

串流分析目前不支援在 Azure Functions 上使用 HTTP 路由

不支援連線到虛擬網路中裝載的 Azure Functions。

清除資源

若不再需要,可刪除資源群組、串流作業和所有相關資源。 刪除作業可避免因為作業使用串流單位而產生費用。 如果您計劃在未來使用該作業,您可以將其停止並在之後需要時重新啟動。 如果您將不繼續使用此作業,請使用下列步驟,刪除本快速入門所建立的所有資源:

  1. 從 Azure 入口網站的左側功能表中,選取 [資源群組],然後選取您所建立資源的名稱。
  2. 在資源群組頁面上,選取 [刪除],在文字方塊中輸入要刪除的資源名稱,然後選取 [刪除]

下一步

在本教學課程中,您已建立執行 Azure 函式的簡單串流分析作業。 若要深入瞭解串流分析作業,請繼續進行下一個教學課程: