Creación de un trabajo de streaming de datos en Azure SQL Edge

Importante

Azure SQL Edge ya no admite la plataforma ARM64.

En este artículo se explica cómo crear un trabajo de streaming de T-SQL en Azure SQL Edge. Cree los objetos de entrada y salida de transmisión externa y, a continuación, defina la consulta de trabajo de streaming como parte de la creación del trabajo de streaming.

Configuración de los objetos de entrada y salida de transmisión externa

El streaming de T-SQL utiliza la funcionalidad de origen de datos externo de SQL Server para definir los orígenes de datos asociados a las entradas y salidas de transmisión externa del trabajo de streaming. Utilice los siguientes comandos de T-SQL para crear un objeto de entrada o salida de transmisión externa:

Además, si utiliza Azure SQL Edge, SQL Server o Azure SQL Database como transmisión de salida, necesita CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL). Este comando de T-SQL define las credenciales para acceder a la base de datos.

Orígenes de datos de transmisión de entrada y salida admitidos

Azure SQL Edge actualmente solo admite los siguientes orígenes de datos como entradas y salidas de transmisión.

Tipo de origen de datos Entrada Salida Descripción
Centro de Azure IoT Edge Y Y Origen de datos para leer y escribir datos de streaming en un centro de Azure IoT Edge. Para más información, consulte Centro de IoT Edge.
SQL Database N S Conexión de origen de datos para escribir datos de streaming en SQL Database. La base de datos puede ser una base de datos local de Azure SQL Edge, o una base de datos remota de SQL Server o Azure SQL Database.
Kafka S N Origen de datos para leer datos de streaming de un tema de Kafka.

Ejemplo: Creación de un objeto de entrada y salida de flujo externo para el centro de Azure IoT Edge

En el ejemplo siguiente se crea un objeto de transmisión externa para el centro de Azure IoT Edge. Para crear un origen de datos de entrada/salida de transmisión externa para el centro de Azure IoT Edge, primero debe crear un formato de archivo externo para el diseño de los datos que se leen o escriben.

  1. Creación de un formato de archivo externo del tipo JSON.

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. Cree un origen de datos externo para el centro de Azure IoT Edge. El siguiente script de T-SQL crea una conexión de origen de datos a un centro de IoT Edge que se ejecuta en el mismo host de Docker que Azure SQL Edge.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. Cree el objeto de transmisión externa para el centro de Azure IoT Edge. El siguiente script de T-SQL crea un objeto de transmisión para el centro de IoT Edge. En el caso de un objeto de transmisión del centro de IoT Edge, el parámetro LOCATION es el nombre del tema o canal del centro de IoT Edge en el que se lee o se escribe.

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

Ejemplo: Creación de un objeto de flujo externo en Azure SQL Database

En el ejemplo siguiente se crea un objeto de transmisión externa en la base de datos local de Azure SQL Edge.

  1. Cree una clave maestra en la base de datos. Esto es necesario para cifrar el secreto de credencial.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Cree una credencial de ámbito de base de datos para acceder al origen de SQL Server. En el ejemplo siguiente se crea una credencial para el origen de datos externo con IDENTITY = username y SECRET = password.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. Cree un origen de datos externo con CREATE EXTERNAL DATA SOURCE. En el ejemplo siguiente:

    • Se crea un origen de datos externo denominado LocalSQLOutput.
    • Se identifica el origen de datos externo (LOCATION = '<vendor>://<server>[:<port>]'). En el ejemplo, se apunta a una instancia local de Azure SQL Edge.
    • Se usa la credencial creada antes.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. Cree el objeto de transmisión externa. En el ejemplo siguiente se crea un objeto de transmisión externo que apunta a una tabla dbo.TemperatureMeasurements en la base de datos MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Ejemplo: Creación de un objeto de flujo externo para Kafka

En el ejemplo siguiente se crea un objeto de transmisión externa en la base de datos local de Azure SQL Edge. En este ejemplo se da por supuesto que el servidor de Kafka está configurado para el acceso anónimo.

  1. Cree un origen de datos externo con CREATE EXTERNAL DATA SOURCE. En el ejemplo siguiente:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. Cree un formato de archivo externo para la entrada de Kafka. En el ejemplo siguiente se crea un formato de archivo JSON con compresión GZIP.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. Cree el objeto de transmisión externa. En el ejemplo siguiente se crea un objeto de transmisión externa que apunta al tema de Kafka TemperatureMeasurement.

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

Creación del trabajo de streaming y las consultas de streaming

Use el procedimiento almacenado del sistema sys.sp_create_streaming_job para definir las consultas de streaming y crear el trabajo de streaming. El procedimiento almacenado sp_create_streaming_job utiliza los siguientes parámetros:

  • @job_name: nombre del trabajo de streaming. Los nombres de los trabajos de streaming son únicos en la instancia.
  • @statement: instrucciones de consultas de streaming basadas en el lenguaje de consulta de Stream Analytics.

En el ejemplo siguiente se crea un trabajo de streaming simple con una consulta de streaming. Esta consulta lee las entradas del centro de IoT Edge y escribe en dbo.TemperatureMeasurements en la base de datos.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

En el ejemplo siguiente se crea un trabajo de streaming más complejo con varias consultas diferentes. Estas consultas incluyen una que utiliza la función integrada AnomalyDetection_ChangePoint para identificar anomalías en los datos de temperatura.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

Inicio, detención, abandono y supervisión de trabajos de streaming

Para iniciar un trabajo de streaming en Azure SQL Edge, ejecute el procedimiento almacenado sys.sp_start_streaming_job. El procedimiento almacenado requiere que se inicie el nombre del trabajo de streaming como entrada.

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

Para detener un trabajo de streaming, ejecute el procedimiento almacenado sys.sp_stop_streaming_job. El procedimiento almacenado requiere que se detenga el nombre del trabajo de streaming como entrada.

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

Para abandonar (o eliminar) un trabajo de streaming, ejecute el procedimiento almacenado sys.sp_drop_streaming_job. El procedimiento almacenado requiere que se abandone el nombre del trabajo de streaming como entrada.

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

Para obtener el estado actual de un trabajo de streaming, ejecute el procedimiento almacenado sys.sp_get_streaming_job. El procedimiento almacenado requiere que se abandone el nombre del trabajo de streaming como entrada. Proporciona el nombre y el estado actual del trabajo de streaming.

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

El trabajo de streaming puede tener cualquiera de los siguientes estados:

Estado Descripción
Creado El trabajo de streaming se ha creado, pero aún no se ha iniciado.
Iniciando El trabajo de streaming se encuentra en la fase de inicio.
Inactivo El trabajo de streaming se está ejecutando, pero no hay ninguna entrada para procesar.
Procesamiento El trabajo de streaming se está ejecutando y está procesando entradas. Este estado indica un estado de mantenimiento del trabajo de streaming.
Degradado El trabajo de streaming se está ejecutando, pero se han producido algunos errores no graves durante el procesamiento de la entrada. El trabajo de entrada continúa ejecutándose, pero quitará las entradas que encuentran errores.
Detenido El trabajo de streaming se ha detenido.
Con errores Error del trabajo de streaming. Suele indicar un error grave durante el procesamiento.

Nota:

Dado que el trabajo de streaming se ejecuta de forma asincrónica, es posible que el trabajo encuentre errores en tiempo de ejecución. Para solucionar problemas de un error de trabajo de streaming, use el sys.sp_get_streaming_job procedimiento almacenado o revise el registro de Docker desde el contenedor de Azure SQL Edge, que puede proporcionar los detalles de error del trabajo de streaming.

Pasos siguientes