A data flow az az elérési út, amelyet az adatok a forrástól a célig visznek opcionális átalakításokkal. A data flow konfigurálásához hozzon létre egy Data flow egyéni erőforrást, vagy használja a műveleti felület webes felhasználói felületét. A data flow három részből áll: a source, a transformation és a destination.
flowchart LR
subgraph Source
A[DataflowEndpoint]
end
subgraph BuiltInTransformation
direction LR
Datasets - -> Filter
Filter - -> Map
end
subgraph Destination
B[DataflowEndpoint]
end
Source - -> BuiltInTransformation
BuiltInTransformation - -> Destination
A forrás és a cél meghatározásához konfigurálnia kell a data flow végpontokat. Az átalakítás nem kötelező, és olyan műveleteket is tartalmazhat, mint az adatok bővítése, az adatok szűrése és az adatok egy másik mezőre való leképezése.
Fontos
Minden adatfolyamnak rendelkeznie kell az Azure IoT műveletek helyi MQTT bróker alapértelmezett végpontjával, akár forrásként, akár célként.
A Azure IoT műveletek műveleti felületével létrehozhat egy data flow. A műveleti felület vizuális felületet biztosít a data flow konfigurálásához. A Bicep használatával is létrehozhat egy data flow Bicep-fájllal, vagy a Kubernetes használatával létrehozhat egy data flow YAML-fájllal.
További információ a forrás, az átalakítás és a cél konfigurálásáról.
Előfeltételek
Amint rendelkezik Azure IoT műveletek egy példányával, azonnal üzembe helyezheti az adatfolyamokat az alapértelmezett data flow profil és végpont használatával. Előfordulhat azonban, hogy data flow profilokat és végpontokat szeretne konfigurálni a data flow testreszabásához.
Adatfolyam profil
Ha nincs szüksége különböző méretezési beállításokra az adatfolyamokhoz, használja a Azure IoT műveletek által biztosított default data flow profilt. Ne társítson túl sok adatfolyamot egyetlen data flow profillal. Ha nagy mennyiségű adatfolyamot használ, ossza el őket több data flow profil között, hogy csökkentse a data flow profilkonfiguráció 70-es korlátjának túllépésének kockázatát.
Az új data flow-profilok konfigurálásáról a Konfigurálás data flow profilok című témakörben olvashat.
Adatfolyam végpontok
A data flow forrásának és céljának konfigurálásához data flow végpontokra van szükség. A gyors kezdéshez használja a default data flow végpontot a helyi MQTT-közvetítőhöz. Más típusú data flow végpontokat is létrehozhat, például Kafka, Event Hubs, OpenTelemetria vagy Azure Data Lake Storage. További információért lásd: adatfolyam-végpontok konfigurálása.
Első lépések
Ha rendelkezik az előfeltételekkel, elkezdhet létrehozni egy data flow.
Adatfolyam létrehozásához a műveleti felületen, válassza az Adatfolyam>létrehozása lehetőséget.
A data flow tulajdonságok beállításához válassza ki a helyőrző nevét new-data-flow. Adja meg a data flow nevét, és válassza ki a használni kívánt data flow profilt. Alapértelmezés szerint az alapértelmezett data flow profil van kiválasztva. A data flow profilokról további információt a Konfigurálás data flow profil című témakörben talál.
A műveleti élmény felület képernyőképe, ahol a felhasználó elnevezi az adatfolyamot, és kiválaszt ehhez egy profilt.
Fontos
Csak data flow létrehozásakor választhatja ki a data flow profilt. A data flow létrehozása után nem módosíthatja a data flow profilt.
Ha módosítani szeretné egy meglévő adatfolyam adatfolyamprofilját, törölje az eredeti adatfolyamot, és hozzon létre egy újat az új adatfolyamprofillal.
A data flow forrás-, átalakítás- és célvégpontjának konfigurálása a data flow diagram elemeinek kiválasztásával.
Az iot ops dataflow apply paranccsal hozhat létre vagy módosíthat egy adatfolyamot.
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
A --config-file paraméter az erőforrás-tulajdonságokat tartalmazó JSON-konfigurációs fájl elérési útja és fájlneve.
Ebben a példában tegyük fel, hogy a következő tartalommal elnevezett data-flow.json konfigurációs fájl a felhasználó kezdőlapján van tárolva:
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
// See source configuration section
}
},
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
// See transformation configuration section
}
},
{
"operationType": "Destination",
"destinationSettings": {
// See destination configuration section
}
}
]
}
Íme egy példaparancs egy data flow létrehozásához vagy frissítéséhez az alapértelmezett adatfolyamprofil használatával:
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
Hozzon létre egy Bicep .bicep fájlt a data flow létrehozásához. Ez a példa a forrás-, átalakítás- és célkonfigurációkat tartalmazó data flow struktúráját mutatja be.
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// See source configuration section
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// See transformation configuration section
}
}
{
operationType: 'Destination'
destinationSettings: {
// See destination configuration section
}
}
]
}
}
Fontos
A Kubernetes üzembehelyezési jegyzékfájljainak használata éles környezetekben nem támogatott, és csak hibakereséshez és teszteléshez használható.
Hozzon létre egy Kubernetes-jegyzékfájlt .yaml az adatforma létrehozásához. Ez a példa a forrás-, átalakítás- és célkonfigurációkat tartalmazó data flow struktúráját mutatja be.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: <DATAFLOW_NAME>
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
# This field is required when configuring via Kubernetes YAML
# The syntax is different when using Bicep
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# See source configuration section
# Transformation optional
- operationType: BuiltInTransformation
builtInTransformationSettings:
# See transformation configuration section
- operationType: Destination
destinationSettings:
# See destination configuration section
Az alábbi szakaszokból megtudhatja, hogyan konfigurálhatja a data flow művelettípusait.
Forrás
Konfigurálja a forrásvégpontot és az adatforrásokat (témaköröket) az adatfolyamhoz. Forrásként használhatja az alapértelmezett MQTT-közvetítőt, objektumot vagy egyéni MQTT- vagy Kafka-végpontot.
A teljes konfigurációs részletekért, beleértve az MQTT-témakör helyettesítő karaktereit, a megosztott előfizetéseket, a Kafka-témaköröket és a forrássémát, tekintse meg az adatfolyam-forrás konfigurálása című témakört.
Ha nem az alapértelmezett végpontot használja forrásként, akkor azt kell használnia célként. A helyi MQTT-közvetítővégpont használatáról további információt a helyi MQTT-közvetítő végpontjának használatával kapcsolatos további információkért lásd: Az adatfolyamoknak helyi MQTT-közvetítővégpontot kell használniuk.
Lemezmegőrzés kérése
A lemezmegőrzés az újraindítások során megőrzi az adatfolyam-feldolgozási állapotot. A konfiguráció részleteiért lásd: Lemezmegőrzés konfigurálása.
Az átalakítási művelettel átalakíthatja az adatokat a forrásból, mielőtt elküldené azokat a célhelyre. Az átalakítások nem kötelezőek. Ha nem kell módosítania az adatokat, ne vegye bele az átalakítási műveletet a data flow konfigurációba. Az átalakítások több lépésben láncolódnak össze, függetlenül attól, hogy a konfigurációban milyen sorrendben adja meg őket. A szakaszok sorrendje mindig a következő:
-
Bővítés: Adjon hozzá további adatokat a forrásadatokhoz egy adatkészlet és feltétel alapján.
-
Szűrés: Az adatok szűrése feltétel alapján.
-
Új tulajdonság leképezése, számítása, átnevezése vagy hozzáadása : Adatok áthelyezése egyik mezőből a másikba opcionális átalakítással.
Ez a szakasz a data flow átalakítások bemutatása. További információ: Adatok leképezése adatfolyamok használatával és Adatok gazdagítása adatfolyamok használatával.
A műveleti felületen válassza a Data flow>Átalakítás hozzáadása (nem kötelező) lehetőséget.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
// See section on enriching data
],
"filter": [
// See section on filtering data
],
"map": [
// See section on mapping data
]
}
}
builtInTransformationSettings: {
datasets: [
// See section on enriching data
]
filter: [
// See section on filtering data
]
map: [
// See section on mapping data
]
}
Fontos
A Kubernetes üzembehelyezési jegyzékfájljainak használata éles környezetekben nem támogatott, és csak hibakereséshez és teszteléshez használható.
builtInTransformationSettings:
datasets:
# See section on enriching data
filter:
# See section on filtering data
map:
# See section on mapping data
Bővítés: Referenciaadatok hozzáadása
Az adatok bővítéséhez először vegyen fel egy referenciaadatkészletet a Azure IoT műveletek state tárolóba. Az adatkészlet egy feltétel alapján további adatokat ad hozzá a forrásadatokhoz. A feltétel olyan mezőként van megadva a forrásadatokban, amelyek megegyeznek az adathalmaz egyik mezőjével.
A mintaadatokat a state tároló parancssori felületével töltheti be az állapottárolóba. Az állapottároló kulcsnevei egy adatkészletnek felelnek meg a data flow konfigurációban.
A Enrich szakasz jelenleg nem támogatott az operációs felületen.
Az adatok bővítéséhez használja a builtInTransformationSettings tulajdonságot a data flow konfigurációban.
datasets A tulajdonság használatával adja meg a gazdagításhoz szükséges adatkészleteket.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
{
"key": "<DATASET_KEY>",
"inputs": [
"$source.<SOURCE_FIELD>" // ---------------- $1
"$context(<DATASET_KEY>).<DATASET_FIELD>" // - $2
],
"expression": "$1 == $2"
}
]
}
}
Ez a példa bemutatja, hogyan használhatja a deviceId forrásadatok mezőit az adathalmaz mezőinek asset megfelelőre:
builtInTransformationSettings: {
datasets: [
{
key: 'assetDataset'
inputs: [
'$source.deviceId' // ---------------- $1
'$context(assetDataset).asset' // ---- $2
]
expression: '$1 == $2'
}
]
}
Fontos
A Kubernetes üzembehelyezési jegyzékfájljainak használata éles környezetekben nem támogatott, és csak hibakereséshez és teszteléshez használható.
Például használhatja a deviceId mezőt a forrásadatokban, hogy összeillessze az adathalmaz asset mezőjével.
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
Ha az adatkészlet rendelkezik egy asset mezővel rendelkező rekorddal, a következőhöz hasonló módon:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
A forrásból származó adatok, amelyek deviceId mezőnél egyeznek thermostat1-vel, elérhetők a location és manufacturer mezők a szűrési és leképezési szakaszokban.
További információ a feltételszintaxisról: Adatok bővítése adatfolyamok használatával
Szűrés: Adatok szűrése feltétel alapján
A szűrő lépés használatával elvetheti azokat az üzeneteket, amelyek nem felelnek meg egy feltételnek. Több szűrőszabályt is meghatározhat beviteli mezőkkel és logikai kifejezésekkel.
A teljes konfigurációs részletekért és példákért tekintse meg az adatfolyam adatainak szűrése című témakört.
Térkép: Adatok áthelyezése egyik mezőről a másikra
Ha az adatokat egy másik mezőre szeretné leképezni opcionális átalakítással, használja a map műveletet. Adja meg az átalakítást olyan képletként, amely a forrásadatok mezőit használja.
A műveleti felületen jelenleg a Számítás, Átnevezés, és Új tulajdonság átalakításokkal térképezheti fel az adatokat.
Compute
A Számítási átalakítással képletet alkalmazhat a forrásadatokra. Ez a művelet egy képletet alkalmaz a forrásadatokra, és az eredményt egy mezőben tárolja.
Az Átalakítás (nem kötelező) területen válassza a Számítás> lehetőséget.
Adja meg a szükséges beállításokat.
| Beállítás |
Leírás |
| Képlet kiválasztása |
Válasszon ki egy meglévő képletet a legördülő listából, vagy válassza az Egyéni lehetőséget a képlet manuális megadásához. |
| Kimenet |
Adja meg az eredmény kimeneti megjelenítendő nevét. |
| Képlet |
Adja meg a forrásadatokra alkalmazni kívánt képletet. |
| Leírás |
Adja meg az átalakítás leírását. |
| Utolsó ismert érték |
Ha az aktuális érték nem érhető el, használja az utolsó ismert értéket. |
Adjon meg vagy szerkesszen egy képletet a Képlet mezőben. A képlet használhatja a forrásadatok mezőit. Az adatpontok legördülő listából való kiválasztásához írja be @ vagy válassza a Ctrl + Szóköz billentyűkombinációt. Beépített képletek esetén válassza ki a <dataflow> helyőrzőt az elérhető adatpontok listájának megtekintéséhez.
Adja meg az MQTT metaadat-tulajdonságait a @$metadata.user_properties.<property> vagy @$metadata.topic formátumban. Adja meg $metadata fejléceket a formátum @$metadata.<header>használatával. A $metadata szintaxis csak az üzenetfejléc részét képező MQTT-tulajdonságokhoz szükséges. További információ: mezőhivatkozások.
A képlet használhatja a forrásadatok mezőit. Használhatja például a temperature forrásadatokban szereplő mezőt a hőmérséklet Celsius-fokra való konvertálásához és a temperatureCelsius kimeneti mezőben való tárolásához.
Válassza az Alkalmazás lehetőséget.
Átnevezés
Az Átnevezés átalakítással átnevezhet egy adatpontot. Ez a művelet átnevezi a forrásadatok egyik adatpontját egy új névre. Használja az új nevet az adatfolyam későbbi szakaszaiban.
Az Átalakítás (nem kötelező) területen válassza az Átnevezés>Hozzáadás lehetőséget.
Adja meg a szükséges beállításokat.
| Beállítás |
Leírás |
| Adatpont |
Jelöljön ki egy adatpontot a legördülő listából, vagy írjon be egy $metadata fejlécet. |
| Új datapoint-név |
Adja meg a datapoint új nevét. |
| Leírás |
Adja meg az átalakítás leírását. |
Adja meg az MQTT metaadat-tulajdonságait a @$metadata.user_properties.<property> vagy @$metadata.topic formátumban. Adja meg $metadata fejléceket a formátum @$metadata.<header>használatával. A $metadata szintaxis csak az üzenetfejléc részét képező MQTT-tulajdonságokhoz szükséges. További információ: mezőhivatkozások.
Válassza az Alkalmazás lehetőséget.
Új tulajdonság
Az Új tulajdonság átalakításával új tulajdonságot adhat hozzá a forrásadatokhoz. Ez a művelet új tulajdonságot ad hozzá a forrásadatokhoz. Használja az új tulajdonságot a data flow későbbi szakaszaiban.
Az Átalakítás (nem kötelező) területen válassza az Új tulajdonság>.
Adja meg a szükséges beállításokat.
| Beállítás |
Leírás |
| Tulajdonságkulcs |
Adja meg az új tulajdonság kulcsát. |
| Tulajdonságérték |
Adja meg az új tulajdonság értékét. |
| Leírás |
Adja meg az új tulajdonság leírását. |
Válassza az Alkalmazás lehetőséget.
Használhatja például a temperature forrásadatokban lévő mezőt a hőmérséklet Celsius-fokra való konvertálásához és a temperatureCelsius mezőben való tárolásához. Gazdagítsa a forrásadatokat a location környezetualizációs adathalmaz mezőjével:
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"$source.temperature ? $last" // ---------------- $1
],
"output": "temperatureCelsius",
"expression": "($1 - 32) * 5/9"
},
{
"inputs": [
"$context(assetDataset).location" // - $2
],
"output": "location"
}
]
}
}
Az MQTT metaadat-tulajdonságait $metadata.user_properties.<property> vagy $metadata.topic formátum használatával elérheti. $metadata fejléceket a $metadata.<header> formátummal is megadhat. További információ: mezőhivatkozások.
Használhatja például a temperature forrásadatokban lévő mezőt a hőmérséklet Celsius-fokra való konvertálásához és a temperatureCelsius mezőben való tárolásához. Gazdagítsa a forrásadatokat a location környezetualizációs adathalmaz mezőjével:
builtInTransformationSettings: {
map: [
{
inputs: [
'temperature'
]
output: 'temperatureCelsius'
expression: '($1 - 32) * 5/9'
}
{
inputs: [
'$context(assetDataset).location'
]
output: 'location'
}
]
}
Fontos
A Kubernetes üzembehelyezési jegyzékfájljainak használata éles környezetekben nem támogatott, és csak hibakereséshez és teszteléshez használható.
Az MQTT metaadat-tulajdonságait $metadata.user_properties.<property> vagy $metadata.topic formátum használatával elérheti. $metadata fejléceket a $metadata.<header> formátummal is megadhat. További információ: mezőhivatkozások.
Használhatja például a temperature forrásadatokban lévő mezőt a hőmérséklet Celsius-fokra való konvertálásához és a temperatureCelsius mezőben való tárolásához. Gazdagítsa a forrásadatokat a location környezetualizációs adathalmaz mezőjével:
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
expression: "($1 - 32) * 5/9"
output: temperatureCelsius
- inputs:
- $context(assetDataset).location
output: location
További információ: Adatok leképezése adatfolyamokkal.
Eltávolítás
A kimeneti séma alapértelmezés szerint az összes adatpontot tartalmazza. Távolítsa el az összes adatpontot a célhelyről az Átalakítás eltávolítása paranccsel.
Az Átalakítás (nem kötelező) területen válassza az Eltávolítás lehetőséget.
Válassza ki a kimeneti sémából eltávolítani kívánt adatpontot.
Válassza az Alkalmazás lehetőséget.
Ha el szeretne távolítani egy adatpontot a kimeneti sémából, használja a builtInTransformationSettings tulajdonságot a data flow konfigurációban.
map A tulajdonság használatával adja meg az eltávolítandó adatpontokat.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"*"
],
"output": "*"
},
{
"inputs": [
"weight"
],
"output": ""
}
{
"inputs": [
"weight.SourceTimestamp"
],
"output": ""
},
{
"inputs": [
"weight.Value"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Code"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Symbol"
],
"output": ""
}
]
}
}
builtInTransformationSettings: {
map: [
{
inputs: [
'*'
]
output: '*'
}
{
inputs: [
'weight'
]
output: ''
}
{
inputs: [
'weight.SourceTimestamp'
]
output: ''
}
{
inputs: [
'weight.Value'
]
output: ''
}
{
inputs: [
'weight.StatusCode'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Code'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Symbol'
]
output: ''
}
]
}
Fontos
A Kubernetes üzembehelyezési jegyzékfájljainak használata éles környezetekben nem támogatott, és csak hibakereséshez és teszteléshez használható.
builtInTransformationSettings:
map:
- type: PassThrough
inputs:
- "*"
output: "*"
- inputs:
- weight
output: ""
- inputs:
- weight.SourceTimestamp
output: ""
- inputs:
- weight.Value
output: ""
- inputs:
- weight.StatusCode
output: ""
- inputs:
- weight.StatusCode.Code
output: ""
- inputs:
- weight.StatusCode.Symbol
output: ""
További információ: Adatok leképezése adatfolyamokkal.
Adatok szerializálása séma szerint
Ha a célhelyre való küldés előtt szerializálni szeretné az adatokat, adjon meg egy sémát és szerializációs formátumot. Részletekért lásd : A kimenet szerializálása sémával.
Cél
Konfigurálja a célvégpontot és az adatcélt (témakör, tároló vagy tábla) az adatfolyamhoz. Célként bármilyen támogatott végponttípust használhat, beleértve az MQTT-t, a Kafkát, az Azure Data Lake Storage-t, a Microsoft Fabricet, az Azure Data Explorert és a helyi tárolót.
A teljes konfigurációs részletekért, beleértve az adat céltábláját, a dinamikus cél témaköröket és a kimeneti szerializálást, tekintse meg az adatfolyam-célhely konfigurálása című témakört.
Ha a helyi MQTT-közvetítőn kívüli célhelyre szeretne adatokat küldeni, hozzon létre egy data flow végpontot. A „Hogyan?” kérdésre: Adatfolyam végpontok konfigurálása.
példa
Az alábbi példa egy data flow konfiguráció, amely az MQTT-végpontot használja a forráshoz és a célhoz. A forrás szűri az adatokat az MQTT-témakörből azure-iot-operations/data/thermostat. Az átalakítás fahrenheitre alakítja át a hőmérsékletet, és szűri azokat az adatokat, ahol a hőmérséklet és a páratartalom megszorozva kisebb, mint 100000. A cél elküldi az adatokat az MQTT-témakörnek factory.
Az iot ops dataflow apply paranccsal hozhat létre vagy módosíthat egy adatfolyamot.
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
A --config-file paraméter az erőforrás-tulajdonságokat tartalmazó JSON-konfigurációs fájl elérési útja és fájlneve.
Ebben a példában tegyük fel, hogy a következő tartalommal elnevezett data-flow.json konfigurációs fájl a felhasználó kezdőlapján van tárolva:
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
],
"endpointRef": "default",
"serializationFormat": "Json"
}
},
{
"builtInTransformationSettings": {
"datasets": [],
"filter": [
{
"expression": "$1 * $2 < 100000",
"inputs": [
"temperature.Value",
"\"Tag 10\".Value"
],
"type": "Filter"
}
],
"map": [
{
"inputs": [
"*"
],
"output": "*",
"type": "PassThrough"
},
{
"expression": "cToF($1)",
"inputs": [
"temperature.Value"
],
"output": "TemperatureF",
"type": "Compute"
},
{
"inputs": [
"@\"Tag 10\".Value"
],
"output": "Humidity",
"type": "Rename"
}
],
"serializationFormat": "Json"
},
"operationType": "BuiltInTransformation"
},
{
"destinationSettings": {
"dataDestination": "factory",
"endpointRef": "default"
},
"operationType": "Destination"
}
]
}
Íme egy példaparancs egy data flow létrehozásához vagy frissítéséhez az alapértelmezett adatfolyamprofil használatával:
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
Íme egy másik példa, amely dinamikus témakörfordítással irányítja át az üzeneteket a különböző termosztátokból az eszközspecifikus témakörökbe:
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"thermostats/+/sensor/temperature"
],
"endpointRef": "default",
"serializationFormat": "Json"
}
},
{
"destinationSettings": {
"dataDestination": "processed/device/${inputTopic.2}/temperature",
"endpointRef": "default"
},
"operationType": "Destination"
}
]
}
Ez a konfiguráció feldolgozza az üzeneteket thermostats/device1/sensor/temperature, és elküldi őket processed/device/device1/temperature.
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
// Pointer to the default data flow endpoint
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// Use the default MQTT endpoint as the source
endpointRef: defaultDataflowEndpoint.name
// Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources: [
'azure-iot-operations/data/thermostat'
]
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// Filter the data where temperature * "Tag 10" < 100000
filter: [
{
inputs: [
'temperature.Value'
'"Tag 10".Value'
]
expression: '$1 * $2 < 100000'
}
]
map: [
// Passthrough all values by default
{
inputs: [
'*'
]
output: '*'
}
// Convert temperature to Fahrenheit and output it to TemperatureF
{
inputs: [
'temperature.Value'
]
output: 'TemperatureF'
expression: 'cToF($1)'
}
// Extract the "Tag 10" value and output it to Humidity
{
inputs: [
'"Tag 10".Value'
]
output: 'Humidity'
}
]
}
}
{
operationType: 'Destination'
destinationSettings: {
// Use the default MQTT endpoint as the destination
endpointRef: defaultDataflowEndpoint.name
// Send the data to the MQTT topic factory
dataDestination: 'factory'
}
}
]
}
}
Fontos
A Kubernetes üzembehelyezési jegyzékfájljainak használata éles környezetekben nem támogatott, és csak hibakereséshez és teszteléshez használható.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# Use the default MQTT endpoint as the source
endpointRef: default
# Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources:
- azure-iot-operations/data/thermostat
# Transformation optional
- operationType: builtInTransformation
builtInTransformationSettings:
# Filter the data where temperature * "Tag 10" < 100000
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: '$1 * $2 < 100000'
map:
# Passthrough all values by default
- inputs:
- '*'
output: '*'
# Convert temperature to Fahrenheit and output it to TemperatureF
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
# Extract the "Tag 10" value and output it to Humidity
- inputs:
- '"Tag 10".Value'
output: 'Humidity'
- operationType: Destination
destinationSettings:
# Use the default MQTT endpoint as the destination
endpointRef: default
# Send the data to the MQTT topic factory
dataDestination: factory
További példákat láthat a következő helyeken: Azure REST API – Adatfolyam és a Quickstart Bicep.
Annak ellenőrzése, hogy egy data flow működik-e
A data flow működésének ellenőrzéséhez kövesse a Bemutató: Kétirányú MQTT-híd az Azure Event Gridhez.
Data flow konfiguráció exportálása
A data flow konfiguráció exportálásához használja a műveleti felületet, vagy exportálja az data flow egyéni erőforrást.
Jelölje ki az exportálni kívánt data flow, majd válassza a Export lehetőséget az eszköztárról.
Az az iot ops dataflow show paranccsal exportálhat adatfolyamot.
az iot ops dataflow show --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <DataflowName> --profile <DataflowProfileName> --output json > my-dataflow.json
Íme egy példaparancs egy data-flow nevű data flow exportálására egy data-flow.json nevű JSON-fájlba:
az iot ops dataflow show --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --output json > data-flow.json
Fontos
A Kubernetes üzembehelyezési jegyzékfájljainak használata éles környezetekben nem támogatott, és csak hibakereséshez és teszteléshez használható.
kubectl get dataflow my-dataflow -o yaml > my-dataflow.yaml
Megfelelő data flow konfiguráció
Annak érdekében, hogy a data flow a várt módon működjön, ellenőrizze a következő feltételeket:
- Az alapértelmezett MQTT adatfolyam végpontot kötelező forrásként vagy célként használni.
- A adatfolyam-profil létezik, és az adatfolyam-konfigurációban van hivatkozva.
- A forrás vagy MQTT-végpont, Kafka-végpont vagy objektum. Forrásként nem használhat storage típusú végpontokat.
- Ha az Event Gridet használja forrásként, az adatfolyam-profilpéldányok számát 1 értékre állítja, mivel az Event Grid MQTT-közvetítő nem támogatja a megosztott előfizetéseket.
- Ha az Event Hubsot használja forrásként, a névtér minden eseményközpontja külön Kafka-témakör, és mindegyiket adatforrásként kell megadnia.
- Az átalakítás megfelelő szintaxissal van konfigurálva, beleértve a speciális karakterek megfelelő escapingelését is.
- Ha storage típusú végpontokat használ célként, egy schema van megadva.
- Ha dinamikus céltémaköröket használ az MQTT-végpontokhoz, győződjön meg arról, hogy a témakörváltozók érvényes szegmensekre hivatkoznak.
Következő lépések