Créer une tâche de diffusion en continu de données dans Azure SQL Edge

Important

Azure SQL Edge ne prend plus en charge la plateforme ARM64.

Cet article explique comment créer une tâche de diffusion en continu T-SQL dans Azure SQL Edge. Vous créez les objets d’entrée et de sortie du flux externe, puis vous définissez la requête de travail de streaming dans le cadre de la création de la tâche de streaming.

Configurer les objets d’entrée et de sortie du flux externe

T-SQL Streaming utilise la fonctionnalité de source de données externe de SQL Server pour définir les sources de données associées aux entrées et sorties du flux externe de la tâche de streaming. Utilisez les commandes T-SQL suivantes pour créer un objet de sortie ou d’entrée de flux externe :

En outre, si Azure SQL Edge, SQL Server ou Azure SQL Database est utilisé comme flux de sortie, vous avez besoin de CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL). Cette commande T-SQL définit les informations d’identification pour accéder à la base de données.

Sources de données de flux d’entrée et de sortie prises en charge

Actuellement, Azure SQL Edge prend uniquement en charge les sources de données suivantes en tant qu’entrées et sorties de flux.

Type de source de données Entrée Sortie Description
Hub Azure IoT Edge O O Source de données pour lire et écrire des données de streaming dans un hub Azure IoT Edge. Pour plus d’informations, consultez l’article IoT Edge Hub.
Base de données SQL N O Connexion à la source de données pour écrire les données de streaming dans SQL Database. La base de données peut être une base de données locale dans Azure SQL Edge, ou une base de données distante dans SQL Server ou Azure SQL Database.
Kafka O N Source de données pour lire les données de streaming à partir d’une rubrique Kafka.

Exemple : Créer un objet d’entrée/sortie de flux externe pour Azure IoT Edge Hub

L’exemple suivant crée un objet de flux externe pour Azure IoT Edge Hub. Pour créer une source de données d’entrée/de sortie de flux externe pour le hub Azure IoT Edge, vous devez d’abord créer un format de fichier externe pour la disposition des données en lecture/écriture.

  1. Créez un format de fichier externe de type JSON.

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. Créez une source de données externe pour le hub Azure IoT Edge. Le script T-SQL suivant crée une connexion de source de données à un hub IoT Edge qui s’exécute sur le même hôte Docker qu’Azure SQL Edge.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. Créez l’objet de flux externe pour le hub Azure IoT Edge. Le script T-SQL suivant crée un objet de flux pour le hub IoT Edge. Dans le cas d’un objet de flux de hub IoT Edge, le paramètre LOCATION correspond au nom de la rubrique/du canal de hub IoT Edge en cours de lecture ou d’écriture.

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

Exemple : Créer un objet de flux externe vers Azure SQL Database

L’exemple suivant crée un objet de flux externe dans la base de données locale dans Azure SQL Edge.

  1. Créez une clé principale sur la base de données. C’est nécessaire pour chiffrer le secret des informations d’identification.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Créez des informations d’identification incluses dans l’étendue de la base de données pour accéder à la source SQL Server. L’exemple suivant crée des informations d’identification pour la source de données externe avec IDENTITY = username et SECRET = password.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. Créez une source de données externe avec CREATE EXTERNAL DATA SOURCE. L’exemple suivant :

    • Crée une source de données externe pour nommée LocalSQLOutput.
    • Identifie la source de données externe (LOCATION = '<vendor>://<server>[:<port>]'). Dans l’exemple, elle pointe vers une instance locale d’Azure SQL Edge.
    • Elle utilise les informations d’identification créées précédemment.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. Créez l’objet de flux externe. L’exemple suivant crée un objet de flux externe pointant vers une table dbo.TemperatureMeasurements dans la base de données MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Exemple : Créer un objet de flux externe pour Kafka

L’exemple suivant crée un objet de flux externe dans la base de données locale dans Azure SQL Edge. Cet exemple suppose que le serveur Kafka est configuré pour l’accès anonyme.

  1. Créez une source de données externe avec CREATE EXTERNAL DATA SOURCE. L’exemple suivant :

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. Créez un format de fichier externe pour l’entrée Kafka. Dans l’exemple suivant, un format de fichier JSON a été créé avec la compression GZipped.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. Créez l’objet de flux externe. L’exemple suivant crée un objet de flux externe pointant vers la rubrique Kafka TemperatureMeasurement.

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

Créer la tâche et les requêtes de streaming

Utilisez la procédure stockée système sys.sp_create_streaming_job pour définir les requêtes de streaming et créer la tâche de streaming. La procédure stockée sp_create_streaming_job prend les paramètres suivants :

  • @job_name: nom du travail de diffusion en continu. Les noms des tâches de streaming sont uniques sur l’ensemble de l’instance.
  • @statement : Instructions de requête de streaming basées sur le langage de requête Stream Analytics.

L’exemple suivant crée une tâche de streaming simple avec une requête de streaming. Cette requête lit les entrées du hub IoT Edge et les écrit dans dbo.TemperatureMeasurements dans la base de données.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

L’exemple suivant crée une tâche de streaming plus complexe avec plusieurs requêtes différentes. Ces requêtes en incluent une qui utilise la fonction intégrée AnomalyDetection_ChangePoint pour identifier les anomalies dans les données de température.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

Démarrer, arrêter, supprimer et surveiller des tâches de streaming

Pour démarrer une tâche de streaming dans Azure SQL Edge, exécutez la procédure stockée sys.sp_start_streaming_job. La procédure stockée requiert le nom de l’élément de tâche de streaming à démarrer, en tant qu'entrée.

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

Pour arrêter une tâche de streaming, exécutez la procédure stockée sys.sp_stop_streaming_job. La procédure stockée requiert le nom de l’élément de tâche de streaming à arrêter, en tant qu'entrée.

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

Pour abandonner (ou supprimer) une tâche de streaming, exécutez la procédure stockée sys.sp_drop_streaming_job. La procédure stockée requiert le nom de l’élément de tâche de streaming à supprimer, en tant qu'entrée.

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

Pour obtenir l’état actuel d’une tâche de streaming, exécutez la procédure stockée sys.sp_get_streaming_job. La procédure stockée requiert le nom de l’élément de tâche de streaming à supprimer, en tant qu'entrée. Elle renvoie le nom et l’état actuel de la tâche de streaming.

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

La tâche de streaming peut se présenter dans l’un des états suivants :

Statut Description
Créé le La tâche de streaming a été créée, mais n’a pas encore été démarrée.
Démarrage en cours La tâche de streaming est cours de démarrage.
Idle La tâche de streaming est en cours d’exécution, mais il n’y a aucune entrée à traiter.
Traitement La tâche de streaming est en cours d’exécution et traite les entrées. Cet état indique un état sain de la tâche de streaming.
Détérioré La tâche de streaming est en cours d’exécution, mais des erreurs non récupérables se sont produites pendant le traitement des entrées. Le travail d’entrée continue à s’exécuter, mais supprime les entrées qui rencontrent des erreurs.
Arrêté La tâche de streaming a été arrêtée.
Échoué La tâche de streaming a échoué. Cela indique généralement une erreur irrécupérable lors du traitement.

Remarque

Étant donné que le travail de diffusion en continu est exécuté de manière asynchrone, il peut rencontrer des erreurs au moment de son exécution. Pour résoudre les problèmes d’échec d’une tâche de diffusion en continu, utilisez la sys.sp_get_streaming_job procédure stockée ou passez en revue le journal Docker à partir du conteneur Azure SQL Edge, qui peut fournir les détails d’erreur du travail de streaming.

Étapes suivantes