分享方式:


教學課程:從 Azure 串流分析作業執行 Azure Functions

在本教學課程中,您會建立 Azure 串流分析作業,以從 Azure 事件中樞 讀取事件、對事件數據執行查詢,然後叫用 Azure 函式,以寫入 Azure Cache for Redis 實例。

顯示解決方案中 Azure 服務之間關聯性的螢幕快照。

注意

  • 您可以將 Functions 設定為串流分析作業的其中一個接收 (輸出),藉以從 Azure 串流分析執行 Azure Functions。 Functions 是事件導向隨選計算的體驗,可讓您實作在 Azure 或第三方服務中發生之事件所觸發的程式碼。 Functions 回應觸發程序的這種能力使其成為串流分析作業的自然輸出。
  • 串流分析會透過 HTTP 觸發程序叫用 Functions。 Functions 輸出配接器可讓使用者將 Functions 連線到串流分析,以便根據串流分析查詢來觸發事件。
  • 不支援從執行於多租用戶叢集中的串流分析作業連線至虛擬網路 (VNet) 內的 Azure Functions。

在本教學課程中,您會了解如何:

  • 建立 Azure 事件中樞 實例
  • 建立 Azure Cache for Redis 執行個體
  • 建立 Azure 函式
  • 建立串流分析作業
  • 將事件中樞設定為輸入,並將函式設定為輸出
  • 執行串流分析作業
  • 檢查 Azure Cache for Redis 尋找結果

如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

必要條件

開始之前,請確定您已完成下列步驟:

  • 如果您沒有 Azure 訂閱,請建立免費帳戶
  • 從 Microsoft 下載中心下載通話事件產生器應用程式 TelcoGenerator.zip,或從 GitHub 取得原始程式碼。

登入 Azure

登入 Azure 入口網站

建立事件中樞

您必須先將某些範例資料傳送至事件中樞,串流分析才能分析詐騙電話資料流。 在本教學課程中,您會使用 Azure 事件中樞將資料傳送至 Azure。

請使用下列步驟來建立事件中樞,並將通話資料傳送至事件中樞:

  1. 登入 Azure 入口網站

  2. 選取左側功能表上的 [所有服務],選取 [物聯網],將滑鼠游標停留在 [事件中樞] 上,然後選取 [+ (新增)] 按鈕。

    顯示事件中樞建立頁面的螢幕快照。

  3. 在 [建立命名空間] 頁面上,遵循下列步驟:

    1. 選取您要在其中建立事件中樞的 Azure 訂用帳戶

    2. 針對 [資源群組],選取 [新建],並為資源群組輸入名稱。 事件中樞命名空間會在此資源群組中建立。

    3. 針對 [命名空間名稱],輸入事件中樞命名空間的唯一名稱。

    4. 針對 [位置],選取您想要在其中建立命名空間的區域。

    5. 針對 [定價層],選取 [標準]

    6. 選取頁面底部的 [檢閱 + 建立] 。

      顯示 [建立Namespace] 頁面的螢幕快照。

    7. 檢閱所有設定之後,在命名空間建立精靈的 [檢閱 + 建立] 頁面上,選取頁面底部的 [建立]

  4. 成功部署命名空間後,選取 [前往資源] 以瀏覽至 [事件中樞命名空間] 頁面。

  5. 在 [事件中樞命名空間] 頁面中,選取命令列中的 [+事件中樞]

    顯示事件中樞Namespace頁面上 [新增事件中樞] 按鈕的螢幕快照。

  6. 在 [建立事件中樞] 頁面,輸入事件中樞的 [名稱]。 將 [分割區計數] 設定為 2。 對其餘的設定使用預設選項,然後選取 [檢閱 + 建立]

    顯示 [建立事件中樞] 頁面的螢幕快照。

  7. [檢閱 + 建立] 頁面上,選取頁面底部的 [建立]。 然後,等待部署完成。

授權存取事件中樞並取得連接字串

事件中樞必須先具有允許存取權的原則,應用程式才能將資料傳送到 Azure 事件中樞。 存取原則會產生包含授權資訊的連接字串。

  1. 在 [事件中樞命名空間] 頁面中,選取左側功能表上的 [共用存取原則]

  2. 選取原則清單中的 RootManageSharedAccessKey

    顯示 [共用存取原則] 頁面的螢幕快照。

  3. 然後選取 [連接字串 - 主索引鍵] 旁的複製按鈕。

  4. 將連接字串貼到文字編輯器。 您在下一節中需要此連接字串。

    連接字串如下所示:

    Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    請注意,連接字串包含多個以分號分隔的索引鍵/值組:EndpointSharedAccessKeyNameSharedAccessKey

啟動事件產生器應用程式

啟動 TelcoGenerator 應用程式之前,您應該先將它設定為將資料傳送到您先前建立的事件中樞。

  1. TelcoGenerator.zip 檔案的內容解壓縮。

  2. 在您選擇的文字編輯器中開啟 TelcoGenerator\TelcoGenerator\telcodatagen.exe.config 檔案。有一個以上的 .config 檔案,請確實開啟正確的檔案。

  3. 使用下列詳細資料更新組態檔中的 <appSettings> 元素:

    • EventHubName 索引鍵的值設為連接字串結尾的 EntityPath 值。
    • 將 Microsoft.ServiceBus.ConnectionString 索引鍵的值設定為命名空間 連接字串。 如果您使用事件中樞 連接字串,而不是命名空間,請在結尾移除EntityPath值 (;EntityPath=myeventhub)。 別忘了移除 EntityPath 值前面的分號。
  4. 儲存檔案。

  5. 接著,開啟命令視窗,並切換至解壓縮 TelcoGenerator 應用程式的資料夾。 輸入下列命令:

    .\telcodatagen.exe 1000 0.2 2
    

    此命令採用下列參數︰

    • 每小時的通話資料記錄筆數。
    • 詐騙機率的百分比,也就是應用程式模擬詐騙電話的頻率。 值 0.2 表示大約 20% 的通話記錄可能是詐騙。
    • 以小時為單位的持續時間,也就是應用程式應該執行的時數。 您也可以在命令列結束程序 (Ctrl+C),隨時停止應用程式。

    幾秒之後,隨著應用程式將通話記錄傳送到事件中樞,它會開始在螢幕上顯示通話記錄。 通話資料包含下列欄位:

    錄製 [定義]
    CallrecTime 通話開始時間的時間戳記。
    SwitchNum 用來接通電話的電話交換機。 在此範例中,交換機是代表發話國家/地區的字串 (美國、中國、英國、德國或澳大利亞)。
    CallingNum 來電者的電話號碼。
    CallingIMSI 國際行動用戶識別碼 (IMSI)。 這是來電者的唯一識別碼。
    CalledNum 受話方的電話號碼。
    CalledIMSI 國際行動用戶識別碼 (IMSI)。 這是受話方的唯一識別碼。

建立串流分析作業

既然已取得通話事件的串流,您可以建立串流分析作業,以從事件中樞讀取資料。

  1. 若要建立串流分析作業,請瀏覽至 Azure 入口網站
  2. 選取 [建立資源],並搜尋 [串流分析作業]。 選取 [串流分析作業] 圖格,然後選取 [建立]
  3. 在 [新增串流分析作業] 頁面上,遵循下列步驟:
    1. 針對 [訂用帳戶],選取包含事件中樞命名空間的訂用帳戶。

    2. 針對 [資源群組],選取您稍早建立的資源群組。

    3. 在 [執行個體詳細資料] 區段中,針對 [名稱],輸入串流分析作業的唯一名稱。

    4. 針對 [區域],選取您想要在其中建立串流分析作業的區域。 建議您將作業和事件中樞放在相同的區域,以達到最佳效能,在區域之間傳送資料也不需要付費。

    5. 針對 [裝載環境]<,如果尚未選取,請選取 [雲端]。 串流分析作業可以部署到雲端或邊緣裝置。 雲端可讓您部署到 Azure 雲端,而邊緣可讓您部署到 IoT Edge 裝置。

    6. 針對 [串流單位],選取 [1]。 串流單位代表執行作業所需的計算資源。 根據預設,此值設定為 1。 若要深入了解如何調整串流單位,請參閱了解與調整串流單位一文。

    7. 選取頁面底部的 [檢閱 + 建立] 。

      顯示 [建立 Azure 串流分析作業] 頁面的螢幕快照。

  4. 在 [檢閱 + 建立] 頁面上檢閱設定,然後選取 [建立] 以建立串流分析作業。
  5. 部署工作後,選取 [前往資源] 以瀏覽至 [串流分析作業] 頁面。

設定作業輸入

下一個步驟是使用您在上一節中建立的事件中樞定義作業的輸入來源,以讀取資料。

  1. 在 [串流分析作業] 頁面左側功能表中的 [作業拓撲] 區段中,選取 [輸入]

  2. 在 [輸入] 頁面中,選取 [+新增輸入] 與 [事件中樞]

    顯示串流分析作業之 [輸入] 頁面的螢幕快照。

  3. 在 [事件中樞] 頁面上,遵循下列步驟:

    1. 在 [輸入別名] 中,輸入 CallStream。 輸入別名是可識別輸入的易記名稱。 輸入別名只可包含英數字元、連字號與底線,且其長度必須介於 3-63 個字元之間。

    2. 針對 [訂用帳戶],選取您在其中建立事件中樞的 Azure 訂用帳戶。 事件中樞可位於與串流分析作業相同或不同的訂用帳戶。

    3. 針對 [事件中樞命名空間],選取您在上一節中建立的事件中樞命名空間。 您目前訂用帳戶中所有可用的命名空間都會列在下拉式清單中。

    4. 針對 [事件中樞名稱],選取您在上一節中建立的事件中樞。 已選取命名空間中所有可用的事件中樞都會列在下拉式清單中。

    5. 針對 [事件中樞取用者群組],保留選取 [新建] 選項,以便在事件中樞上建立新的取用者群組。 建議讓每一個串流分析作業使用不同的取用者群組。 若未指定取用者群組,串流分析作業會使用 $Default 取用者群組。 當作業包含自我聯結或有多個輸入時,某些輸入稍後可能由多個讀取器所讀取。 這種情況會影響單一取用者群組中的讀取器數目。

    6. 針對 [驗證模式],選取 [連接字串]。 使用此選項更容易測試教學課程。

    7. 針對 [事件中樞原則名稱],選取 [使用現有項目],然後選取您先前建立的原則。

    8. 選取頁面底部的 [儲存]

      顯示輸入事件中樞設定頁面的螢幕快照。

建立 Azure Cache for Redis 執行個體

  1. 使用在建立快取中所述的步驟,在 Azure Cache for Redis 中建立快取。

  2. 建立快取後,請在 [設定] 底下,選取 [存取金鑰]。 記下主要連接字串

    顯示 [存取金鑰] 選單項選取項目的螢幕快照。

在 Azure Functions 中建立可將資料寫入至 Azure Cache for Redis 的函式

  1. 請參閱 Functions 文件的建立函式應用程式一節。 此範例建置於:

  2. 遵循本教學課程,在 Visual Studio Code 中建立預設的 HttpTrigger 函式應用程式。 會使用下列資訊:語言:C#、執行階段:.NET 6 (位於函式 v4 下)、範本:HTTP trigger

  3. 在位於專案資料夾的終端中執行下列命令,以安裝 Redis 程式庫:

    dotnet add package StackExchange.Redis --version 2.2.88
    
  4. local.settings.jsonValues 區段中新增 RedisConnectionStringRedisDatabaseIndex 項目,填入目的地伺服器的連接字串:

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "RedisConnectionString": "Your Redis Connection String",
            "RedisDatabaseIndex":"0"
        }
    }
    

    Redis 資料庫索引是從 0 到 15 的數字,可識別執行個體上的資料庫。

  5. 將整個函式 (專案中的 .cs 檔案) 取代為下列程式碼片段。 自行更新命名空間、類別名稱和函式名稱:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    
    using StackExchange.Redis;
    
    namespace Company.Function
    {
        public static class HttpTrigger1{
            [FunctionName("HttpTrigger1")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
                ILogger log)
            {
                // Extract the body from the request
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
    
                dynamic data = JsonConvert.DeserializeObject(requestBody);
    
                // Reject if too large, as per the doc
                if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
    
                string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
                int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
    
                using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
                {
                    // Connection refers to a property that returns a ConnectionMultiplexer
                    IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
    
                    // Parse items and send to binding
                    for (var i = 0; i < data.Count; i++)
                    {
                        string key = data[i].Time + " - " + data[i].CallingNum1;
    
                        db.StringSet(key, data[i].ToString());
                        log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
    
                        // Simple get of data types from the cache
                        string value = db.StringGet(key);
                        log.LogInformation($"Database got: {key} => {value}");
    
                    }
                }
                return new OkResult(); // 200
            }
        }
    }
    

    串流分析從函式收到「HTTP 要求實體太大」的例外狀況時,會縮減傳送至函式的批次大小。 下列程式碼可確保串流分析不會傳送過大的批次。 請確認函式中使用的最大批次計數和大小值都符合串流分析入口網站中輸入的值。

  6. 函式現在可以發佈至 Azure。

  7. 在 Azure 入口網站上開啟函式,並設定 RedisConnectionStringRedisDatabaseIndex應用程式設定

使用函式做為輸出,更新串流分析作業

  1. 在 Azure 入口網站上,開啟您的串流分析作業。

  2. 瀏覽至您的函式,並選取 [概觀]>[輸出]>[新增]。 若要新增輸出,請對於接收選項選取 Azure Function。 新的 Functions 輸出配接器具有下列屬性:

    屬性名稱 說明
    輸出別名 在作業的查詢中用來參考該次輸入的易記名稱。
    匯入選項 您可以使用目前訂用帳戶的函式,如果函式位於另一個訂用帳戶中,也可以手動提供設定。
    函數應用程式 Functions 應用程式的名稱。
    函式 您的 Functions 應用程式中的函式名稱 (您的 run.csx 函式名稱)。
    批次大小上限 設定每個輸出批次 (傳送至您的函式) 的大小上限 (以位元組為單位)。 預設值為 262,144 位元組 (256 KB)。
    批次計數上限 傳送至函式的每個批次中的事件數上限。 預設值是 100。 這個屬性為選擇性。
    機碼 可讓您使用其他訂用帳戶中的函式。 提供存取您函式的索引鍵值。 這個屬性為選擇性。
  3. 提供輸出別名的別名。 在此教學課程中,其命名為 saop1,您可以使用您選擇的任何名稱。 填入其他詳細資料。

  4. 開啟串流分析作業,並更新下列的查詢。

    重要

    下列範例腳本假設您使用 CallStream 作為輸入名稱,並將 saop1 用於輸出名稱。 如果您使用不同的名稱,別忘了更新查詢。

     SELECT
             System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
             CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
         INTO saop1
         FROM CallStream CS1 TIMESTAMP BY CallRecTime
            JOIN CallStream CS2 TIMESTAMP BY CallRecTime
             ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
         WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. 在命令列中執行下列命令,啟動 telcodatagen.exe 應用程式。 此命令使用的格式為 telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]

    telcodatagen.exe 1000 0.2 2
    
  6. 啟動串流分析作業。

  7. 在 Azure 函式的 [監視] 頁面上,您會看到已叫用函式。

    顯示 Azure Functions 的 [監視] 頁面與函式調用的螢幕快照。

  8. 在您建立的 [Azure Cache for Redis] 頁面上,選取左側功能表上的 [計量]、新增 [快取寫入] 計量,並將持續時間設定為 [前一小時]。 您會看到類似下圖的圖表。

    此螢幕快照顯示 Azure Cache for Redis 的 [計量] 頁面。

檢查 Azure Cache for Redis 尋找結果

取得 Azure Functions 記錄中的索引鍵

首先,取得在 Azure Cache for Redis 插入之記錄的索引鍵。 如下列程式碼片段所示,會在程式碼中使用 Azure 函式計算索引鍵:

string key = data[i].Time + " - " + data[i].CallingNum1;

db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
  1. 瀏覽至 Azure 入口網站,並找出 Azure Functions 應用程式。

  2. 選取左側功能表上的 [函式]

  3. 從函式清單中選取 [HTTPTrigger1]

  4. 在左側功能表上選取 [監視]

  5. 切換至 [記錄] 索引標籤。

  6. 如下列螢幕擷取畫面所示,記下資訊訊息中的索引鍵。 您可以使用此索引鍵,以尋找 Azure Cache for Redis 中的值。

    顯示 Azure 函式 [監視記錄] 頁面的螢幕快照。

使用索引鍵在 Azure Cache for Redis 中尋找記錄

  1. 瀏覽至 Azure 入口網站,並找出您的 Azure Cache for Redis。 選取 [主控台]

  2. 使用 Azure Cache for Redis 命令,確認您的資料在 Azure Cache for Redis 中。 (此命令接受 Get {key} 的格式。)使用您從 Azure 函式 [監視記錄] 複製的索引鍵 (在上一節中)。

    取得 "KEY-FROM-THE-PREVIOUS-SECTION"

    此命令應該會列出指定索引鍵的值:

    顯示 Redis 快取主控台的螢幕快照,其中顯示 Get 命令的輸出。

錯誤處理和重試

如果將事件傳送至 Azure Functions 時發生失敗,則串流分析會重試大部分的作業。 所有 HTTP 例外狀況都會重試,直到成功但出現 HTTP 錯誤 413 (實體太大) 為止。 實體太大的錯誤會被視為受制於重試或捨棄原則的資料錯誤。

注意

從串流分析到 Azure Functions 的 HTTP 要求逾時時間設定為 100 秒。 如果您的 Azure Functions 應用程式需要超過 100 秒來處理批次,串流分析就會發生錯誤,並會重試批次。

重試逾時可能會導致寫入輸出接收的重複事件。 當串流分析重試失敗的批次時,其會針對批次中的所有事件重試。 例如,從串流分析傳送其中有 20 個事件的批次到 Azure Functions。 假設 Azure Functions 需要 100 秒的時間來處理該批次中的前 10 個事件。 在 100 秒之後,串流分析暫停要求,因為其尚未收到來自 Azure Functions 的正面回應,而另一個要求又針對相同的批次傳送。 Azure Functions 會再次處理批次中的前 10 個事件,因此導致重複。

已知問題

在 Azure 入口網站中,您嘗試將最大批次大小/最大批次計數值重設為空 (預設值) 時,值將儲存時變更回先前輸入的值。 在此情況下,請手動將預設值輸入欄位。

串流分析目前不支援在 Azure Functions 上使用 HTTP 路由

不會啟用對連線到虛擬網路中所裝載 Azure Functions 的支援。

清除資源

若不再需要,可刪除資源群組、串流作業和所有相關資源。 刪除作業可避免因為作業使用串流單位而產生費用。 如果您計劃在未來使用該作業,您可以將其停止並在之後需要時重新啟動。 如果您將不繼續使用此作業,請使用下列步驟,刪除本快速入門所建立的所有資源:

  1. 從 Azure 入口網站的左側功能表中,選取 [資源群組],然後選取您所建立資源的名稱。
  2. 在資源群組頁面上,選取 [刪除],在文字方塊中輸入要刪除的資源名稱,然後選取 [刪除]

下一步

在本教學課程中,您已建立可執行 Azure 函式的串流分析作業。 若要深入了解串流分析作業,請繼續下一個教學課程: