将实时事件从自定义应用流式传输到 Microsoft Fabric KQL 数据库

在本教程中,你将了解如何使用 Microsoft Fabric 事件流功能将自定义应用程序中的实时事件流式传输到 KQL 数据库。 你还将了解如何创建近实时 Power BI 报表,以有效监视业务数据。

本教程介绍如何执行下列操作:

  • 在 Microsoft Fabric 中创建 KQL 数据库和事件流。
  • 为事件流配置源(自定义应用)和目标(KQL 数据库)。
  • 使用自定义应用将事件发送到事件流。
  • 生成有关 KQL 数据库中数据的准实时 Power BI 报表。

先决条件

开始之前,你必须满足以下先决条件:

在 Microsoft Fabric 中创建 KQL 数据库和事件流

可通过“工作区”页或“创建中心”页创建 KQL 数据库和事件流。 按照以下步骤创建数据库,然后再次创建事件流:

  1. 将 Fabric 体验更改为“实时智能”,然后选择“KQL 数据库”或“事件流”,以在工作区或中心创建这些项。 (为获得最佳结果,请先创建 KQL 数据库,然后再创建事件流。)

    • 在“工作区”页,选择“新建”,然后选择“KQL 数据库”或“事件流”。

      显示在何处从工作区中的“新建”菜单选择“事件流”和“湖屋”的屏幕截图。

    • 在“创建中心”中,选择“KQL 数据库”或“事件流”。

      显示在何处在创建中心选择事件流和 KQL 数据库磁贴的屏幕截图。

  2. 输入新建 KQL 数据库或事件流的名称,然后选择“创建”。 对于本文中的示例,我们将 citytempdb 用于 KQL 数据库,将 citytempdata-es 用于 eventstream。

    “新建事件流”对话框的屏幕截图。

  3. 确认 citytempdb 和 citytempdata-es 显示在工作区中。

    显示工作区中双项列表的屏幕截图。

将自定义应用程序源添加到事件流

利用自定义应用程序源,可以连接自己的应用程序,并轻松地将事件数据传输到事件流。 连接端点随时可用并在自定义应用程序源中公开,使整个过程流畅。

按照以下步骤将自定义应用程序源添加到事件流:

  1. 在功能区上选择“新建源”或在主编辑器画布中选择“+”,然后选择“自定义应用”。 此时会显示“自定义应用”配置对话框。

  2. 输入自定义应用的源名称,然后选择“添加”。

    “自定义应用”对话框的屏幕截图。

  3. 成功添加自定义应用程序源后,画布上会显示一个新的源节点。 选择此节点以在下方窗格的“详细信息”选项卡中查看有关源的关键信息。

    显示自定义应用信息的屏幕截图。

    • 基本信息:显示自定义应用的名称、说明、类型和状态。
    • 密钥:显示自定义应用程序的连接字符串,可将其复制粘贴到应用程序中。
    • 示例代码:显示示例代码,可以参考或复制该代码,将事件数据推送到此事件流或从此事件流中拉取事件数据。

    对于每个选项卡(“基本信息” / “密钥” / “示例代码”),还可以切换三个协议选项卡:Eventhub、AMQP 和 Kafka,用于访问不同协议格式信息。

“详细信息”选项卡上显示的连接字符串与 Azure 事件中心兼容。 可以在应用程序中复制并使用此连接字符串,将事件发送到 Eventstream。 以下示例显示了连接字符串的呈现效果:

Endpoint=sb://eventstream-xxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=key_xxxxxxxx;SharedAccessKey=xxxxxxxx;EntityPath=es_xxxxxxxx

显示“自定义应用详细信息”选项卡的屏幕截图。

创建应用程序以将事件发送到事件流

通过在自定义应用源中随时可用的事件中心的连接字符串,可以创建将事件发送到事件流的应用程序。 在下面的示例中,应用程序将模拟 10 个传感器设备,几乎每秒传输一次温度和湿度数据。

  1. 打开代码编辑器,例如 Visual Studio Code

  2. 创建名为 sendtoes.js 的文件,然后将以下代码粘贴到其中。 将以下占位符替换为连接字符串主键连接字符串辅助键中的实值。

    例如:

    Endpoint=sb://eventstream-xxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=key_xxxxxxxx;SharedAccessKey=xxxxxxxx;EntityPath=es_xxxxxxxx

    连接字符串示例的屏幕截图,其中连接字符串以黄色突出显示,实体名称以蓝色突出显示。

    在此示例中,连接字符串为 Endpoint=sb://eventstream-xxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=key_xxxxx;SharedAccessKey=xxxxxxxx。 实体名称是后面的 EntityPath= 字符串,即 es_xxxxxxxx

    const { EventHubProducerClient } = require("@azure/event-hubs");
    var moment = require('moment');
    
    const connectionString = "CONNECTION STRING";
    const entityName = "ENTITY NAME";
    
    //Generate event data
    function getRowData(id) {
        const time = moment().toISOString();
        const deviceID = id + 100;
        const humidity = Math.round(Math.random()*(65-35) + 35);
        const temperature = Math.round(Math.random()*(37-20) + 20);
    
        return {"entryTime":time, "messageId":id, "temperature":temperature, "humidity":humidity, "deviceID":deviceID};
      }
    
    function sleep(ms) {  
        return new Promise(resolve => setTimeout(resolve, ms));  
      } 
    
    async function main() {
        // Create a producer client to send messages to the eventstream.
        const producer = new EventHubProducerClient(connectionString, entityName);
    
        // There are 10 devices. They're sending events nearly every second. So, there are 10 events in one batch.
        // The event counts per batch. For this case, it's the sensor device count.
        const batchSize = 10;
        // The event batch count. If you want to send events indefinitely, you can increase this number to any desired value.
        const batchCount = 5;
    
        // Generating and sending events...
        for (let j = 0; j < batchCount; ++j) {
            const eventDataBatch = await producer.createBatch();
            for (let k = 0; k < batchSize; ++k) {
                eventDataBatch.tryAdd({ body: getRowData(k) });
            }  
            // Send the batch to the eventstream.
            await producer.sendBatch(eventDataBatch);
            console.log(moment().format('YYYY/MM/DD HH:mm:ss'), `[Send events to Fabric Eventstream]: batch#${j} (${batchSize} events) has been sent to eventstream`);
            // sleep for 1 second.
            await sleep(1000); 
        }
        // Close the producer client.
        await producer.close();
        console.log(moment().format('YYYY/MM/DD HH:mm:ss'), `[Send events to Fabric Eventstream]: All ${batchCount} batches have been sent to eventstream`);
    }
    
    main().catch((err) => {
        console.log("Error occurred: ", err);
    });
    
  3. 若要运行此脚本,需要安装所需的包。 打开 PowerShell 命令提示符,转到与 sendtoes.js 相同的文件夹,然后执行以下命令:

    npm install @azure/event-hubs
    npm install moment
    
  4. 在 PowerShell 命令提示符中运行 node sendtoes.js 以执行此文件。 窗口中应会显示有关正在发送的事件的消息。

    C:\wa>node sendtoes.js
    2023/06/12 20:35:39 [Send events to Fabric Eventstream]: batch#0 (10 events) has been sent to eventstream
    2023/06/12 20:35:41 [Send events to Fabric Eventstream]: batch#1 (10 events) has been sent to eventstream
    2023/06/12 20:35:42 [Send events to Fabric Eventstream]: batch#2 (10 events) has been sent to eventstream
    2023/06/12 20:35:44 [Send events to Fabric Eventstream]: batch#3 (10 events) has been sent to eventstream
    2023/06/12 20:35:45 [Send events to Fabric Eventstream]: batch#4 (10 events) has been sent to eventstream
    2023/06/12 20:35:47 [Send events to Fabric Eventstream]: All 5 batches have been sent to eventstream
    
  5. 转到事件流的主编辑器画布,然后选择事件流节点。 然后选择下部窗格上的“数据预览”选项卡。

    显示在何处查找事件流节点和事件流“数据预览”选项卡的屏幕截图。

    还可以通过选择“数据见解”选项卡来查看数据指标,以确认数据是否已流式传输到事件流中。

    显示在何处选择事件流节点和查找事件流“数据见解”选项卡的屏幕截图。

将 KQL 数据库目标添加到事件流

自定义应用程序将事件流式传输到事件流中时,可以添加和配置要从事件流接收事件的 KQL 数据库目标。 要添加 KQL 数据库目标,请执行以下步骤:

  1. 在功能区上选择“新建目标”或在主编辑器画布中选择“+”,然后选择“KQL 数据库”。

  2. 输入目标名称,选择工作区,然后从所选工作区中选择 KQL 数据库。

    显示在何处创建 Azure IoT 中心目标的屏幕截图。

  3. 选择“添加并配置”以启动数据引入向导。 在“目标”选项卡上,输入 KQL 表的名称,然后选择“下一步: 源”。

    显示“引入数据”配置向导的“目标”选项卡的屏幕截图。

  4. 在“源”选项卡上,验证事件流的源,然后选择“下一步: 架构”。

    显示“引入数据”配置向导的“源”选项卡的屏幕截图。

  5. 在“架构”选项卡上,选择“JSON”作为数据格式。 可以在右侧窗格中预览数据。 如果数据类型不符合预期,可以通过选择表标题中的箭头对其进行修改。 还可以根据要求添加或移除列。 完成后,选择“下一步: 摘要”。

    “引入数据”配置向导中“架构”选项卡的屏幕截图。

  6. 在“摘要”选项卡上,可以查看配置和状态摘要。 如果一切正确,请选择“完成”以完成配置。 事件数据开始流入 KQL 数据库,并且 KQL 数据库目标会显示在画布上。

    显示在何处在画布上查找新 KQL 数据库目标的屏幕截图。

验证 KQL 数据库中的数据

要验证新 KQL 数据库中的事件数据,请:

  1. 在画布上,选择 KQL 数据库目标节点。 然后选择下部窗格中的“信息”选项卡。

    显示在何处选择 KQL 目标节点并查看“信息”选项卡的屏幕截图。

  2. 在“信息”选项卡上,选择 citytempdb 旁边的“打开项”

  3. 在 citytempdb KQL 数据库中,选择 tempdatatbl 表。 然后选择“查询表”>“显示任意 100 条记录”。

    显示用于显示查询表中 100 条记录的选择的屏幕截图。

  4. 确认数据显示在“浏览数据”面板的下半部分。

    显示 KQL 表中前 100 条记录的屏幕截图。

使用 KQL 数据库中引入的事件数据生成近实时 Power BI 报表

将数据引入 KQL 数据库后,可以根据特定需求对其进行分析。 例如,如果对过去两小时内的温度和湿度趋势进行近实时跟踪,可以编写 KQL 查询来检索该数据。 然后,可以在 Power BI 报表中启用自动刷新并可视化结果。

  1. 在显示任意 100 条记录的 KQL 查询窗口中,按如下所示修改查询:

    // Use 'take' to view a sample number of records in the table and check the data.
    tempdatatbl
    | where entryTime > now(-2h)
    | summarize ptemperature = avg(temperature), phumidity = avg(humidity) by bin(entryTime, 1s)
    

    注意

    使用运算符 avg,因为每秒会有 10 台传感器设备发出数据。

  2. 选择“生成 Power BI 报表”以创建报表。 在 Power BI 报表对话框中,将两个折线图添加到报表中。 为这两个图表选择 temperaturehumidityentryTime 数据以监视数据。 还可以向显示最新 entryTime 数据的报表添加 card,以监视最近的事件时间。

    完成报表配置后,选择“文件”->“保存”,以将此报表保存到工作区。

    显示通过 KQL 创建 Power BI 报表的屏幕截图。

  3. 对于自动数据刷新,请在 Power BI 报表中选择“编辑”。 转到“可视化效果”下的“设置页面格式”,选择“页面刷新”并设置刷新间隔。

    注意

    管理员间隔可控制最小刷新间隔。

    显示如何启用自动刷新的屏幕截图。

在本教程中,你已了解如何将实时事件从自己的应用程序流式传输到 KQL 数据库。 然后,使用 KQL 查询语义模型创建准实时 Power BI 报表,借助该报表可以可视化事件数据中的业务见解。

如想了解用于处理 Fabric 事件流的更多高级功能,你可能会发现以下资源非常有用: