你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
将数据从 Azure IoT MQ 预览版发送到 Data Lake Storage
重要
Azure IoT 操作预览版(由 Azure Arc 启用)当前处于预览状态。 不应在生产环境中使用此预览版软件。
有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅 Microsoft Azure 预览版的补充使用条款。
可以使用数据湖连接器将数据从 Azure IoT MQ 预览版代理发送到数据湖,例如 Azure Data Lake Storage Gen2 (ADLSv2)、Microsoft Fabric Onelake 和 Azure 数据资源管理器。 该连接器订阅 MQTT 主题,并将消息引入 Data Lake Storage 帐户中的 Delta 表。
先决条件
Azure 中的 Data Lake Storage 帐户,其中包含用于数据的容器和文件夹。 有关创建 Data Lake Storage 的详细信息,请使用以下快速入门选项之一:
- Microsoft Fabric OneLake 快速入门:
- Azure Data Lake Storage Gen2 快速入门:
- 创建一个存储帐户以用于 Azure Data Lake Storage Gen2。
- Azure 数据资源管理器群集:
- 按照“快速入门:创建 Azure 数据资源管理器群集和数据库”中的“完整群集”步骤操作。
IoT MQ MQTT 代理。 有关如何部署 IoT MQ MQTT 代理的详细信息,请参阅快速入门:将 Azure IoT 操作预览版部署到已启用 Arc 的 Kubernetes 群集。
配置为使用托管标识将数据发送到 Microsoft Fabric OneLake
配置数据湖连接器以使用托管标识连接到 Microsoft Fabric OneLake。
确保满足先决条件中的步骤,包括 Microsoft Fabric 工作区和湖屋。 无法使用默认的我的工作区。
确保已安装 IoT MQ Arc 扩展并使用托管标识进行配置。
在 Azure 门户中,转到 Arc 连接的 Kubernetes 群集,然后选择“设置”>“扩展”。 在扩展列表中,查找 IoT MQ 扩展名称。 名称以
mq-
开头,后跟五个随机字符。 例如,mq-4jgjs。获取与 IoT MQ Arc 扩展托管标识关联的应用 ID,并记下 GUID 值。 应用 ID不同于对象或主体 ID。 可以查找托管标识的对象 ID,然后查询与托管标识关联的服务主体的应用 ID,从而使用 Azure CLI。 例如:
OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv) az ad sp show --query appId --id $OBJECT_ID --output tsv
应获取 GUID 值的输出:
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
此 GUID 是需要在下一步中使用的应用 ID。
在 Microsoft Fabric 工作区中,使用管理访问,然后选择“+ 添加人员或组”。
按名称“mq”搜索 IoT MQ Arc 扩展,并确保选择在上一步中找到的应用 ID GUID 值。
选择“参与者”作为角色,然后选择“添加”。
创建DataLakeConnector资源,用于定义连接器的配置和终结点设置。 可以使用提供的 YAML 作为示例,但请确保更改以下字段:
target.fabricOneLake.endpoint
:Microsoft Fabric OneLake 帐户的终结点。 可以在“文件”>“属性”下,从 Microsoft Fabric lakehouse 获取终结点 URL。 URL 应如下所示:https://onelake.dfs.fabric.microsoft.com
。target.fabricOneLake.names
:工作区和湖屋的名称。 使用此字段或guids
。 不要同时使用这两者。workspaceName
:工作区的名称。lakehouseName
:湖屋的名称。
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: info databaseFormat: delta target: fabricOneLake: # Example: https://onelake.dfs.fabric.microsoft.com endpoint: <example-endpoint-url> names: workspaceName: <example-workspace-name> lakehouseName: <example-lakehouse-name> ## OR # guids: # workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx # lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx fabricPath: tables authentication: systemAssignedManagedIdentity: audience: https://storage.azure.com/ localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
创建DataLakeConnectorTopicMap资源,用于定义 MQTT 主题与 Data Lake Storage 中 Delta 表之间的映射。 可以使用提供的 YAML 作为示例,但请确保更改以下字段:
dataLakeConnectorRef
:前面创建的 DataLakeConnector 资源的名称。clientId
:MQTT 客户端的唯一标识符。mqttSourceTopic
:要从中获取数据的 MQTT 主题的名称。table.tableName
:要在湖屋中追加到的表的名称。 如果该表不存在,则会自动创建该表。table.schema
:应与发送到 MQTT 主题的 JSON 消息的格式和字段匹配的 Delta 表的架构。
使用
kubectl apply -f datalake-connector.yaml
将 DataLakeConnector 和 DataLakeConnectorTopicMap 资源应用到 Kubernetes 群集。开始使用 MQTT 发布者将 JSON 消息发送到 MQTT 主题。 数据湖连接器实例订阅主题并将消息引入 Delta 表。
使用浏览器,验证数据是否已导入湖屋。 在 Microsoft Fabric 工作区中,依次选择湖屋、“表”。 应会看到表中的数据。
无法识别的表
如果数据显示在无法识别的表中:
原因可能是表名称中的字符不受支持。 表名称必须是有效的 Azure 存储容器名称,这意味着它可以包含任何英文字母、大写或小写和下划线_
,长度最多为 256 个字符。 不允许使用短划线-
或空格字符。
配置为使用 SAS 令牌将数据发送到 Azure Data Lake Storage Gen2
配置数据湖连接器以使用共享访问签名 (SAS) 令牌连接到 Azure Data Lake Storage Gen2 (ADLS Gen2) 帐户。
获取 Azure Data Lake Storage Gen2 (ADLS Gen2) 帐户的SAS 令牌。 例如,使用 Azure 门户浏览到存储帐户。 在“安全 + 网络”下的菜单中,选择“共享访问签名”。 使用下表设置所需的权限。
参数 值 允许的服务 Blob 允许的资源类型 对象, 容器 允许的权限 读取, 写入, 删除, 列出, 创建 要针对最低特权进行优化,还可以选择获取单个容器的 SAS。 要防止身份验证错误,请确保容器与主题映射配置中的
table.tableName
值匹配。使用 SAS 令牌创建 Kubernetes 机密。 不要包含可能位于令牌开头的问号
?
。kubectl create secret generic my-sas \ --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \ -n azure-iot-operations
创建DataLakeConnector资源,用于定义连接器的配置和终结点设置。 可以使用提供的 YAML 作为示例,但请确保更改以下字段:
endpoint
:ADLSv2 存储帐户的 Data Lake Storage 终结点,格式为https://example.blob.core.windows.net
。 在 Azure 门户中,在“存储帐户>”“设置>”“终结点>”“Data Lake Storage”下找到终结点。accessTokenSecretName
:包含 SAS 令牌的 Kubernetes 机密的名称(前面示例中的my-sas
)。
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: "debug" databaseFormat: "delta" target: datalakeStorage: endpoint: "https://example.blob.core.windows.net" authentication: accessTokenSecretName: "my-sas" localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
创建DataLakeConnectorTopicMap资源,用于定义 MQTT 主题与 Data Lake Storage 中 Delta 表之间的映射。 可以使用提供的 YAML 作为示例,但请确保更改以下字段:
dataLakeConnectorRef
:前面创建的 DataLakeConnector 资源的名称。clientId
:MQTT 客户端的唯一标识符。mqttSourceTopic
:要从中获取数据的 MQTT 主题的名称。table.tableName
:要在 Data Lake Storage 中追加到的容器的名称。 如果 SAS 令牌限定为帐户,会在缺少时自动创建容器。table.schema
:应与发送到 MQTT 主题的 JSON 消息的格式和字段匹配的 Delta 表的架构。
使用
kubectl apply -f datalake-connector.yaml
将DataLakeConnector和DataLakeConnectorTopicMap资源应用到 Kubernetes 群集。开始使用 MQTT 发布者将 JSON 消息发送到 MQTT 主题。 数据湖连接器实例订阅主题并将消息引入 Delta 表。
使用 Azure 门户,验证是否已创建 Delta 表。 这些文件按客户端 ID、连接器实例名称、MQTT 主题和时间进行整理。 在存储帐户>容器中,打开在DataLakeConnectorTopicMap中指定的容器。 验证是否存在_delta_log,以及 parque 文件是否显示 MQTT 流量。 打开 parque 文件,确认有效负载与架构中发送和定义的内容匹配。
使用托管标识向 ADLSv2 进行身份验证
要使用托管标识,请将其指定为 DataLakeConnector authentication
下的唯一方法。 使用az k8s-extension show
查找 IoT MQ Arc 扩展的主体 ID,然后将角色分配给托管标识,该托管标识授予写入存储帐户的权限,例如存储 Blob 数据参与者。 要了解详细信息,请参阅使用 Microsoft Entra ID 授予对 blob 的访问权限。
authentication:
systemAssignedManagedIdentity:
audience: https://my-account.blob.core.windows.net
配置为使用托管标识将数据发送到 Azure 数据资源管理器
配置数据湖连接器以使用托管标识将数据发送到 Azure 数据资源管理器终端
确保满足先决条件中的步骤,包括完整的 Azure 数据资源管理器群集。 “免费群集”选项不起作用。
创建群集后,创建用于存储数据的数据库。
可以通过 Azure 门户为给定数据创建表并手动创建列,也可以使用查询选项卡中的 KQL。例如:
.create table thermostat ( externalAssetId: string, assetName: string, CurrentTemperature: real, Pressure: real, MqttTopic: string, Timestamp: datetime )
启用流式引入
对表和数据库启用流式引入。 在查询选项卡中运行以下命令,用数据库名称替换 <DATABASE_NAME>
:
.alter database <DATABASE_NAME> policy streamingingestion enable
将托管标识添加到 Azure 数据资源管理器群集
若要使连接器向 Azure 数据资源管理器进行身份验证,必须将托管标识添加到 Azure 数据资源管理器群集。
- 在 Azure 门户中,转到 Arc 连接的 Kubernetes 群集,然后选择“设置”>“扩展”。 在扩展列表中,查找 IoT MQ 扩展的名称。 名称以
mq-
开头,后跟五个随机字符。 例如,mq-4jgjs。 IoT MQ 扩展名称与 MQ 托管标识名称相同。 - 在 Azure 数据资源管理器数据库中,选择“权限”>“添加”>“引入器”。 搜索 MQ 托管标识名称并添加它。
有关添加权限的详细信息,请参阅“管理 Azure 数据资源管理器群集权限”。
现在,你已准备好部署连接器并将数据发送到 Azure 数据资源管理器。
部署文件示例
Azure 数据资源管理器连接器的部署文件示例。 以 TODO
开头的注释要求你将占位符设置替换为你的信息。
apiVersion: mq.iotoperations.azure.com/v1beta1
name: my-adx-connector
namespace: azure-iot-operations
spec:
repository: mcr.microsoft.com/azureiotoperations/datalake
tag: 0.4.0-preview
pullPolicy: Always
databaseFormat: adx
target:
# TODO: insert the ADX cluster endpoint
endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
authentication:
systemAssignedManagedIdentity:
audience: https://api.kusto.windows.net
localBrokerConnection:
endpoint: aio-mq-dmqtt-frontend:8883
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
authentication:
kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: adx-topicmap
namespace: azure-iot-operations
spec:
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
# TODO: add DB and table name
tablePath: <DATABASE_NAME>
tableName: <TABLE_NAME>
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: MqttTopic
format: utf8
optional: false
mapping: $topic
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
此示例接受来自 azure-iot-operations/data/thermostat
主题的数据,其中包含 JSON 格式的消息,如下所示:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
DataLakeConnector
DataLakeConnector是 Kubernetes 自定义资源,用于定义数据湖连接器实例的配置和属性。 数据湖连接器将数据从 MQTT 主题引入 Data Lake Storage 帐户中的 Delta 表。
DataLakeConnector资源的规范字段包含以下子字段:
protocol
:MQTT 版本。 它可以是v5
或v3
之一。image
:映像字段指定数据湖连接器模块的容器映像。 它具有以下子字段:repository
:存储映像的容器注册表和存储库的名称。tag
:要使用的映像的标记。pullPolicy
:映像的拉取策略。 它可以是Always
、IfNotPresent
或Never
之一。
instances
:要运行的数据湖连接器的副本数。logLevel
:数据湖连接器模块的日志级别。 它可以是trace
、debug
、info
、warn
、error
或fatal
之一。databaseFormat
:要引入 Data Lake Storage 的数据的格式。 它可以是delta
或parquet
之一。target
:目标字段指定数据引入的目标。 可以是datalakeStorage
、fabricOneLake
、adx
或localStorage
。datalakeStorage
:指定 ADLSv2 帐户的配置和属性。 它具有以下子字段:endpoint
:Data Lake Storage 帐户终结点的 URL。 不要包含任何尾斜杠/
。authentication
:身份验证字段指定用于访问 Data Lake Storage 帐户的类型和凭据。 该参数可以是下列值之一。accessTokenSecretName
:Kubernetes 机密的名称,用于对 Data Lake Storage 帐户使用共享访问令牌身份验证。 如果类型为accessToken
,需要此字段。systemAssignedManagedIdentity
:使用系统托管标识进行身份验证。 它有一个子字段audience
:以托管标识令牌受众的https://<my-account-name>.blob.core.windows.net
形式表示的字符串,适用于任何存储帐户的帐户级别或https://storage.azure.com
。
fabricOneLake
:指定 Microsoft Fabric OneLake 的配置和属性。 它具有以下子字段:endpoint
:Microsoft Fabric OneLake 终结点的 URL。 它通常是https://onelake.dfs.fabric.microsoft.com
,因为这是 OneLake 全局终结点。 如果正在使用区域终结点,则它采用https://<region>-onelake.dfs.fabric.microsoft.com
形式。 不要包含任何尾斜杠/
。 要了解详细信息,请参阅连接到 Microsoft OneLake。names
:指定工作区和胡屋的名称。 使用此字段或guids
。 不要同时使用这两者。 它具有以下子字段:workspaceName
:工作区的名称。lakehouseName
:湖屋的名称。
guids
:指定工作区和湖屋的 GUID。 使用此字段或names
。 不要同时使用这两者。 它具有以下子字段:workspaceGuid
:工作区的 GUID。lakehouseGuid
:湖屋的 GUID。
fabricPath
:Fabric 工作区中数据的位置。 它可以是tables
或files
。 如果是tables
,则数据作为表存储在 Fabric OneLake 中。 如果是files
,则数据作为文件存储在 Fabric OneLake 中。 如果是files
,databaseFormat
必须是parquet
。authentication
:身份验证字段指定用于访问 Microsoft Fabric OneLake 的类型和凭据。 它目前只能是systemAssignedManagedIdentity
。 它有一个子字段:systemAssignedManagedIdentity
:使用系统托管标识进行身份验证。 它有一个子字段audience
:托管标识令牌受众的字符串,它必须是https://storage.azure.com
。
adx
:指定 Azure 数据资源管理器帐户的配置和属性。 它具有以下子字段:endpoint
:Azure 数据资源管理器群集终结点的 URL,格式为https://<CLUSTER>.<REGION>.kusto.windows.net
。 不要包含任何尾斜杠/
。authentication
:身份验证字段指定用于访问 Azure 数据资源管理器帐户的类型和凭据。 它目前只能是systemAssignedManagedIdentity
。 它有一个子字段:systemAssignedManagedIdentity
:使用系统托管标识进行身份验证。 它有一个子字段audience
:托管标识令牌受众的字符串,它必须是https://api.kusto.windows.net
。
localStorage
:指定本地存储帐户的配置和属性。 它具有以下子字段:volumeName
:装载到每个连接器 Pod 中的卷的名称。
localBrokerConnection
:用于替代与 IoT MQ MQTT 代理的默认连接配置。 请参阅管理本地代理连接。
DataLakeConnectorTopicMap
DataLakeConnectorTopicMap 是 Kubernetes 自定义资源,用于定义 MQTT 主题与 Data Lake Storage 帐户中 Delta 表之间的映射。 DataLakeConnectorTopicMap 资源引用在同一边缘设备上运行的 DataLakeConnector 资源,并将 MQTT 主题中的数据引入到 Delta 表中。
DataLakeConnectorTopicMap 资源的规范字段包含以下子字段:
dataLakeConnectorRef
:本主题映射所属的 DataLakeConnector 资源的名称。mapping
:映射字段指定 MQTT 主题和 Delta 表的详细信息和属性。 它具有以下子字段:allowedLatencySecs
:从 MQTT 主题接收消息并将其引入 Delta 表之间的最大延迟(以秒为单位)。 此字段为必填字段。clientId
:订阅主题的 MQTT 客户端的唯一标识符。maxMessagesPerBatch
:要在一个批中引入 Delta 表中的最大消息数。 由于临时限制,如果qos
设置为 1,则此值必须小于 16。 此字段为必填字段。messagePayloadType
:发送到 MQTT 主题的有效负载的类型。 它可以是json
或avro
之一(尚不支持)。mqttSourceTopic
:要订阅的 MQTT 主题的名称。 支持MQTT 主题通配符表示法。qos
:订阅 MQTT 主题的服务级别质量。 它可以是 0 或 1 之一。table
:表字段指定 Data Lake Storage 帐户中 Delta 表的配置和属性。 它具有以下子字段:tableName
:要创建或追加到 Data Lake Storage 帐户中 Delta 表的名称。 与 Azure Data Lake Storage Gen2 结合使用时,此字段也称为容器名称。 它可以包含任意小写英文字母和下划线_
,长度不超过 256 个字符。 不允许使用短划线-
或空格字符。tablePath
:使用adx
类型连接器时 Azure 数据资源管理器数据库的名称。schema
:Delta 表的架构,该架构应与消息有效负载的格式和字段匹配。 它是对象数组,每个对象都有以下子字段:name
:Delta 表中列的名称。format
:Delta 表中列的数据类型。 它可以是boolean
、int8
、int16
、int32
、int64
、uInt8
、uInt16
、uInt32
、uInt64
、float16
、float32
、float64
、date32
、timestamp
、binary
或utf8
之一。 无符号类型(例如uInt8
)不受完全支持,如果在此处指定,则被视为有符号类型。optional
:布尔值,指示列是可选的还是必需的。 此字段可选,默认为 false。mapping
:用于定义如何从 MQTT 消息有效负载中提取列值的 JSON 路径表达式。 内置映射$client_id
、$topic
、$properties
和$received_time
可用作列以扩充 MQTT 消息正文中的 JSON。 此字段为必填字段。 将 $properties 用于 MQTT 用户属性。 例如,$properties.assetId 表示 MQTT 消息中的 assetId 属性的值。
下面是DataLakeConnectorTopicMap资源的示例:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: datalake-topicmap
namespace: azure-iot-operations
spec:
dataLakeConnectorRef: my-datalake-connector
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
tableName: thermostat
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
不支持像 "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}"
这样的字符串化 JSON,这会导致连接器引发转换程序找到 null 值错误。
适用于此架构的azure-iot-operations/data/thermostat
主题的示例消息:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
映射到:
externalAssetId | assetName | CurrentTemperature | 压力 | mqttTopic | timestamp |
---|---|---|---|---|---|
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx | thermostat-de | 5506 | 5506 | dlc | 2024-04-02T22:36:03.1827681Z |
重要
如果已更新数据架构,例如已更改数据类型或名称,传入数据的转换可能会停止工作。 发生架构更改时,需要更改数据表名称。
Delta 或 parquet
支持 delta 和 parquet 格式。
管理本地代理连接
与 MQTT 桥一样,数据湖连接器充当 IoT MQ MQTT 代理的客户端。 如果已自定义 IoT MQ MQTT 代理的侦听器端口或身份验证,请同时替代数据湖连接器的本地 MQTT 连接配置。 要了解详细信息,请参阅MQTT 桥本地代理连接。