Condividi tramite


Creare un processo di streaming di dati in SQL Edge di Azure

Importante

SQL Edge di Azure non supporta più la piattaforma ARM64.

Questo articolo illustra come creare un processo di streaming T-SQL in SQL Edge di Azure. Si creano gli oggetti di input e output del flusso esterno e quindi si definisce la query del processo di streaming come parte della creazione del processo di streaming.

Configurare gli oggetti di input e output del flusso esterno

Il flusso T-SQL usa la funzionalità dell'origine dati esterna di SQL Server per definire le origini dati associate agli input e agli output del flusso esterno del processo di streaming. Usare i comandi T-SQL seguenti per creare un oggetto di input o output del flusso esterno:

Inoltre, se SQL Edge di Azure, SQL Server o database SQL di Azure viene usato come flusso di output, è necessario CREATE DATABA edizione Standard SCOPED CREDENTIAL (Transact-SQL). Questo comando T-SQL definisce le credenziali per accedere al database.

Origini dati del flusso di input e output supportate

SQL Edge di Azure attualmente supporta solo le origini dati seguenti come output e input di flusso.

Tipo di origine dati Input Output Descrizione
Hub di Azure IoT Edge S S Origine dati per leggere e scrivere dati di streaming in un hub di Azure IoT Edge. Per altre informazioni, vedere Hub IoT Edge.
Database SQL N Y Connessione all'origine dati per scrivere i dati di streaming nel database SQL. Il database può essere un database locale in SQL Edge di Azure o un database remoto in SQL Server o database SQL di Azure.
Kafka Y N Origine dati per la lettura dei dati in streaming da un argomento Kafka.

Esempio: Creare un oggetto di input/output del flusso esterno per l'hub IoT Edge di Azure

L'esempio seguente crea un oggetto flusso esterno per l'hub IoT Edge di Azure. Per creare un'origine dati di input/output del flusso esterno per l'hub di Azure IoT Edge, è prima necessario creare un formato di file esterno per il layout dei dati letti o scritti.

  1. Creare un formato di file esterno del tipo JSON.

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. Creare un'origine dati esterna per l'hub IoT Edge di Azure. Lo script T-SQL seguente crea una connessione all'origine dati a un hub IoT Edge eseguito nello stesso host Docker di SQL Edge di Azure.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. Creare l'oggetto flusso esterno per l'hub IoT Edge di Azure. Lo script T-SQL seguente crea un oggetto flusso per l'hub IoT Edge. Nel caso di un oggetto flusso dell'hub IoT Edge, il parametro LOCATION è il nome dell'argomento o del canale dell'hub IoT Edge in cui viene letto o scritto.

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

Esempio: Creare un oggetto flusso esterno per database SQL di Azure

L'esempio seguente crea un oggetto flusso esterno al database locale in SQL Edge di Azure.

  1. Creare una chiave master nel database. Questo passaggio è necessario per crittografare il segreto delle credenziali.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Creare credenziali con ambito database per l'accesso all'origine di SQL Server. Nell'esempio seguente viene creata una credenziale per l'origine dati esterna, con IDENTITY = username e edizione Standard CRET = password.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. Creare un'origine dati esterna con CREATE EXTERNAL DATA SOURCE. L'esempio seguente:

    • Crea un'origine dati esterna denominata LocalSQLOutput.
    • Identifica l'origine dati esterna (LOCATION = '<vendor>://<server>[:<port>]'). Nell'esempio punta a un'istanza locale di SQL Edge di Azure.
    • Usa le credenziali create in precedenza.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. Creare l'oggetto flusso esterno. Nell'esempio seguente viene creato un oggetto flusso esterno che punta a un dbo di tabella . TemperatureMeasurements, nel database MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Esempio: Creare un oggetto flusso esterno per Kafka

L'esempio seguente crea un oggetto flusso esterno al database locale in SQL Edge di Azure. In questo esempio si presuppone che il server kafka sia configurato per l'accesso anonimo.

  1. Creare un'origine dati esterna con CREATE EXTERNAL DATA SOURCE. L'esempio seguente:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. Creare un formato di file esterno per l'input Kafka. L'esempio seguente ha creato un formato di file JSON con compressione GZipped.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. Creare l'oggetto flusso esterno. Nell'esempio seguente viene creato un oggetto flusso esterno che punta all'argomento TemperatureMeasurementKafka .

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

Creare il processo di streaming e le query di streaming

Usare la sys.sp_create_streaming_job stored procedure di sistema per definire le query di streaming e creare il processo di streaming. La sp_create_streaming_job stored procedure accetta i parametri seguenti:

L'esempio seguente crea un processo di streaming semplice con una query di streaming. Questa query legge gli input dall'hub IoT Edge e scrive dbo.TemperatureMeasurements nel database.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

L'esempio seguente crea un processo di streaming più complesso con più query diverse. Queste query includono una che usa la funzione predefinita AnomalyDetection_ChangePoint per identificare le anomalie nei dati relativi alla temperatura.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

Avviare, arrestare, eliminare e monitorare i processi di streaming

Per avviare un processo di streaming in SQL Edge di Azure, eseguire la sys.sp_start_streaming_job stored procedure. La stored procedure richiede l'avvio del nome del processo di streaming, come input.

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

Per arrestare un processo di streaming, eseguire la sys.sp_stop_streaming_job stored procedure. La stored procedure richiede che il nome del processo di streaming si arresti, come input.

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

Per eliminare o eliminare un processo di streaming, eseguire la sys.sp_drop_streaming_job stored procedure. La stored procedure richiede il nome del processo di streaming da eliminare, come input.

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

Per ottenere lo stato corrente di un processo di streaming, eseguire la sys.sp_get_streaming_job stored procedure. La stored procedure richiede il nome del processo di streaming da eliminare, come input. Restituisce il nome e lo stato corrente del processo di streaming.

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

Il processo di streaming può avere uno dei seguenti stati:

Status Descrizione
Data di creazione Il processo di streaming è stato creato, ma non è ancora stato avviato.
Starting Il processo di streaming si trova nella fase di avvio.
Idle Il processo di streaming è in esecuzione, ma non è disponibile alcun input da elaborare.
Elaborazione Il processo di streaming è in esecuzione ed è in corso l'elaborazione degli input. Questo stato indica uno stato integro per il processo di streaming.
Degraded Il processo di streaming è in esecuzione, ma si sono verificati alcuni errori non irreversibili durante l'elaborazione dell'input. Il processo di input continua a essere eseguito, ma elimina gli input che riscontrano errori.
Arrestato Il processo di streaming è stato arrestato.
Convalida non superata Processo di streaming non riuscito. Si tratta in genere di un'indicazione di un errore irreversibile durante l'elaborazione.

Nota

Poiché il processo di streaming viene eseguito in modo asincrono, il processo potrebbe riscontrare errori in fase di esecuzione. Per risolvere un errore del processo di streaming, usare la sys.sp_get_streaming_job stored procedure o esaminare il log Docker dal contenitore SQL Edge di Azure, che può fornire i dettagli dell'errore del processo di streaming.

Passaggi successivi