通过


你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure 流分析 JavaScript 自定义聚合

Azure 流分析支持以 JavaScript 编写的用户定义的聚合 (UDA),可实现复杂的有状态业务逻辑。 在 UDA 中,你可以全面控制状态数据结构、状态累积、状态分散和聚合结果计算。 本文介绍两个不同的 JavaScript UDA 接口、UDA 的创建步骤,以及如何在流分析查询中将 UDA 与基于窗口的操作结合使用。

JavaScript 用户定义的聚合

用户定义的聚合应用于时间窗口设定之上,以对该窗口内的事件进行聚合,并生成一个结果值。 流分析目前支持两种类型的 UDA 接口:AccumulateOnly 和 AccumulateDeaccumulate。 翻转窗口、跳跃窗口、滑动窗口和会话窗口都可以使用两种类型的 UDA。 与跳跃窗口、滑动窗口和会话窗口结合使用时,AccumulateDeaccumulate UDA 的表现比 AccumulateOnly UDA 更好。 可以根据所用的算法选择其中一种类型。

仅限累积聚合

AccumulateOnly 聚合只能将新事件累积到其状态中,算法不允许减少已累积的值。 在无法从状态值中实现事件信息去累积时,请选择此聚合类型。 下面是 AccumulateOnly 聚合的 JavaScript 模板:

// Sample UDA which state can only be accumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.computeResult = function () {
        return this.state;
    }
}

AccumulateDeaccumulate 聚合与去聚合

AccumulateDeaccumulate 聚合允许从状态中减少之前积累的某个值,例如,从事件值列表中删除某个键值对,或者从求和聚合的状态中减去某个值。 以下是用于 AccumulateDeaccumulate 聚合的 JavaScript 模板:

// Sample UDA which state can be accumulated and deaccumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.deaccumulate = function (value, timestamp) {
        this.state -= value;
    }

    this.deaccumulateState = function (otherState){
        this.state -= otherState.state;
    }

    this.computeResult = function () {
        return this.state;
    }
}

UDA - JavaScript 函数声明

每个 JavaScript UDA 由函数对象声明定义。 下面是 UDA 定义中的主要元素。

函数别名

函数别名是 UDA 标识符。 在流分析查询中调用时,始终将 UDA 别名与“uda.”前缀结合使用。

函数类型

对于 UDA 而言,函数类型应为 JavaScript UDA。

输出类型

流分析作业支持的特定类型,或在查询中处理类型时使用“任意”值。

函数名称

此函数对象的名称。 函数名称应与 UDA 别名匹配。

方法 - init()

Init() 方法初始化聚合的状态。 窗口启动时会调用此方法。

方法 – 累加()

Accumulate() 方法基于前一状态和当前事件值计算 UDA 状态。 当某个事件进入时间窗口(TUMBLINGWINDOW、HOPPINGWINDOW、SLIDINGWINDOW 或 SESSIONWINDOW)时,会调用此方法。

方法 – 去积累化()

deaccumulate() 方法基于前一状态和当前事件值重新计算状态。 当事件退出 SLIDINGWINDOW 或 SESSIONWINDOW 时,会调用此方法。

方法 – deaccumulateState()

deaccumulateState() 方法基于前一状态和跳跃状态重新评估并计算状态。 当一组事件退出 HOPPINGWINDOW 时,会调用此方法。

方法 – computeResult()

computeResult() 方法基于当前状态返回聚合结果。 当某个时间窗口(TUMBLINGWINDOW、HOPPINGWINDOW、SLIDINGWINDOW 或 SESSIONWINDOW)结束时,会调用此方法。

JavaScript UDA 支持的输入和输出数据类型

有关 JavaScript UDA 数据类型,请参阅集成 JavaScript UDF流分析和 JavaScript 类型转换部分。

通过 Azure 门户添加 JavaScript UDA

下面演练通过门户创建 UDA 的过程。 此处使用的示例计算时间加权平均值。

现在,让我们遵循以下步骤在现有的 ASA 作业下创建一个 JavaScript UDA。

  1. 登录到 Azure 门户,然后定位您的现有 Stream Analytics 作业。

  2. 然后选择作业拓扑下的功能链接。

  3. 选择“添加”以添加新函数。

  4. 在“新建函数”视图中,选择“JavaScript UDA”作为函数类型,然后,编辑器中会显示默认的 UDA 模板。

  5. 填入“TWA”作为 UDA 别名,并按如下所示更改函数实现:

    // Sample UDA which calculate Time-Weighted Average of incoming values.
    function main() {
        this.init = function () {
            this.totalValue = 0.0;
            this.totalWeight = 0.0;
        }
    
        this.accumulate = function (value, timestamp) {
            this.totalValue += value.level * value.weight;
            this.totalWeight += value.weight;
    
        }
    
        // Uncomment below for AccumulateDeaccumulate implementation
        /*
        this.deaccumulate = function (value, timestamp) {
            this.totalValue -= value.level * value.weight;
            this.totalWeight -= value.weight;
        }
    
        this.deaccumulateState = function (otherState){
            this.state -= otherState.state;
            this.totalValue -= otherState.totalValue;
            this.totalWeight -= otherState.totalWeight;
        }
        */
    
        this.computeResult = function () {
            if(this.totalValue == 0) {
                result = 0;
            }
            else {
                result = this.totalValue/this.totalWeight;
            }
            return result;
        }
    }
    
  6. 选择“保存”按钮后,该 UDA 会显示在函数列表中。

  7. 选择新函数“TWA”,可以检查函数定义。

在 ASA 查询中调用 JavaScript UDA

在 Azure 门户中打开作业,编辑查询,并调用具有必需前缀“uda.”的 TWA() 函数。 例如:

WITH value AS
(
    SELECT
    NoiseLevelDB as level,
    DurationSecond as weight
FROM
    [YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
    System.Timestamp as ts,
    uda.TWA(value) as NoseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)

使用 UDA 测试查询

创建包含以下内容的本地 JSON 文件,将该文件上传到流分析作业,并测试上述查询。

[
  {"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
  {"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
  {"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
  {"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
  {"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
  {"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
  {"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
  {"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]

获取帮助

如需更多帮助,请访问有关 Azure 流分析的 Microsoft 问答页

后续步骤