将实时事件从自定义应用流式传输到 Microsoft Fabric KQL 数据库
在本教程中,你将了解如何使用 Microsoft Fabric 事件流功能将自定义应用程序中的实时事件流式传输到 KQL 数据库。 你还将了解如何创建近实时 Power BI 报表,以有效监视业务数据。
本教程介绍如何执行下列操作:
- 在 Microsoft Fabric 中创建 KQL 数据库和事件流。
- 为事件流配置源(自定义应用)和目标(KQL 数据库)。
- 使用自定义应用将事件发送到事件流。
- 生成有关 KQL 数据库中数据的准实时 Power BI 报表。
先决条件
开始之前,你必须满足以下先决条件:
- 使用参与者或更高权限获取事件流和 KQL 数据库所在高级工作区的访问权限。
- 下载并安装最新的 Node.js 长期支持 (LTS) 版本。
- 下载并安装 Visual Studio Code(推荐)或任何其他集成开发环境 (IDE)。
在 Microsoft Fabric 中创建 KQL 数据库和事件流
可通过“工作区”页或“创建中心”页创建 KQL 数据库和事件流。 按照以下步骤创建数据库,然后再次创建事件流:
将 Fabric 体验更改为“实时智能”,然后选择“KQL 数据库”或“事件流”,以在工作区或中心创建这些项。 (为获得最佳结果,请先创建 KQL 数据库,然后再创建事件流。)
输入新建 KQL 数据库或事件流的名称,然后选择“创建”。 对于本文中的示例,我们将 citytempdb 用于 KQL 数据库,将 citytempdata-es 用于 eventstream。
确认 citytempdb 和 citytempdata-es 显示在工作区中。
将自定义应用程序源添加到事件流
利用自定义应用程序源,可以连接自己的应用程序,并轻松地将事件数据传输到事件流。 连接端点随时可用并在自定义应用程序源中公开,使整个过程流畅。
按照以下步骤将自定义应用程序源添加到事件流:
在功能区上选择“新建源”或在主编辑器画布中选择“+”,然后选择“自定义应用”。 此时会显示“自定义应用”配置对话框。
输入自定义应用的源名称,然后选择“添加”。
成功添加自定义应用程序源后,画布上会显示一个新的源节点。 选择此节点以在下方窗格的“详细信息”选项卡中查看有关源的关键信息。
- 基本信息:显示自定义应用的名称、说明、类型和状态。
- 密钥:显示自定义应用程序的连接字符串,可将其复制粘贴到应用程序中。
- 示例代码:显示示例代码,可以参考或复制该代码,将事件数据推送到此事件流或从此事件流中拉取事件数据。
对于每个选项卡(“基本信息” / “密钥” / “示例代码”),还可以切换三个协议选项卡:Eventhub、AMQP 和 Kafka,用于访问不同协议格式信息。
“详细信息”选项卡上显示的连接字符串与 Azure 事件中心兼容。 可以在应用程序中复制并使用此连接字符串,将事件发送到 Eventstream。 以下示例显示了连接字符串的呈现效果:
Endpoint=sb://eventstream-xxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=key_xxxxxxxx;SharedAccessKey=xxxxxxxx;EntityPath=es_xxxxxxxx
创建应用程序以将事件发送到事件流
通过在自定义应用源中随时可用的事件中心的连接字符串,可以创建将事件发送到事件流的应用程序。 在下面的示例中,应用程序将模拟 10 个传感器设备,几乎每秒传输一次温度和湿度数据。
打开代码编辑器,例如 Visual Studio Code。
创建名为 sendtoes.js 的文件,然后将以下代码粘贴到其中。 将以下占位符替换为连接字符串主键或连接字符串辅助键中的实值。
例如:
Endpoint=sb://eventstream-xxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=key_xxxxxxxx;SharedAccessKey=xxxxxxxx;EntityPath=es_xxxxxxxx
在此示例中,连接字符串为
Endpoint=sb://eventstream-xxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=key_xxxxx;SharedAccessKey=xxxxxxxx
。 实体名称是后面的EntityPath=
字符串,即es_xxxxxxxx
。const { EventHubProducerClient } = require("@azure/event-hubs"); var moment = require('moment'); const connectionString = "CONNECTION STRING"; const entityName = "ENTITY NAME"; //Generate event data function getRowData(id) { const time = moment().toISOString(); const deviceID = id + 100; const humidity = Math.round(Math.random()*(65-35) + 35); const temperature = Math.round(Math.random()*(37-20) + 20); return {"entryTime":time, "messageId":id, "temperature":temperature, "humidity":humidity, "deviceID":deviceID}; } function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); } async function main() { // Create a producer client to send messages to the eventstream. const producer = new EventHubProducerClient(connectionString, entityName); // There are 10 devices. They're sending events nearly every second. So, there are 10 events in one batch. // The event counts per batch. For this case, it's the sensor device count. const batchSize = 10; // The event batch count. If you want to send events indefinitely, you can increase this number to any desired value. const batchCount = 5; // Generating and sending events... for (let j = 0; j < batchCount; ++j) { const eventDataBatch = await producer.createBatch(); for (let k = 0; k < batchSize; ++k) { eventDataBatch.tryAdd({ body: getRowData(k) }); } // Send the batch to the eventstream. await producer.sendBatch(eventDataBatch); console.log(moment().format('YYYY/MM/DD HH:mm:ss'), `[Send events to Fabric Eventstream]: batch#${j} (${batchSize} events) has been sent to eventstream`); // sleep for 1 second. await sleep(1000); } // Close the producer client. await producer.close(); console.log(moment().format('YYYY/MM/DD HH:mm:ss'), `[Send events to Fabric Eventstream]: All ${batchCount} batches have been sent to eventstream`); } main().catch((err) => { console.log("Error occurred: ", err); });
若要运行此脚本,需要安装所需的包。 打开 PowerShell 命令提示符,转到与 sendtoes.js 相同的文件夹,然后执行以下命令:
npm install @azure/event-hubs npm install moment
在 PowerShell 命令提示符中运行
node sendtoes.js
以执行此文件。 窗口中应会显示有关正在发送的事件的消息。C:\wa>node sendtoes.js 2023/06/12 20:35:39 [Send events to Fabric Eventstream]: batch#0 (10 events) has been sent to eventstream 2023/06/12 20:35:41 [Send events to Fabric Eventstream]: batch#1 (10 events) has been sent to eventstream 2023/06/12 20:35:42 [Send events to Fabric Eventstream]: batch#2 (10 events) has been sent to eventstream 2023/06/12 20:35:44 [Send events to Fabric Eventstream]: batch#3 (10 events) has been sent to eventstream 2023/06/12 20:35:45 [Send events to Fabric Eventstream]: batch#4 (10 events) has been sent to eventstream 2023/06/12 20:35:47 [Send events to Fabric Eventstream]: All 5 batches have been sent to eventstream
转到事件流的主编辑器画布,然后选择事件流节点。 然后选择下部窗格上的“数据预览”选项卡。
还可以通过选择“数据见解”选项卡来查看数据指标,以确认数据是否已流式传输到事件流中。
将 KQL 数据库目标添加到事件流
自定义应用程序将事件流式传输到事件流中时,可以添加和配置要从事件流接收事件的 KQL 数据库目标。 要添加 KQL 数据库目标,请执行以下步骤:
在功能区上选择“新建目标”或在主编辑器画布中选择“+”,然后选择“KQL 数据库”。
输入目标名称,选择工作区,然后从所选工作区中选择 KQL 数据库。
选择“添加并配置”以启动数据引入向导。 在“目标”选项卡上,输入 KQL 表的名称,然后选择“下一步: 源”。
在“源”选项卡上,验证事件流的源,然后选择“下一步: 架构”。
在“架构”选项卡上,选择“JSON”作为数据格式。 可以在右侧窗格中预览数据。 如果数据类型不符合预期,可以通过选择表标题中的箭头对其进行修改。 还可以根据要求添加或移除列。 完成后,选择“下一步: 摘要”。
在“摘要”选项卡上,可以查看配置和状态摘要。 如果一切正确,请选择“完成”以完成配置。 事件数据开始流入 KQL 数据库,并且 KQL 数据库目标会显示在画布上。
验证 KQL 数据库中的数据
要验证新 KQL 数据库中的事件数据,请:
在画布上,选择 KQL 数据库目标节点。 然后选择下部窗格中的“信息”选项卡。
在“信息”选项卡上,选择 citytempdb 旁边的“打开项”。
在 citytempdb KQL 数据库中,选择 tempdatatbl 表。 然后选择“查询表”>“显示任意 100 条记录”。
确认数据显示在“浏览数据”面板的下半部分。
使用 KQL 数据库中引入的事件数据生成近实时 Power BI 报表
将数据引入 KQL 数据库后,可以根据特定需求对其进行分析。 例如,如果对过去两小时内的温度和湿度趋势进行近实时跟踪,可以编写 KQL 查询来检索该数据。 然后,可以在 Power BI 报表中启用自动刷新并可视化结果。
在显示任意 100 条记录的 KQL 查询窗口中,按如下所示修改查询:
// Use 'take' to view a sample number of records in the table and check the data. tempdatatbl | where entryTime > now(-2h) | summarize ptemperature = avg(temperature), phumidity = avg(humidity) by bin(entryTime, 1s)
注意
使用运算符
avg
,因为每秒会有 10 台传感器设备发出数据。选择“生成 Power BI 报表”以创建报表。 在 Power BI 报表对话框中,将两个折线图添加到报表中。 为这两个图表选择 temperature、humidity 和 entryTime 数据以监视数据。 还可以向显示最新 entryTime 数据的报表添加 card,以监视最近的事件时间。
完成报表配置后,选择“文件”->“保存”,以将此报表保存到工作区。
对于自动数据刷新,请在 Power BI 报表中选择“编辑”。 转到“可视化效果”下的“设置页面格式”,选择“页面刷新”并设置刷新间隔。
注意
管理员间隔可控制最小刷新间隔。
相关内容
在本教程中,你已了解如何将实时事件从自己的应用程序流式传输到 KQL 数据库。 然后,使用 KQL 查询语义模型创建准实时 Power BI 报表,借助该报表可以可视化事件数据中的业务见解。
如想了解用于处理 Fabric 事件流的更多高级功能,你可能会发现以下资源非常有用:
反馈
https://aka.ms/ContentUserFeedback。
即将发布:在整个 2024 年,我们将逐步淘汰作为内容反馈机制的“GitHub 问题”,并将其取代为新的反馈系统。 有关详细信息,请参阅:提交和查看相关反馈