Udostępnij za pośrednictwem


Tworzenie zadania przesyłania strumieniowego danych w usłudze Azure SQL Edge

Ważne

Usługa Azure SQL Edge nie obsługuje już platformy ARM64.

W tym artykule wyjaśniono, jak utworzyć zadanie przesyłania strumieniowego T-SQL w usłudze Azure SQL Edge. Utworzysz zewnętrzne obiekty wejściowe i wyjściowe strumienia, a następnie zdefiniujesz zapytanie zadania przesyłania strumieniowego w ramach tworzenia zadania przesyłania strumieniowego.

Konfigurowanie obiektów wejściowych i wyjściowych strumienia zewnętrznego

Przesyłanie strumieniowe T-SQL używa funkcji zewnętrznego źródła danych programu SQL Server do definiowania źródeł danych skojarzonych z danymi wejściowymi i wyjściowymi zewnętrznego strumienia zadania przesyłania strumieniowego. Użyj następujących poleceń języka T-SQL, aby utworzyć obiekt wejściowy lub wyjściowy strumienia zewnętrznego:

Ponadto jeśli usługa Azure SQL Edge, SQL Server lub Azure SQL Database jest używana jako strumień danych wyjściowych, potrzebne są poświadczenia CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL). To polecenie języka T-SQL definiuje poświadczenia dostępu do bazy danych.

Obsługiwane źródła danych wejściowych i wyjściowych strumienia danych

Usługa Azure SQL Edge obecnie obsługuje tylko następujące źródła danych jako dane wejściowe i wyjściowe strumienia.

Typ źródła danych Dane wejściowe Dane wyjściowe opis
Centrum usługi Azure IoT Edge Y Y Źródło danych do odczytywania i zapisywania danych przesyłanych strumieniowo do centrum usługi Azure IoT Edge. Aby uzyskać więcej informacji, zobacz Centrum usługi IoT Edge.
SQL Database N Y Połączenie ze źródłem danych w celu zapisu danych przesyłanych strumieniowo do usługi SQL Database. Baza danych może być lokalną bazą danych w usłudze Azure SQL Edge lub zdalną bazą danych w programie SQL Server lub usłudze Azure SQL Database.
Kafka Y N Źródło danych do odczytywania danych przesyłanych strumieniowo z tematu platformy Kafka.

Przykład: Tworzenie zewnętrznego obiektu wejściowego/wyjściowego strumienia dla centrum usługi Azure IoT Edge

Poniższy przykład tworzy obiekt strumienia zewnętrznego dla centrum usługi Azure IoT Edge. Aby utworzyć zewnętrzne źródło danych wejściowych/wyjściowych strumienia dla centrum Azure IoT Edge, należy najpierw utworzyć zewnętrzny format pliku dla układu odczytywanych lub zapisywanych danych.

  1. Utwórz format pliku zewnętrznego typu JSON.

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. Utwórz zewnętrzne źródło danych dla centrum usługi Azure IoT Edge. Poniższy skrypt języka T-SQL tworzy połączenie źródła danych z centrum usługi IoT Edge, które działa na tym samym hoście platformy Docker co usługa Azure SQL Edge.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. Utwórz obiekt zewnętrznego strumienia dla centrum usługi Azure IoT Edge. Poniższy skrypt języka T-SQL tworzy obiekt strumienia dla centrum usługi IoT Edge. W przypadku obiektu strumienia centrum usługi IoT Edge parametr LOCATION jest nazwą tematu lub kanału usługi IoT Edge, do których jest odczytywany lub zapisywany kanał.

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

Przykład: tworzenie obiektu zewnętrznego strumienia w usłudze Azure SQL Database

Poniższy przykład tworzy obiekt strumienia zewnętrznego do lokalnej bazy danych w usłudze Azure SQL Edge.

  1. Utwórz klucz główny w bazie danych. Jest to wymagane do zaszyfrowania wpisu tajnego poświadczeń.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Utwórz poświadczenia o zakresie bazy danych na potrzeby uzyskiwania dostępu do źródła programu SQL Server. Poniższy przykład tworzy poświadczenie do zewnętrznego źródła danych z wartościami IDENTITY = username i SECRET = password.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. Utwórz zewnętrzne źródło danych za pomocą polecenia CREATE EXTERNAL DATA SOURCE. Poniższy przykład:

    • Tworzy zewnętrzne źródło danych o nazwie LocalSQLOutput.
    • Identyfikuje zewnętrzne źródło danych (LOCATION = '<vendor>://<server>[:<port>]'). W tym przykładzie wskazuje lokalne wystąpienie usługi Azure SQL Edge.
    • Używa wcześniej utworzonego poświadczenia.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. Utwórz obiekt strumienia zewnętrznego. Poniższy przykład tworzy obiekt strumienia zewnętrznego wskazujący bazę danych tabeli . TemperatureMeasurements w bazie danych MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Przykład: tworzenie obiektu zewnętrznego strumienia dla platformy Kafka

Poniższy przykład tworzy obiekt strumienia zewnętrznego do lokalnej bazy danych w usłudze Azure SQL Edge. W tym przykładzie przyjęto założenie, że serwer kafka jest skonfigurowany pod kątem dostępu anonimowego.

  1. Utwórz zewnętrzne źródło danych za pomocą polecenia CREATE EXTERNAL DATA SOURCE. Poniższy przykład:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. Utwórz format pliku zewnętrznego dla danych wejściowych platformy Kafka. W poniższym przykładzie utworzono format pliku JSON z kompresją GZipped.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. Utwórz obiekt strumienia zewnętrznego. W poniższym przykładzie tworzony jest obiekt strumienia zewnętrznego wskazujący na temat platformy TemperatureMeasurementKafka.

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

Tworzenie zadania przesyłania strumieniowego i zapytań przesyłania strumieniowego

sys.sp_create_streaming_job Użyj procedury składowanej systemu, aby zdefiniować zapytania przesyłania strumieniowego i utworzyć zadanie przesyłania strumieniowego. Procedura sp_create_streaming_job składowana przyjmuje następujące parametry:

Poniższy przykład tworzy proste zadanie przesyłania strumieniowego z jednym zapytaniem przesyłanym strumieniowo. To zapytanie odczytuje dane wejściowe z centrum usługi IoT Edge i zapisuje je dbo.TemperatureMeasurements w bazie danych.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

Poniższy przykład tworzy bardziej złożone zadanie przesyłania strumieniowego z wieloma różnymi zapytaniami. Te zapytania obejmują funkcję, która używa wbudowanej AnomalyDetection_ChangePoint funkcji do identyfikowania anomalii w danych dotyczących temperatury.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

Uruchamianie, zatrzymywanie, usuwanie i monitorowanie zadań przesyłania strumieniowego

Aby uruchomić zadanie przesyłania strumieniowego w usłudze Azure SQL Edge, uruchom procedurę sys.sp_start_streaming_job składowaną. Procedura składowana wymaga uruchomienia zadania przesyłania strumieniowego jako danych wejściowych.

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

Aby zatrzymać zadanie przesyłania strumieniowego, uruchom procedurę sys.sp_stop_streaming_job składowaną. Procedura składowana wymaga, aby nazwa zadania przesyłania strumieniowego została zatrzymana jako dane wejściowe.

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

Aby usunąć (lub usunąć) zadanie przesyłania strumieniowego, uruchom procedurę składowaną sys.sp_drop_streaming_job . Procedura składowana wymaga, aby nazwa zadania przesyłania strumieniowego została porzucana jako dane wejściowe.

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

Aby uzyskać bieżący stan zadania przesyłania strumieniowego, uruchom procedurę sys.sp_get_streaming_job składowaną. Procedura składowana wymaga, aby nazwa zadania przesyłania strumieniowego została porzucana jako dane wejściowe. Zwraca on nazwę i bieżący stan zadania przesyłania strumieniowego.

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

Zadanie przesyłania strumieniowego może mieć jeden z następujących stanów:

Status Opis
Tworzone Zadanie przesyłania strumieniowego zostało utworzone, ale nie zostało jeszcze uruchomione.
Uruchamianie Zadanie przesyłania strumieniowego znajduje się w fazie początkowej.
Okresy Zadanie przesyłania strumieniowego jest uruchomione, ale nie ma danych wejściowych do przetworzenia.
Trwa przetwarzanie Zadanie przesyłania strumieniowego jest uruchomione i przetwarza dane wejściowe. Ten stan wskazuje stan dobrej kondycji dla zadania przesyłania strumieniowego.
Obniżona wydajność Zadanie przesyłania strumieniowego jest uruchomione, ale wystąpiły pewne błędy niekrytyczne podczas przetwarzania danych wejściowych. Zadanie wejściowe jest nadal uruchamiane, ale spowoduje usunięcie danych wejściowych, które napotykają błędy.
Zatrzymana Zadanie przesyłania strumieniowego zostało zatrzymane.
Zakończone niepowodzeniem Zadanie przesyłania strumieniowego nie powiodło się. Zazwyczaj jest to wskazanie błędu krytycznego podczas przetwarzania.

Uwaga

Ponieważ zadanie przesyłania strumieniowego jest wykonywane asynchronicznie, zadanie może napotkać błędy w czasie wykonywania. Aby rozwiązać problem z niepowodzeniem zadania przesyłania strumieniowego, użyj sys.sp_get_streaming_job procedury składowanej lub przejrzyj dziennik platformy Docker z kontenera usługi Azure SQL Edge, który może podać szczegóły błędu z zadania przesyłania strumieniowego.

Następne kroki