Partager via


Utiliser des données de référence pour les recherches dans Stream Analytics

Les données de référence sont un jeu de données fini qui est statique ou à variation lente dans la nature. Il est utilisé pour effectuer une recherche ou augmenter vos flux de données. Les données de référence sont également appelées table de correspondance.

Prenons un scénario IoT comme exemple. Vous pouvez stocker des métadonnées sur les capteurs, qui ne changent pas souvent, dans les données de référence. Vous pouvez ensuite la connecter à des flux de données de l'IoT en temps réel.

Azure Stream Analytics charge les données de référence en mémoire pour obtenir un traitement de flux à faible latence. Pour utiliser des données de référence dans votre travail Stream Analytics, vous utiliserez généralement une jointure de données de référence dans votre requête.

Example

Vous pouvez avoir un flux en temps réel d’événements généré lorsque les voitures passent un tollbooth. Le tollbooth peut capturer les plaques d’immatriculation en temps réel. Ces données peuvent se joindre à un jeu de données statique qui a des détails d’inscription pour identifier les plaques de licence qui ont expiré.

SELECT I1.EntryTime, I1.LicensePlate, I1.TollId, R.RegistrationId  
FROM Input1 I1 TIMESTAMP BY EntryTime  
JOIN Registration R  
ON I1.LicensePlate = R.LicensePlate  
WHERE R.Expired = '1'

Stream Analytics prend en charge le stockage Blob Azure, Azure Data Lake Storage Gen2 et Azure SQL Database en tant que couche de stockage pour les données de référence. Si vous disposez des données de référence dans d’autres magasins de données, essayez d’utiliser Azure Data Factory pour extraire, transformer et charger les données dans l’un des magasins de données pris en charge. Pour plus d’informations, consultez la vue d’ensemble de l’activité de copie dans Azure Data Factory.

Stockage Blob Azure ou Azure Data Lake Storage Gen2

Les données de référence sont modélisées comme une séquence de blobs dans l’ordre croissant de la date/heure spécifiée dans le nom du blob. Les objets blob ne peuvent être ajoutés qu’à la fin de la séquence à l’aide d’une date/heure supérieure à celle spécifiée par le dernier objet blob de la séquence. Les objets blob sont définis dans la configuration d’entrée.

Pour plus d’informations, consultez Utiliser des données de référence à partir du stockage Blob pour un travail Stream Analytics.

Configurer des données de référence BLOB

Pour configurer vos données de référence, vous devez d'abord créer une entrée de type données de référence. Le tableau suivant explique chaque propriété que vous devez fournir lorsque vous créez l’entrée de données de référence avec sa description.

Nom de la propriété Descriptif
Alias d’entrée Nom convivial utilisé dans la requête de travail pour référencer cette entrée.
Compte de stockage Nom du compte de stockage où se trouvent vos objets blob. S’il se trouve dans le même abonnement que votre travail Stream Analytics, sélectionnez-le dans la liste déroulante.
Clé du compte de stockage Clé secrète associée au compte de stockage. Cette clé est automatiquement renseignée si le compte de stockage se trouve dans le même abonnement que votre travail Stream Analytics.
Conteneur de stockage Les conteneurs fournissent un regroupement logique pour les blobs stockés dans Azure Blob Storage. Lorsque vous chargez un blob dans le stockage Blob, vous devez spécifier un conteneur pour ce blob.
Modèle de chemin Cette propriété requise sert à localiser vos blobs dans le conteneur spécifié. Dans le chemin d’accès, vous pouvez choisir de spécifier une ou plusieurs instances des variables {date} et {time}.
Exemple 1 : products/{date}/{time}/product-list.csv
Exemple 2 : produits/{date}/product-list.csv
Exemple 3 : product-list.csv

Si l’objet blob n’existe pas dans le chemin spécifié, le travail Stream Analytics attend indéfiniment que l’objet blob devienne disponible.
Format de date [facultatif] Si vous avez utilisé {date} dans le modèle de chemin que vous avez spécifié, sélectionnez le format de date dans lequel vos objets blob sont organisés dans la liste déroulante des formats pris en charge.
Exemple : AAAA/MM/JJ ou MM/JJ/AAAA
Format de temps [facultatif] Si vous avez utilisé {time} dans le modèle de chemin que vous avez spécifié, sélectionnez le format d’heure dans lequel vos objets blob sont organisés dans la liste déroulante des formats pris en charge.
Exemple : HH, HH/mm ou HH-mm
Format de sérialisation des événements Pour vous assurer que vos requêtes fonctionnent comme prévu, Stream Analytics doit savoir quel format de sérialisation vous utilisez pour les flux de données entrants. Pour les données de référence, les formats pris en charge sont CSV et JSON.
Codage UTF-8 est le seul format d’encodage pris en charge pour l’instant.

Données de référence statiques

Vos données de référence peuvent ne pas être censées changer. Pour activer la prise en charge des données de référence statiques, spécifiez un chemin statique dans la configuration d’entrée.

Stream Analytics récupère l’objet blob à partir du chemin spécifié. Les jetons de substitution {date} et {time} ne sont pas obligatoires. Étant donné que les données de référence sont immuables dans Stream Analytics, la réécriture d’un objet blob de données de référence statique n’est pas recommandée.

Générer des données de référence selon une planification

Vos données de référence peuvent être un jeu de données à variation lente. Pour actualiser les données de référence, spécifiez un modèle de chemin dans la configuration d’entrée à l’aide des jetons de substitution {date} et {time}. Stream Analytics récupère les définitions de données de référence mises à jour en fonction de ce modèle de chemin d’accès.

Un modèle de sample/{date}/{time}/products.csv avec un format de date de aaaa-MM-DD et un format de temps de HH-mm indique à Stream Analytics de récupérer le blob sample/2015-04-16/17-30/products.csv mis à jour le 16 avril 2015 à 17 h 30 UTC.

Stream Analytics scanne automatiquement les blobs de données de référence actualisés toutes les minutes. Un objet blob avec l’horodatage 10:30:00 peut être chargé avec un petit délai, par exemple 10:30:30. Vous remarquez un petit retard dans un travail Stream Analytics qui référence ce blob.

Pour éviter de tels scénarios, chargez l’objet blob avant l’heure prévue cible, qui est de 10:30:00 dans cet exemple. La tâche Stream Analytics dispose maintenant de suffisamment de temps pour détecter et charger le blob en mémoire et effectuer des opérations.

Note

Actuellement, les travaux Stream Analytics cherchent la mise à jour du blob uniquement lorsque l'heure système passe à l'heure codée dans le nom du blob. Par exemple, le travail recherche sample/2015-04-16/17-30/products.csv dès que possible, mais pas plus tôt que le 16 avril 2015, à 17 h 30 UTC. Il ne recherche jamais un blob avec une heure encodée antérieure à celle de la dernière découverte.

Par exemple, une fois que le travail trouve l’objet blob sample/2015-04-16/17-30/products.csv, il ignore tous les fichiers avec une date encodée antérieure au 16 avril 2015, à 17 h 30. Si un objet blob à arrivée sample/2015-04-16/17-25/products.csv tardive est créé dans le même conteneur, la tâche ne l’utilisera pas.

Dans un autre exemple, sample/2015-04-16/17-30/products.csv n’est produit que le 16 avril 2015 à 22h03, mais aucun objet blob daté antérieurement n’est présent dans le conteneur. Ensuite, le travail utilise ce fichier à partir du 16 avril 2015, à 10 h 03 et utilise les données de référence précédentes jusqu’à ce moment-là.

Une exception à ce comportement, c'est lorsque le travail doit retraiter les données à une date antérieure ou lorsqu'il démarre pour la première fois.

Au démarrage, la tâche recherche l’objet blob le plus récent produit avant l’heure de début de la tâche spécifiée. Ce comportement garantit qu’il existe un jeu de données de référence non vide au démarrage du travail. Si aucun n'est trouvé, la tâche affiche le diagnostic suivant : Initializing input without a valid reference data blob for UTC time <start time>.

Lorsqu’un jeu de données de référence est actualisé, un journal de diagnostic est généré : Loaded new reference data from <blob path>. Pour de nombreuses raisons, un travail peut avoir besoin de recharger un jeu de données de référence précédent. Le plus souvent, la raison est de retraiter les données passées. Le même journal de diagnostic est généré à ce moment-là. Cette action n’implique pas que les données de flux actuelles utilisent des données de référence antérieures.

Azure Data Factory peut être utilisé pour orchestrer la tâche de création des objets blob mis à jour requis par Stream Analytics pour mettre à jour les définitions de données de référence.

Data Factory est un service d’intégration de données basé sur le cloud qui orchestre et automatise le déplacement et la transformation des données. Data Factory prend en charge la connexion à un grand nombre de magasins de données cloud et locaux. Il peut déplacer facilement des données selon une planification régulière que vous spécifiez.

Pour plus d’informations sur la configuration d’un pipeline Data Factory pour générer des données de référence pour Stream Analytics qui s’actualisent selon une planification prédéfinie, consultez cet exemple GitHub.

Conseils sur l’actualisation des données de référence de blob

  • Ne remplacez pas les objets blob de données de référence, car ils sont immuables.
  • La méthode recommandée pour actualiser les données de référence consiste à :
    • Utilisez {date}/{time} dans le modèle de chemin d’accès.
    • Ajoutez un nouvel objet blob en utilisant le même conteneur et le même modèle de chemin défini dans l'entrée de la tâche.
    • Utilisez une date/heure supérieure à celle spécifiée par le dernier objet blob de la séquence.
  • Les objets blob de données de référence ne sont pas* classés par l’heure de dernière modification de l’objet blob. Elles sont ordonnées uniquement par la date et l’heure spécifiées dans le nom de l’objet blob à l’aide des substitutions {date} et {time}.
  • Pour éviter d’avoir à répertorier un grand nombre d’objets blob, supprimez les anciens objets blob pour lesquels le traitement ne sera plus effectué. Stream Analytics peut devoir retraiter une petite quantité dans certains scénarios, comme un redémarrage.

Azure SQL Database

Votre tâche Stream Analytics récupère les données de référence SQL Database et les stocke sous forme d’instantané en mémoire pour le traitement. L’instantané de vos données de référence est stocké également dans un conteneur dans un compte de stockage. Vous spécifiez le compte de stockage dans les paramètres de configuration.

Le conteneur est créé automatiquement au démarrage du travail. Si le travail s’arrête ou entre dans un état d’échec, les conteneurs créés automatiquement sont supprimés lors du redémarrage du travail.

Si vos données de référence sont un jeu de données à variation lente, vous devez actualiser régulièrement l’instantané utilisé dans votre travail.

Avec Stream Analytics, vous pouvez définir un taux d’actualisation lorsque vous configurez votre connexion d’entrée SQL Database. Le runtime Stream Analytics interroge votre instance SQL Database à l’intervalle spécifié par le taux d’actualisation. Le taux d’actualisation le plus rapide pris en charge est une fois par minute. Pour chaque actualisation, Stream Analytics stocke un nouvel instantané dans le compte de stockage fourni.

Stream Analytics fournit deux options pour interroger votre instance SQL Database. Une requête d’instantané est obligatoire et doit être incluse dans chaque tâche. Stream Analytics exécute régulièrement la requête d’instantané en fonction de votre intervalle d’actualisation. Il utilise le résultat de la requête (l’instantané) comme jeu de données de référence.

La requête d’instantané doit correspondre à la plupart des scénarios. Si vous rencontrez des problèmes de performances avec des jeux de données volumineux et des taux d’actualisation rapides, utilisez l’option de requête delta. Les requêtes qui prennent plus de 60 secondes pour retourner un jeu de données de référence entraînent un délai d’expiration.

Avec l’option de requête delta, Stream Analytics exécute initialement la requête de capture instantanée pour obtenir un jeu de données de référence. Ensuite, Stream Analytics exécute régulièrement la requête delta en fonction de votre intervalle d’actualisation pour récupérer les modifications incrémentielles. Ces modifications incrémentielles sont continuellement appliquées au jeu de données de référence pour la maintenir à jour. L’utilisation de l’option de requête delta peut aider à réduire les coûts de stockage et les opérations d’E/S réseau.

Configurer les données de référence de la base de données SQL

Pour configurer vos données de référence SQL Database, vous devez d’abord créer une entrée de données de référence. Le tableau suivant explique chaque propriété que vous devez fournir lorsque vous créez l’entrée de données de référence avec sa description. Pour plus d’informations, consultez Utiliser des données de référence à partir d’une base de données SQL pour un travail Stream Analytics.

Vous pouvez utiliser Azure SQL Managed Instance comme entrée de données de référence. Vous devez configurer un point de terminaison public dans SQL Managed Instance. Ensuite, vous configurez manuellement les paramètres suivants dans Stream Analytics. Une machine virtuelle Azure exécutant SQL Server avec une base de données attachée est également prise en charge en configurant manuellement ces paramètres.

Nom de la propriété Descriptif
Alias d’entrée Nom convivial utilisé dans la requête de travail pour référencer cette entrée.
Subscription Votre abonnement.
Base de données Instance de base de données SQL qui contient vos données de référence. Pour SQL Managed Instance, vous devez spécifier le port 3342. par exemple sampleserver.public.database.windows.net,3342.
Nom d’utilisateur Nom d’utilisateur associé à votre instance SQL Database.
Mot de passe Mot de passe associé à votre instance SQL Database.
Actualiser régulièrement Cette option vous permet de sélectionner un taux d’actualisation. Sélectionnez Activé pour spécifier le taux d’actualisation dans DD :HH :MM.
Requête d’instantané Cette option de requête par défaut récupère les données de référence de votre instance SQL Database.
Requête Delta Pour les scénarios avancés avec des jeux de données volumineux et un taux d’actualisation court, ajoutez une requête delta.

Limitation de taille

Utilisez des jeux de données de référence inférieurs à 300 Mo pour des performances optimales. Les ensembles de données de référence de 5 Go ou moins sont pris en charge dans les travaux avec six unités de streaming ou plus. L’utilisation d’un jeu de données de référence volumineux peut affecter la latence de bout en bout de votre travail.

La complexité des requêtes peut augmenter pour inclure le traitement avec état, comme les agrégats fenêtrés, les jointures temporelles et les fonctions analytiques temporelles. Lorsque la complexité augmente, la taille maximale prise en charge des données de référence diminue.

Si Stream Analytics ne peut pas charger les données de référence et effectuer des opérations complexes, le travail manque de mémoire et échoue. Dans ce cas, la métrique d’utilisation de l’unité de diffusion en continu atteint 100%.

Nombre d’unités de diffusion en continu Taille recommandée
1 50 Mo ou inférieur
3 150 Mo ou inférieur
6 et au-delà 5 Go ou inférieur

La prise en charge de la compression n’est pas disponible pour les données de référence. Pour les jeux de données de référence supérieurs à 300 Mo, utilisez SQL Database comme source avec l’option de requête delta pour des performances optimales. Si l'option de requête delta n'est pas utilisée dans de tels scénarios, vous pouvez observer des pics dans la métrique de retard de filigrane chaque fois que le jeu de données de référence est actualisé.

Joindre plusieurs jeux de données de référence dans un travail

Vous ne pouvez joindre qu’une entrée de données de référence à une entrée de streaming. Ainsi, pour joindre plusieurs jeux de données de référence, décomposez votre requête en plusieurs étapes. Voici un exemple :

With Step1 as (
    --JOIN input stream with reference data to get 'Desc'
    SELECT streamInput.*, refData1.Desc as Desc
    FROM    streamInput
    JOIN    refData1 ON refData1.key = streamInput.key 
)
--Now Join Step1 with second reference data
SELECT *
INTO    output 
FROM    Step1
JOIN    refData2 ON refData2.Desc = Step1.Desc 

Tâches IoT Edge

Seules les données de référence locales sont prises en charge pour les travaux edge Stream Analytics. Lorsqu’un travail est déployé sur un appareil IoT Edge, il charge des données de référence à partir du chemin d’accès de fichier défini par l’utilisateur. Disposer d’un fichier de données de référence prêt sur l’appareil.

Pour un conteneur Windows, placez le fichier de données de référence sur le lecteur local et partagez le lecteur local avec le conteneur Docker. Pour un conteneur Linux, créez un volume Docker et remplissez le fichier de données dans le volume.

Les données de référence sur une mise à jour IoT Edge sont déclenchées par un déploiement. Une fois qu’il a été déclenché, le module Stream Analytics sélectionne les données mises à jour sans arrêter le travail en cours d’exécution.

Vous pouvez mettre à jour les données de référence de deux façons :

  • Mettez à jour le chemin des données de référence dans votre travail Stream Analytics à partir du portail Azure.
  • Mettez à jour le déploiement IoT Edge.

Étapes suivantes