Partager via


Diffuser en continu des données en tant qu’entrées dans Stream Analytics

Stream Analytics s'intègre avec des flux de données Azure en tant qu'entrées provenant de cinq types de ressources.

Ces ressources d’entrée peuvent exister dans le même abonnement Azure que votre travail Stream Analytics ou dans un autre abonnement.

Compression

Stream Analytics prend en charge la compression pour toutes les sources d’entrée. Les types de compression pris en charge sont : None, Gzip et Deflate. Stream Analytics ne prend pas en charge la compression pour les données de référence. Si les données d’entrée sont des données Avro compressées, Stream Analytics le gère de manière transparente. Vous n’avez pas besoin de spécifier le type de compression avec la sérialisation Avro.

Créer, modifier ou tester les entrées

Vous pouvez utiliser le portail Azure, Visual Studio et Visual Studio Code pour ajouter et afficher ou modifier des entrées existantes sur votre travail de streaming. Vous pouvez également tester les connexions d’entrée et tester des requêtes à partir d’exemples de données à partir du portail Azure, Visual Studio et Visual Studio Code. Lorsque vous écrivez une requête, vous répertoriez l’entrée dans la clause FROM. Vous pouvez obtenir la liste des entrées disponibles à partir de la page Requête dans le portail. Si vous souhaitez utiliser plusieurs entrées, JOIN elles ou écrivez plusieurs SELECT requêtes.

Remarque

Pour une expérience de développement locale optimale, utilisez les outils Stream Analytics pour Visual Studio Code. Il existe des lacunes de fonctionnalités connues dans les outils Stream Analytics pour Visual Studio 2019 (version 2.6.3000.0) et elles ne seront pas améliorées à l'avenir.

Diffuser en continu des données depuis Event Hubs

Azure Event Hubs est un ingesteur d’événement de publication et d’abonnement hautement évolutif. Un concentrateur Event Hub peut collecter des millions d’événements par seconde afin que vous puissiez traiter et analyser les grandes quantités de données générées par vos appareils et applications connectés. Ensemble, Event Hubs et Stream Analytics fournissent une solution de bout en bout pour l’analytique en temps réel. Event Hubs alimente les événements en Azure en temps réel et les travaux Stream Analytics traitent ces événements en temps réel. Par exemple, vous pouvez envoyer à Event Hubs des clics web, des lectures de capteurs ou des événements de journaux en ligne. Vous pouvez alors créer des travaux Stream Analytics afin d’utiliser Event Hubs pour les données d’entrée pour le filtrage, l’agrégation et la mise en corrélation en temps réel.

EventEnqueuedUtcTime est l’horodatage de l’arrivée d’un événement dans un hub d’événements et est l’horodatage par défaut des événements provenant d’Event Hubs vers Stream Analytics. Pour traiter les flux de données à l’aide d’un horodatage dans la charge utile de l’événement, vous devez utiliser le mot clé TIMESTAMP BY.

Groupes de consommateurs Event Hubs

Configurez chaque entrée du hub d’événements avec son propre groupe de consommateurs. Lorsqu’une tâche contient une auto-jointure ou possède plusieurs entrées, plusieurs lecteurs en aval peuvent lire certaines entrées. Cette situation a une incidence sur le nombre de lecteurs dans un groupe de consommateurs unique. Pour éviter de dépasser la limite Event Hubs de cinq lecteurs par groupe de consommateurs par partition, désignez un groupe de consommateurs pour chaque travail Stream Analytics. Il existe également une limite de 20 groupes de consommateurs pour un hub d'événements de la catégorie Standard. Pour plus d'informations, consultez Résoudre les problèmes de sources de données Azure Stream Analytics.

Créer une entrée à partir d’Event Hubs

Le tableau suivant décrit chaque propriété dans la page New input dans le portail Azure pour diffuser en continu les entrées de données à partir d’un hub d’événements :

Propriété Descriptif
Alias d’entrée Nom convivial que vous utilisez dans la requête du travail pour référencer cette entrée.
Subscription Choisissez l’abonnement Azure dans lequel la ressource Event Hub existe.
Espace de noms Event Hub Un espace de noms Event Hubs sert de conteneur pour les Event Hubs. Lorsque vous créez un hub d'événements, vous créez également l’espace de noms.
Nom du hub d’événements Le nom du hub d'événements à utiliser comme entrée.
Groupe de consommateurs Event Hub (recommandé) Utilisez un groupe de consommateurs distinct pour chaque tâche de Stream Analytics. Cette chaîne identifie le groupe de consommateurs à utiliser pour ingérer les données du hub d’événements. Si vous ne spécifiez pas de groupe de consommateurs, le travail Stream Analytics utilise le $Default groupe de consommateurs.
Mode d'authentification Spécifiez le type d’authentification que vous souhaitez utiliser pour vous connecter au hub d’événements. Utilisez une connection string ou une identité managée pour s’authentifier auprès du hub d’événements. Pour l’option d’identité managée, vous pouvez créer une identité managée affectée par le système pour le travail Stream Analytics ou une identité managée affectée par l’utilisateur pour l’authentification auprès de l’Event Hub. Lorsque vous utilisez une identité managée, l’identité managée doit être membre des rôles Récepteur de données Azure Event Hubs ou Propriétaire de données Azure Event Hubs.
Nom de la stratégie Event Hub Stratégie d’accès partagé qui permet d’accéder à Event Hubs. Chaque stratégie d’accès partagé a un nom, les autorisations que vous définissez ainsi que des clés d’accès. Cette option est automatiquement renseignée, sauf si vous sélectionnez l’option permettant de fournir manuellement les paramètres Event Hubs.
Clé de partition Ce champ facultatif est disponible uniquement si vous configurez votre travail pour utiliser le niveau de compatibilité 1.2 ou supérieur. Si votre entrée est partitionnée par une propriété, ajoutez le nom de cette propriété ici. Utilisez-la pour améliorer les performances de votre requête si elle inclut une PARTITION BY ou une GROUP BY clauses sur cette propriété. Si ce travail utilise le niveau de compatibilité 1.2 ou supérieur, ce champ est défini par défaut sur PartitionId.
Format de sérialisation des événements Format de sérialisation (JSON, CSV, Avro) du flux de données entrant. Assurez-vous que le format JSON s’aligne sur la spécification et n’inclut pas les 0 premiers pour les nombres décimaux.
Encodage Pour le moment, UTF-8 est le seul format d’encodage pris en charge.
Type de compression d’événement Type de compression utilisé pour lire le flux de données entrant, par exemple None (par défaut), Gzip ou Deflate.
Registre de schémas Sélectionnez le registre de schémas avec des schémas pour les données d’événement reçues du hub d’événements.

Quand vos données proviennent d’une entrée de stream Event Hubs, vous avez accès aux champs de métadonnées suivants dans votre requête Stream Analytics :

Propriété Descriptif
EventProcessedUtcTime La date et l’heure auxquelles Stream Analytics traite l’événement.
EventEnqueuedUtcTime Date et heure auxquelles Event Hubs reçoit les événements.
PartitionId ID de partition de base zéro de l’adaptateur d’entrée.

En utilisant ces champs, vous pouvez écrire une requête comme dans l’exemple suivant :

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Remarque

Lorsque vous utilisez Event Hubs comme point de terminaison pour IoT Hub Routes, vous pouvez accéder aux métadonnées IoT Hub à l’aide de la fonction GetMetadataPropertyValue.

Diffuser en continu des données à partir de IoT Hub

Azure IoT Hub est un ingesteur d’événements de publication-abonnement hautement évolutif optimisé pour les scénarios IoT.

L’horodatage par défaut des événements provenant d’un IoT Hub dans Stream Analytics est l’horodatage que l’événement est arrivé dans le IoT Hub, qui est EventEnqueuedUtcTime. Pour traiter les données en tant que flux à l’aide d’un horodatage dans la charge utile de l’événement, utilisez le mot clé TIMESTAMP BY .

groupes de consommateurs IoT Hub

Configurez chaque entrée IoT Hub Stream Analytics pour avoir son propre groupe de consommateurs. Lorsqu’une tâche contient une auto-jointure ou lorsqu’elle comporte plusieurs entrées, plusieurs lecteurs peuvent lire certaines entrées. Cette situation a une incidence sur le nombre de lecteurs dans un groupe de consommateurs unique. Pour éviter de dépasser la limite de Azure IoT Hub de cinq lecteurs par groupe de consommateurs par partition, désignez un groupe de consommateurs pour chaque travail Stream Analytics.

Configurer un IoT Hub en tant qu’entrée de flux de données

Le tableau suivant décrit chaque propriété dans la page New input dans le portail Azure lorsque vous configurez un IoT Hub en tant qu’entrée de flux.

Propriété Descriptif
Alias d’entrée Nom convivial que vous utilisez dans la requête du travail pour référencer cette entrée.
Subscription Choisissez l’abonnement dans lequel existe la ressource IoT Hub.
IoT Hub Nom du IoT Hub à utiliser comme entrée.
Groupe de consommateurs Utilisez un groupe de consommateurs différent pour chaque travail Stream Analytics. Le groupe de consommateurs ingère les données du IoT Hub. Stream Analytics utilise le groupe de consommateurs $Default, sauf si vous spécifiez autre chose.
Nom de la stratégie d’accès partagé Stratégie d’accès partagé qui fournit l’accès au IoT Hub. Chaque stratégie d’accès partagé a un nom, les autorisations que vous définissez ainsi que des clés d’accès.
Clé de la stratégie d’accès partagé Clé d’accès partagé utilisée pour autoriser l’accès au IoT Hub. Cette option est automatiquement remplie, sauf si vous sélectionnez l’option permettant de fournir manuellement les paramètres IoT Hub.
Point de terminaison Point de terminaison de l'IoT Hub.
Clé de partition Il s’agit d’un champ facultatif disponible uniquement si vous configurez votre travail pour utiliser le niveau de compatibilité 1.2 ou supérieur. Si vous partitionnez votre entrée par une propriété, vous pouvez ajouter le nom de cette propriété ici. Cela permet d’améliorer le niveau de performance de votre requête, si celle-ci comprend une clause PARTITION BY ou GROUP BY pour cette propriété. Si ce travail utilise le niveau de compatibilité 1.2 ou ultérieur, la valeur par défaut de ce champ est « PartitionId ».
Format de sérialisation des événements Format de sérialisation (JSON, CSV, Avro) du flux de données entrant. Assurez-vous que le format JSON s’aligne sur la spécification et n’inclut pas les 0 premiers pour les nombres décimaux.
Encodage Pour le moment, UTF-8 est le seul format d’encodage pris en charge.
Type de compression d’événement Type de compression utilisé pour lire le flux de données entrant, par exemple None (par défaut), Gzip ou Deflate.

Lorsque vous utilisez des données de flux à partir d’un IoT Hub, vous avez accès aux champs de métadonnées suivants dans votre requête Stream Analytics :

Propriété Descriptif
EventProcessedUtcTime Date et heure du traitement de l'événement.
EventEnqueuedUtcTime Date et heure auxquelles le IoT Hub reçoit l’événement.
PartitionId ID de partition de base zéro de l’adaptateur d’entrée.
IoTHub.MessageId ID utilisé pour mettre en corrélation la communication bidirectionnelle dans IoT Hub.
IoTHub.CorrelationId ID utilisé dans les réponses aux messages et les commentaires dans IoT Hub.
IoTHub.ConnectionDeviceId ID d’authentification utilisé pour envoyer ce message. Le IoT Hub marque cette valeur sur les messages liés au service.
IoTHub.ConnectionDeviceGenerationId ID de génération de l’appareil authentifié qui a été utilisé pour envoyer ce message. Le IoT Hub marque cette valeur sur les messages entrants de service.
IoTHub.EnqueuedTime Heure à laquelle le IoT Hub reçoit le message.

Transférer les données à partir du stockage Blob ou du stockage de Data Lake Gen2

Pour les scénarios impliquant le stockage de grandes quantités de données non structurées dans le cloud, Azure stockage Blob ou Azure Data Lake Storage Gen2 offre une solution économique et évolutive. Les données dans le stockage Blob ou Azure Data Lake Storage Gen2 sont considérées comme des données au repos. Toutefois, Stream Analytics peut traiter ces données en tant que flux de données.

Dans Stream Analytics, un scénario souvent employé consiste à traiter des logs. Dans ce scénario, vous capturez des fichiers de données de télémétrie à partir d’un système et devez les analyser et les traiter pour extraire des données significatives.

L’horodatage par défaut d’un stockage Blob ou d’un événement Azure Data Lake Storage Gen2 dans Stream Analytics est l’horodatage qu’il a été modifié pour la dernière fois, qui est BlobLastModifiedUtcTime. Si vous chargez un objet blob vers un compte de stockage à 13:00 et démarrez le travail Azure Stream Analytics à l'aide de l'option Now à 13:01, le travail ne récupère pas l'objet blob, car son temps modifié se situe en dehors de la période d'exécution du travail.

Si vous chargez un objet blob dans un conteneur de compte de stockage à 13:00 et démarrez le travail Azure Stream Analytics à l’aide de Custom Time à 13:00 ou plus tôt, le travail récupère l’objet blob, car l'heure de modification de l'objet blob tombe dans la période de lancement du travail.

Si vous démarrez un travail Azure Stream Analytics à l’aide de Now à 13:00, puis chargez un objet blob dans le conteneur de compte de stockage à 13:01, Azure Stream Analytics récupère l’objet blob. L’horodatage affecté à chaque objet blob est basé uniquement sur BlobLastModifiedTime. Le dossier dans lequel se trouve le blob n’a aucune relation avec le horodatage assigné. Par exemple, s’il existe un objet blob 2019/10-01/00/b1.txt avec un BlobLastModifiedTime de 2019-11-11, l’horodatage affecté à cet objet blob est 2019-11-11.

Pour traiter les données en tant que flux à l’aide d’un horodatage dans la charge utile de l’événement, vous devez utiliser le mot clé TIMESTAMP BY . Une tâche Stream Analytics récupère des données depuis Azure Blob Storage ou depuis une entrée Azure Data Lake Storage Gen2 toutes les secondes si le fichier blob est disponible. Si le fichier blob n’est pas disponible, le travail utilise un recul exponentiel avec un délai maximal de 90 secondes.

Remarque

Stream Analytics ne prend pas en charge l’ajout de contenu à un fichier blob existant. Stream Analytics affiche chaque fichier une seule fois et ne traite aucune modification qui se produit dans le fichier après que le travail lit les données. Une meilleure pratique consiste à télécharger toutes les données d’un objet blob en une seule fois, puis d’ajouter les événements récents supplémentaires dans un fichier blob nouveau et différent.

Dans les scénarios où vous ajoutez en continu de nombreux objets blob et Stream Analytics traite les objets blob à mesure que vous les ajoutez, vous pouvez ignorer certains objets blob dans de rares cas en raison de la granularité du BlobLastModifiedTime. Vous pouvez atténuer ce problème en chargeant des objets blob avec au moins deux secondes d'intervalle. Si cette option n’est pas possible, vous pouvez utiliser les Event Hubs pour diffuser en continu de grands volumes d’événements.

Configurer le stockage Blob en tant qu'entrée de flux

Le tableau suivant décrit chaque propriété dans la page New input dans le portail Azure lorsque vous configurez le stockage Blob en tant qu’entrée de flux.

Propriété Descriptif
Alias d’entrée Nom convivial que vous utilisez dans la requête du travail pour référencer cette entrée.
Subscription Sélectionnez l’abonnement dans lequel se trouve la ressource de stockage.
Compte de stockage Nom du compte de stockage dans lequel se trouvent les fichiers blob.
Clé du compte de stockage Clé secrète associée au compte de stockage. Cette option est automatiquement remplie, sauf si vous sélectionnez l’option pour fournir manuellement les paramètres.
Conteneur Les conteneurs fournissent un regroupement logique pour les blobs. Vous pouvez choisir Utiliser un conteneur existant ou créer un conteneur pour créer un conteneur.
Mode d'authentification Spécifiez le type d’authentification que vous souhaitez utiliser pour vous connecter au compte de stockage. Vous pouvez utiliser une connection string ou une identité managée pour vous authentifier auprès du compte de stockage. Pour l’option d’identité managée, vous pouvez créer une identité managée affectée par le système pour le travail Stream Analytics ou une identité managée affectée par l’utilisateur pour s’authentifier auprès du compte de stockage. Lorsque vous utilisez une identité managée, l’identité managée doit être membre d’un rôle approprié sur le compte de stockage.
Modèle de chemin d’accès (facultatif) Chemin du fichier utilisé pour localiser les blobs dans le conteneur spécifié. Si vous souhaitez lire des blobs à partir de la racine du conteneur, ne définissez pas de modèle de chemin. Dans le chemin d’accès, vous pouvez spécifier une ou plusieurs instances des trois variables suivantes : {date}, {time}ou {partition}

Exemple 1 : cluster1/logs/{date}/{time}/{partition}

Exemple 2 : cluster1/logs/{date}

Le * caractère n’est pas une valeur autorisée pour le préfixe de chemin d’accès. Seuls les caractères valides de blob Azure sont autorisés. N’incluez pas de noms de conteneurs ou de fichiers.
Format de date (facultatif) Format de date suivant lequel les fichiers sont organisés si vous utilisez la variable de date dans le chemin. Exemple : YYYY/MM/DD

Lorsque l’entrée d’objet blob a {date} ou {time} dans son chemin d’accès, Stream Analytics examine les dossiers par ordre chronologique croissant.
Format de temps (facultatif) Format d’heure suivant lequel les fichiers sont organisés si vous utilisez la variable d’heure dans le chemin. Actuellement, la seule valeur prise en charge est HH pendant des heures.
Clé de partition Il s’agit d’un champ facultatif disponible uniquement si vous configurez votre travail pour utiliser le niveau de compatibilité 1.2 ou supérieur. Si vous partitionnez votre entrée par une propriété, vous pouvez ajouter le nom de cette propriété ici. Cela permet d’améliorer le niveau de performance de votre requête, si celle-ci comprend une clause PARTITION BY ou GROUP BY pour cette propriété. Si ce travail utilise le niveau de compatibilité 1.2 ou ultérieur, la valeur par défaut de ce champ est « PartitionId ».
Nombre de partitions d’entrée Ce champ est présent uniquement lorsque {partition} est utilisé dans le modèle de chemin d’accès. La valeur de cette propriété est un entier >=1. Quel que soit l’emplacement où {partition} s’affiche dans pathPattern, un nombre compris entre 0 et la valeur de ce champ -1 est utilisé.
Format de sérialisation des événements Format de sérialisation (JSON, CSV, Avro) du flux de données entrant. Assurez-vous que le format JSON s’aligne sur la spécification et n’inclut pas les 0 premiers pour les nombres décimaux.
Encodage Pour CSV et JSON, UTF-8 est le seul format d’encodage actuellement pris en charge.
Compression Type de compression utilisé pour lire le flux de données entrant, par exemple None (par défaut), Gzip ou Deflate.

Lorsque vos données proviennent d’une source de stockage Blob, vous pouvez accéder aux champs de métadonnées suivants dans votre requête Stream Analytics :

Propriété Descriptif
BlobName Nom du blob d’entrée d’où provient l’événement.
EventProcessedUtcTime La date et l’heure auxquelles Stream Analytics traite l’événement.
BlobLastModifiedUtcTime Date et heure de la dernière modification apportée au blob.
PartitionId ID de partition de base zéro de l’adaptateur d’entrée.

En utilisant ces champs, vous pouvez écrire une requête comme dans l’exemple suivant :

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Diffuser des données à partir d’Apache Kafka

Azure Stream Analytics vous permet de vous connecter directement aux clusters Apache Kafka pour ingérer des données. La solution est à faible code et entièrement gérée par l’équipe Azure Stream Analytics chez Microsoft, de sorte qu’elle répond aux normes de conformité métier. L’entrée Kafka est compatible rétroactivement et prend en charge toutes les versions à partir de la version 0.10 jusqu'à la dernière version du client. Vous pouvez vous connecter à des clusters Kafka à l’intérieur d’un réseau virtuel et de clusters Kafka avec un point de terminaison public, en fonction des configurations. La configuration s’appuie sur des conventions de configuration Kafka existantes. Les types de compression pris en charge sont None, Gzip, Snappy, LZ4 et Zstd.

Pour plus d’informations, consultez Stream data from Kafka into Azure Stream Analytics (présentation).

Étapes suivantes