你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:使用 Azure CLI 创建 Azure 流分析作业

在本快速入门中,将使用 Azure CLI 定义一个流分析作业,以便筛选温度读数高于 27 的实时传感器消息。 该流分析作业会从 IoT 中心读取数据,对数据进行转换,然后将输出数据写入到 Blob 存储中的容器。 在本快速入门中使用的输入数据由 Raspberry Pi 联机模拟器生成。

开始之前

如果没有 Azure 订阅,请在开始之前创建一个 Azure 免费帐户

先决条件

  • 创建资源组。 必须将所有 Azure 资源部署到资源组。 使用资源组可以组织和管理相关的 Azure 资源。

    对于本快速入门,请使用以下 az group create 命令在 eastus 位置创建名为 streamanalyticsrg 的资源组 :

    az group create --name streamanalyticsrg --location eastus
    

对输入数据进行准备

在定义流分析作业之前,请准备用于作业输入的数据。 以下 Azure CLI 命名将准备作业所需的输入数据。

  1. 使用 az iot hub create 命令创建 IoT 中心。 此示例创建名为 MyASAIoTHub 的 IoT 中心。 由于 IoT 中心名称必须全局唯一,因此可能需要更改名称(如果已被使用)。 将 SKU 设置为 F1 即可使用免费层,前提是它在订阅中可用。 否则,请选择下一个最低的层。

    iotHubName=MyASAIoTHub
    az iot hub create --name $iotHubName --resource-group streamanalyticsrg --sku S1
    

    创建 IoT 中心以后,请使用 az iot hub connection-string show 命令获取 IoT 中心连接字符串。 复制整个连接字符串并进行保存。 在将 IoT 中心作为输入添加到流分析作业时使用它。

    az iot hub connection-string show --hub-name $iotHubName
    
  2. 使用 az iothub device-identity create 命令将设备添加到 IoT 中心。 此示例创建名为 MyASAIoTDevice 的设备。

    az iot hub device-identity create --hub-name $iotHubName --device-id "MyASAIoTDevice"
    
  3. 使用 az iot hub device-identity connection-string show 命令获取设备连接字符串。 复制整个连接字符串并将其保存。这样,在创建 Raspberry Pi 模拟器时,就可以使用该字符串。

    az iot hub device-identity connection-string show --hub-name $iotHubName --device-id "MyASAIoTDevice" --output table
    

    输出示例:

    HostName=MyASAIoTHub.azure-devices.net;DeviceId=MyASAIoTDevice;SharedAccessKey=a2mnUsg52+NIgYudxYYUNXI67r0JmNubmfVafojG8=
    

创建 Blob 存储帐户

以下 Azure CLI 命令将创建用于作业输出的 Blob 存储帐户

  1. 使用 az storage account create 命令创建常规用途存储帐户。 常规用途的存储帐户可用于以下四种服务:Blob、文件、表和队列。

    storageAccountName="asatutorialstorage$RANDOM"
    az storage account create \
        --name $storageAccountName \
        --resource-group streamanalyticsrg \
        --location eastus \
        --sku Standard_ZRS \
        --encryption-services blob
    
  2. 通过运行 az storage account keys list 命令获取存储帐户的密钥。

    key=$(az storage account keys list -g streamanalyticsrg -n $storageAccountName --query "[0].value" -o tsv)
    echo $key
    

    重要

    记下 Azure 存储帐户的访问密钥。 稍后将在本快速入门中使用此密钥。

  3. 使用 az storage container create 命令创建一个名为 state 的容器来存储 Blob。 使用存储帐户密钥来授权操作创建容器。 有关使用 Azure CLI 授权数据操作的详细信息,请参阅使用 Azure CLI 授权访问 blob 或队列数据

    az storage container create \
        --account-name $storageAccountName \
        --name state \
        --account-key $key \
        --auth-mode key
    

创建流分析作业

使用 az stream-analytics job create 命令创建流分析作业。

az stream-analytics job create \
    --job-name "streamanalyticsjob" \
    --resource-group "streamanalyticsrg" \
    --location "eastus" \
    --output-error-policy "Drop" \
    --out-of-order-policy "Drop" \
    --order-max-delay 5 \
    --arrival-max-delay 16 \
    --data-locale "en-US"

配置作业输入

使用 az stream-analytics input cmdlet 将输入添加到作业。 此 cmdlet 将作业名称、作业输入名称、资源组名称和 JSON 格式的输入属性作为参数。 在此示例中,需将 IoT 中心创建为输入。

重要

  • IOT HUB ACCESS KEY 替换为你保存的 IOT 中心连接字符串中的共享访问密钥值。 例如,如果 IOT 中心连接字符串为 HostName=MyASAIoTHub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=xxxxxxxxxxxxxx=,则共享访问密钥值为 xxxxxxxxxxxxxx=。 替换值时,请确保不要删除 "(双引号)的 \(转义)字符。
  • 如果使用了 MyASAIoTHub 以外的名称,请在以下命令中更新 iotHubNamespace 的值。 运行 echo $iotHubName 可查看 IoT 中心的名称。
az stream-analytics input create \
    --properties "{\"type\":\"Stream\",\"datasource\":{\"type\":\"Microsoft.Devices/IotHubs\",\"properties\":{\"consumerGroupName\":\"\$Default\",\"endpoint\":\"messages/events\",\"iotHubNamespace\":\"MyASAIoTHub\",\"sharedAccessPolicyKey\":\"IOT HUB ACCESS KEY\",\"sharedAccessPolicyName\":\"iothubowner\"}},\"serialization\":{\"type\":\"Json\",\"encoding\":\"UTF8\"}}" \
    --input-name "asaiotinput" \
    --job-name "streamanalyticsjob" \
    --resource-group "streamanalyticsrg"

配置作业输出

使用 az stream-analytics output create cmdlet 将输出添加到作业。 此 cmdlet 将作业名称、作业输出名称、资源组名称、JSON 格式的数据源和序列化类型作为参数。

重要

STORAGEACCOUNTNAME> 替换为 Azure 存储帐户的名称,并将 STORAGEACCESSKEY> 替换为存储帐户的访问密钥。 如果未记下这些值,请运行以下命令来获取这些值:echo $storageAccountNameecho $key。 替换这些值时,请确保不要删除 "(双引号)的 \(转义)字符。

az stream-analytics output create \
    --job-name streamanalyticsjob \
    --datasource "{\"type\":\"Microsoft.Storage/Blob\",\"properties\":{\"container\":\"state\",\"dateFormat\":\"yyyy/MM/dd\",\"pathPattern\":\"{date}/{time}\",\"storageAccounts\":[{\"accountKey\":\"STORAGEACCESSKEY\",\"accountName\":\"STORAGEACCOUNTNAME\"}],\"timeFormat\":\"HH\"}}" \
    --serialization "{\"type\":\"Json\",\"properties\":{\"format\":\"Array\",\"encoding\":\"UTF8\"}}" \
    --output-name asabloboutput \
    --resource-group streamanalyticsrg

定义转换查询

使用 az stream-analytics transformation create cmdlet 将转换添加到作业。

az stream-analytics transformation create \
    --resource-group streamanalyticsrg \
    --job-name streamanalyticsjob \
    --name Transformation \
    --streaming-units "6" \
    --saql "SELECT * INTO asabloboutput FROM asaiotinput WHERE Temperature > 27"

运行 IoT 模拟器

  1. 打开 Raspberry Pi Azure IoT 联机模拟器

  2. 将第 15 行中的占位符替换为在快速入门开头保存的整个 Azure IoT 中心设备连接字符串(而不是 IoT 中心连接字符串)。

  3. 选择“运行”。 输出会显示传感器数据和发送到 IoT 中心的消息。

    Raspberry Pi Azure IoT Online Simulator

启动流分析作业并检查输出

使用 az stream-analytics job start cmdlet 启动作业。 此 cmdlet 使用作业名称、资源组名称、输出启动模式和启动时间作为参数。 OutputStartMode 接受的值为 JobStartTimeCustomTimeLastOutputEventTime

以下 cmdlet 在运行以后会返回 True 作为输出(如果作业启动)。

az stream-analytics job start \
    --resource-group streamanalyticsrg \
    --name streamanalyticsjob \
    --output-start-mode JobStartTime

等待几分钟,然后验证是否已在 state Blob 容器中创建输出文件。

Screenshot showing the output file in the State blob container.

下载并打开该文件,可以看到类似于以下内容的几个条目:

{
    "messageId": 229,
    "deviceId": "Raspberry Pi Web Client",
    "temperature": 31.85214010589595,
    "humidity": 60.278830289656284,
    "EventProcessedUtcTime": "2023-02-28T22:06:33.5567789Z",
    "PartitionId": 3,
    "EventEnqueuedUtcTime": "2023-02-28T22:05:49.6520000Z",
    "IoTHub": {
        "MessageId": null,
        "CorrelationId": null,
        "ConnectionDeviceId": "MyASAIoTDevice",
        "ConnectionDeviceGenerationId": "638132150746523845",
        "EnqueuedTime": "2023-02-28T22:05:49.6520000Z",
        "StreamId": null
    }
}

清理资源

删除资源组,这将删除资源组中的所有资源,包括流分析作业、IoT 中心和 Azure 存储帐户。

az group delete \
    --name streamanalyticsrg \
    --no-wait

后续步骤

在本快速入门中,你使用 Azure CLI 部署了一个简单的流分析作业。 也可通过 Azure 门户Visual Studio 部署流分析作业。

若要了解如何配置其他输入源并执行实时检测,请继续阅读以下文章: