Partager via


Validation des entrées dans les requêtes Azure Stream Analytics

La validation des entrées est une technique à utiliser pour protéger la logique de requête principale contre des événements malformés ou inattendus. La requête est mise à niveau pour traiter et vérifier explicitement les enregistrements afin qu’ils ne puissent pas arrêter la logique principale.

Pour implémenter la validation d’entrée, nous ajoutons deux étapes initiales à une requête. Nous vérifions d’abord que le schéma envoyé à la logique métier principale correspond à ses attentes. Nous trions ensuite les exceptions et routons éventuellement les enregistrements non valides vers une sortie secondaire.

Une requête avec validation d’entrée est structurée de la façon suivante :

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

Pour voir un exemple de requête complet configuré avec la validation d’entrée, consultez la section : Exemple de requête avec la validation d’entrée.

Cet article montre comment implémenter cette technique.

Contexte

Les travaux Azure Stream Analytics (ASA) traitent les données provenant de flux. Les flux sont des séquences de données brutes qui sont transmises sérialisées (CSV, JSON, AVRO...). Pour lire à partir d’un flux, une application doit connaître le format de sérialisation spécifique utilisé. Dans ASA, le format de sérialisation des événements doit être défini lors de la configuration d’une entrée de streaming.

Une fois les données désérialisées, il faut appliquer un schéma pour leur donner une signification. Par schéma, nous entendons la liste des champs dans le flux, et leurs types de données respectifs. Avec ASA, le schéma des données entrantes n’a pas besoin d’être défini au niveau de l’entrée. En revanche, ASA prend en charge les schémas d’entrée dynamique de manière native. Il s’attend à ce que la liste des champs (colonnes), et leurs types, changent entre les événements (lignes). L’ASA infère également les types de données quand aucun n’est fourni explicitement, et essaie de convertir implicitement les types si nécessaire.

La gestion dynamique des schémas est une fonctionnalité puissante, essentielle pour le traitement de flux. Les flux de données contiennent souvent des données provenant de plusieurs sources, incluant plusieurs types d’événements ayant chacun un schéma unique. Pour router, filtrer et traiter les événements sur ces flux, ASA doit les ingérer tous, quel que soit leur schéma.

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

Toutefois, les possibilités qu’offre la gestion dynamique des schémas vont de pair avec un inconvénient potentiel. Des événements inattendus peuvent traverser la logique de requête principale et la dégrader. À titre d’exemple, nous pouvons utiliser ROUND sur un champ de type NVARCHAR(MAX). ASA le convertit implicitement en valeur flottante pour correspondre à la signature de ROUND. Nous attendons ou espérons ici que ce champ contienne toujours des valeurs numériques. Cependant, lorsque nous recevons un événement dont le champ est défini sur "NaN", ou dont le champ est totalement absent, le travail peut échouer.

Avec une validation des entrées, nous ajoutons des étapes préliminaires à notre requête pour traiter de tels événements malformés. Nous utiliserons principalement WITH et TRY_CAST pour l’implémenter.

Scénario : validation d’entrée pour les producteurs d’événements non fiables

Nous allons créer un nouveau travail ASA qui ingérera les données provenant d’un Event Hub unique. Comme c’est le plus souvent le cas, nous ne sommes pas responsables des producteurs de données. Ici, les producteurs sont des appareils IoT vendus par plusieurs fournisseurs de matériel.

En rencontrant les parties prenantes, nous nous convenons d’un format de sérialisation et d’un schéma. Tous les appareils enverront (push) de tels messages à un Event Hub commun, entrée du travail d’ASA.

Le contrat de schéma se définit comme suit :

Nom du champ Type de champ Description du champ
deviceId Integer Identificateur d’appareil unique
readingTimestamp Datetime Heure du message, générée par une passerelle centrale
readingStr String
readingNum Numérique
readingArray Tableau de chaînes

Ce qui nous donne l’exemple de message suivant sous une sérialisation JSON :

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

Nous pouvons déjà constater une discordance entre le contrat de schéma et son implémentation. Dans le format JSON, il n’y a pas de type de données pour DateHeure. La valeur sera transmise sous la forme d’une chaîne de caractères (voir readingTimestamp ci-dessus). ASA peut facilement résoudre le problème, mais révèle la nécessité de valider et convertir explicitement les types. D’autant plus pour les données sérialisées en CSV, car toutes les valeurs sont alors transmises sous forme de chaîne.

Il existe une autre discordance. L’ASA utilise son propre système de type qui ne correspond pas au système entrant. Si ASA dispose de types intégrés pour les entiers (bigint), les dates, les chaînes de caractères (nvarchar(max)) et les tableaux, il ne prend en charge que les valeurs numériques flottantes. Cette discordance n’est pas un problème pour la plupart des applications. Mais dans certains cas limites, cela pourrait entraîner de légères dérives de précision. Dans ce cas, nous convertissons la valeur numérique en chaîne de caractères dans un nouveau champ. Ensuite, en aval, nous utilisons un système prenant en charge la décimale fixe pour détecter et corriger des dérives potentielles.

Pour en revenir à notre question, nous avons l’intention d’effectuer les opérations suivantes :

Le schéma de la table SQL de destination est le suivant :

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

Une bonne pratique consiste à mapper ce qui arrive à chaque champ à mesure de la progression dans le travail :

Champ Entrée (JSON) Type hérité (ASA) Sortie (Azure SQL) Commentaire
deviceId nombre bigint entier
readingTimestamp string nvarchar(MAX) datetime2
readingStr string nvarchar(MAX) nvarchar(200) utilisé par l’UDF
readingNum nombre float décimal (18,2) à arrondir
readingArray tableau(chaîne) tableau de nvarchar(MAX) entier à compter

Prérequis

Nous allons développer la requête dans Visual Studio Code en utilisant l’extension Outils ASA. Les premières étapes de ce didacticiel vous guideront dans l’installation des composants requis.

Dans VS Code, nous allons utiliser des exécutions locales avec des entrées/sorties locales pour ne pas exposer de coût, et accélérer la boucle de débogage. Nous n’aurons pas besoin de configurer un Event Hub ou une Azure SQL Database.

Requête de base

Commençons par une implémentation de base, sans validation des entrées. Vous en aurez besoin dans la section suivante.

Dans VS Code, nous allons créer un projet ASA

Dans le dossier input, nous allons créer un fichier JSON nommé data_readings.json et y ajouter les enregistrements suivants :

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

Ensuite, nous allons définir une entrée locale nommée readings, faisant référence au fichier JSON que nous avons créé ci-dessus.

Une fois configuré, celui-ci devrait ressembler à ceci :

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

Avec Aperçu des données, nous pouvons observer que nos enregistrements sont chargés correctement.

Nous allons créer une UDF JavaScript nommée udfLen en faisant un clic droit sur le dossier Functions et en sélectionnant ASA: Add Function. Le code que nous allons utiliser est le suivant :

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

Dans les exécutions locales, nous n’avons pas besoin de définir de sorties. Nous n’avons même pas besoin d’utiliser INTO, s’il y a plus d’une sortie. Dans le fichier .asaql, nous pouvons remplacer la requête existante par :

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

Examinons rapidement la requête que nous avons soumise :

  • Pour compter le nombre d’enregistrements dans chaque tableau, nous devons d’abord les décompresser. Nous allons utiliser CROSS APPLY et GetArrayElements() (autres exemples ici)
    • Ce faisant, nous obtenons deux ensembles de données dans la requête : l’entrée d’origine et les valeurs de tableau. Pour être sûr de ne pas mélanger les champs, nous définissons des alias (AS r) et les utilisons partout.
    • Ensuite, pour compter (COUNT) réellement les valeurs du tableau, nous devons opérer une agrégation avec GROUP BY.
    • Pour cela, nous devons définir une fenêtre de temps. Ici, puisque nous n’en avons pas besoin pour notre logique, la fenêtre d’instantané est le bon choix.
  • Nous devons également regrouper (GROUP BY) tous les champs, et les projeter dans SELECT. La projection explicite des champs est une bonne pratique, car SELECT * permet que les erreurs circulent de l’entrée à la sortie.
    • Si nous définissons une fenêtre de temps, nous pouvons définir un horodatage avec TIMESTAMP BY. Ici, ce n’est pas nécessaire pour que notre logique fonctionne. Pour les exécutions locales, sans TIMESTAMP BY, tous les enregistrements sont chargés sur un seul horodatage, l’heure de début de l’exécution.
  • Nous utilisons l’UDF pour filtrer les lectures où readingStr compte moins de deux caractères. Nous aurions dû utiliser LEN ici. Nous utilisons une UDF uniquement à des fins de démonstration.

Nous pouvons démarrer une exécution et observer les données en cours de traitement :

deviceId readingTimestamp readingStr readingNum arrayCount
1 2021-12-10T10:00:00 Chaîne 1,71 2
2 2021-12-10T10:01:00 Autre chaîne 2,38 1
3 2021-12-10T10:01:20 Troisième chaîne -4,85 3
1 2021-12-10T10:02:10 Quatrième chaîne 1.21 2

Maintenant que nous savons que notre requête fonctionne, nous allons la tester avec plus de données. Remplaçons le contenu de data_readings.json par les enregistrements suivants :

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

Nous pouvons voir ici les problèmes suivants :

  • L’appareil #1 a fait tout ce qu’il fallait.
  • L’appareil #2 a oublié d’inclure un readingStr
  • L’appareil #3 a envoyé NaN en tant que nombre
  • L’appareil #4 a envoyé un enregistrement vide au lieu d’un tableau.

L’exécution du travail maintenant ne devrait pas bien se terminer. Nous allons obtenir l’un des messages d’erreur suivants :

L’appareil 2 nous donnera :

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

L’appareil 3 nous donnera :

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

L’appareil 4 nous donnera :

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

À chaque fois, des enregistrements malformés ont pu circuler de l’entrée à la logique de requête principale sans être validés. Nous réalisons maintenant la valeur de la validation des entrées.

Implémentation de la validation des entrées

Étendons notre requête pour valider l’entrée.

La première étape de la validation des entrées consiste à définir les attentes du schéma de la logique métier de base. Si l’on se réfère au besoin d’origine, notre logique principale consiste à effectuer les opérations suivantes :

  • Transmettre readingStr à une UDF JavaScript pour mesurer sa longueur
  • Compter le nombre d’enregistrements dans le tableau
  • Arrondir readingNum à la deuxième décimale
  • Insérer les données dans une table SQL

Pour chaque point, nous pouvons énumérer les attentes :

  • L’UDF requiert un argument de type chaîne de caractères (nvarchar(max) ici) qui ne peut pas avoir la valeur null.
  • GetArrayElements() requiert un argument de type tableau ou une valeur null
  • Round requiert un argument de type bigint ou float, ou une valeur null
  • Au lieu de nous appuyer sur la conversion implicite d’ASA, nous devrions la faire nous-mêmes et gérer les conflits de type dans la requête.

Une façon de procéder consiste à adapter la logique principale pour traiter ces exceptions. Mais dans ce cas, nous pensons que notre logique principale est parfaite. Alors validons plutôt les données entrantes.

Commençons par utiliser WITH pour ajouter une couche de validation des entrées comme première étape de la requête. Nous allons utiliser TRY_CAST pour convertir les champs dans le type attendu, et les définir sur NULL en cas d’échec de la conversion :

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

Avec le dernier fichier d’entrée que nous avons utilisé (celui contenant des erreurs), cette requête retournera ce qui suit :

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00 Chaîne 1.7145 ["A", "B"] 1 2021-12-10T10:00:00Z Chaîne 1.7145 ["A", "B"]
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00Z NULL 2.378 ["C"]
3 2021-12-10T10:01:20 Troisième chaîne NaN ["D","E","F"] 3 2021-12-10T10:01:20Z Troisième chaîne NULL ["D","E","F"]
4 2021-12-10T10:02:10 Quatrième chaîne 1.2126 {} 4 2021-12-10T10:02:10Z Quatrième chaîne 1.2126 NULL

Nous pouvons déjà constater que deux de nos erreurs ont été corrigées. Nous avons transformé NaN et {} en NULL. Nous sommes maintenant sûrs que ces enregistrements seront insérés correctement dans la table SQL de destination.

Nous devons maintenant décider comment traiter les enregistrements avec des valeurs manquantes ou non valides. Après discussion, nous avons décidé de rejeter les enregistrements contenant une valeur readingArray vide ou non valide, ou dans lesquels la valeur readingStr est manquante.

Nous ajoutons donc une deuxième couche qui va trier les enregistrements entre logique de validation et logique principale :

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

Une bonne pratique consiste à écrire une seule clause WHERE pour les deux sorties, et à utiliser NOT (...) dans la seconde. De cette façon, aucun enregistrement ne peut être exclu des deux sorties et perdu.

Nous obtenons maintenant deux sorties. Debug1 contient les enregistrements qui seront envoyés à la logique principale :

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00Z Chaîne 1.7145 ["A", "B"]
3 2021-12-10T10:01:20Z Troisième chaîne NULL ["D","E","F"]

Debug2 contient les enregistrements qui seront rejetés :

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00Z NULL 2.378 ["C"]
4 2021-12-10T10:02:10 Quatrième chaîne 1.2126 {} 4 2021-12-10T10:02:10Z Quatrième chaîne 1.2126 NULL

L’étape finale consiste à rajouter notre logique principale. Nous allons également ajouter la sortie qui rassemble les rejets. Dans ce cas, l’idéal consiste à utiliser un adaptateur de sortie qui n’impose pas de typage fort, comme un compte de stockage.

La requête complète figure dans la dernière section.

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

Ce qui nous donnera l’ensemble suivant pour la sortie SQLOutput, sans erreur possible :

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00Z Chaîne 1.7145 2
3 2021-12-10T10:01:20Z Troisième chaîne NULL 3

Les deux autres enregistrements sont envoyés à une sortie BlobOutput pour un examen humain et un post-traitement. Notre requête est maintenant sûre.

Exemple de requête avec validation des entrées

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

Extension de la validation des entrées

Le commande GetType peut être utilisée pour vérifier explicitement un type. Elle fonctionne bien avec CASE dans la projection ou WHERE au niveau de l’ensemble. La commande GetType peut également être utilisée pour vérifier de façon dynamique le schéma entrant par rapport à un référentiel de métadonnées. Le référentiel peut être chargé via un ensemble de données de référence.

L’opération unit-testing est une bonne pratique pour assurer la résilience de notre requête. Nous allons créer une série de tests consistant en des fichiers d’entrée et leur sortie attendue. Notre requête devra correspondre à la sortie qu’elle génère pour aboutir. Dans ASA, l’opération unit-testing est effectuée via le module npm asa-streamanalytics-cicd. Des cas de test avec divers événements malformés devraient être créés et testés dans le pipeline de déploiement.

Enfin, nous pouvons effectuer quelques tests d’intégration légers dans VS Code. Nous pouvons insérer des enregistrements dans la table SQL via une exécution locale vers une sortie dynamique.

Obtenir de l’aide

Pour obtenir de l’aide supplémentaire, essayez notre page de questions Microsoft Q&A pour Azure Stream Analytics.

Étapes suivantes