Azure SQL Edge'de veri akışı işi oluşturma

Önemli

Azure SQL Edge artık ARM64 platformunu desteklememektedir.

Bu makalede, Azure SQL Edge'de T-SQL akış işinin nasıl oluşturulacağı açıklanır. Dış akış giriş ve çıkış nesnelerini oluşturur ve akış işi sorgusunu akış işi oluşturma işleminin bir parçası olarak tanımlarsınız.

Dış akış giriş ve çıkış nesnelerini yapılandırma

T-SQL akışı, akış işinin dış akış girişleri ve çıkışlarıyla ilişkili veri kaynaklarını tanımlamak için SQL Server'ın dış veri kaynağı işlevselliğini kullanır. Dış akış girişi veya çıkış nesnesi oluşturmak için aşağıdaki T-SQL komutlarını kullanın:

Ayrıca, Azure SQL Edge, SQL Server veya Azure SQL Veritabanı çıkış akışı olarak kullanılıyorsa CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL) gerekir. Bu T-SQL komutu veritabanına erişmek için kimlik bilgilerini tanımlar.

Desteklenen giriş ve çıkış akışı veri kaynakları

Azure SQL Edge şu anda yalnızca aşağıdaki veri kaynaklarını akış girişleri ve çıkışları olarak desteklemektedir.

Veri kaynağı türü Girdi Çıktı Tanım
Azure IoT Edge hub'ı Y Y Azure IoT Edge hub'ına akış verilerini okumak ve yazmak için veri kaynağı. Daha fazla bilgi için bkz . IoT Edge Hub.
SQL Veritabanı N Y akış verilerini SQL Veritabanı yazmak için veri kaynağı bağlantısı. Veritabanı, Azure SQL Edge'deki yerel bir veritabanı veya SQL Server veya Azure SQL Veritabanı uzak veritabanı olabilir.
Kafka Y N Kafka konusundan akış verilerini okumak için veri kaynağı.

Örnek: Azure IoT Edge hub'ı için dış akış giriş/çıkış nesnesi oluşturma

Aşağıdaki örnek, Azure IoT Edge hub'ı için bir dış akış nesnesi oluşturur. Azure IoT Edge hub'ına yönelik bir dış akış giriş/çıkış veri kaynağı oluşturmak için, önce okunan veya yazılan verilerin düzeni için bir dış dosya biçimi oluşturmanız gerekir.

  1. JSON türünde bir dış dosya biçimi oluşturun.

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. Azure IoT Edge hub'ı için bir dış veri kaynağı oluşturun. Aşağıdaki T-SQL betiği, Azure SQL Edge ile aynı Docker ana bilgisayarında çalışan bir IoT Edge hub'ına veri kaynağı bağlantısı oluşturur.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. Azure IoT Edge hub'ı için dış akış nesnesi oluşturun. Aşağıdaki T-SQL betiği IoT Edge hub'ı için bir akış nesnesi oluşturur. IoT Edge hub'ı akış nesnesi söz konusu olduğunda LOCATION parametresi, okunan veya yazılan IoT Edge hub'ı konusunun veya kanalının adıdır.

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

Örnek: Azure SQL Veritabanı için dış akış nesnesi oluşturma

Aşağıdaki örnek, Azure SQL Edge'deki yerel veritabanına bir dış akış nesnesi oluşturur.

  1. Veritabanında bir ana anahtar oluşturun. Kimlik bilgisi gizli dizisini şifrelemek için bu gereklidir.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. SQL Server kaynağına erişmek için veritabanı kapsamlı bir kimlik bilgisi oluşturun. Aşağıdaki örnek, dış veri kaynağı için IDENTITY = kullanıcı adı ve GİzLİ = parola ile bir kimlik bilgisi oluşturur.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. CREATE EXTERNAL DATA SOURCE ile bir dış veri kaynağı oluşturun. Aşağıdaki örnek:

    • LocalSQLOutput adlı bir dış veri kaynağı oluşturur.
    • Dış veri kaynağını (LOCATION = '<vendor>://<server>[:<port>]' ) tanımlar. Örnekte, Azure SQL Edge'in yerel bir örneğine işaret eder.
    • Daha önce oluşturulan kimlik bilgilerini kullanır.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. Dış akış nesnesini oluşturun. Aşağıdaki örnek, dbo tablosunu işaret eden bir dış akış nesnesi oluşturur. MySQLDatabase veritabanındaki TemperatureMeasurements.

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Örnek: Kafka için dış akış nesnesi oluşturma

Aşağıdaki örnek, Azure SQL Edge'deki yerel veritabanına bir dış akış nesnesi oluşturur. Bu örnekte kafka sunucusunun anonim erişim için yapılandırıldığı varsayılır.

  1. CREATE EXTERNAL DATA SOURCE ile bir dış veri kaynağı oluşturun. Aşağıdaki örnek:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. Kafka girişi için bir dış dosya biçimi oluşturun. Aşağıdaki örnek GZipped Compression ile bir JSON dosya biçimi oluşturmuştur.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. Dış akış nesnesini oluşturun. Aşağıdaki örnek, Kafka konusuna TemperatureMeasurementişaret eden bir dış akış nesnesi oluşturur.

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

Akış işini ve akış sorgularını oluşturma

sys.sp_create_streaming_job Akış sorgularını tanımlamak ve akış işini oluşturmak için sistem saklı yordamını kullanın. sp_create_streaming_job Saklı yordam aşağıdaki parametreleri alır:

  • @job_name: Akış işinin adı. Akış işi adları örnek genelinde benzersizdir.
  • @statement: Stream Analytics Sorgu Dili tabanlı akış sorgu deyimleri.

Aşağıdaki örnek, tek bir akış sorgusuyla basit bir akış işi oluşturur. Bu sorgu IoT Edge hub'ından girişleri okur ve veritabanına yazar dbo.TemperatureMeasurements .

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

Aşağıdaki örnek, birden çok farklı sorguyla daha karmaşık bir akış işi oluşturur. Bu sorgular, sıcaklık verilerindeki AnomalyDetection_ChangePoint anomalileri tanımlamak için yerleşik işlevi kullanan sorgulardan birini içerir.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

Akış işlerini başlatma, durdurma, bırakma ve izleme

Azure SQL Edge'de bir akış işi başlatmak için saklı yordamı çalıştırın sys.sp_start_streaming_job . Saklı yordam, giriş olarak başlatılması için akış işinin adını gerektirir.

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

Akış işini durdurmak için saklı yordamı çalıştırın sys.sp_stop_streaming_job . Saklı yordam, giriş olarak durdurulması için akış işinin adını gerektirir.

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

Akış işini bırakmak (veya silmek) için saklı yordamı çalıştırın sys.sp_drop_streaming_job . Saklı yordam, akış işinin adının giriş olarak bırakılmasını gerektirir.

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

Akış işinin geçerli durumunu almak için saklı yordamı çalıştırın sys.sp_get_streaming_job . Saklı yordam, akış işinin adının giriş olarak bırakılmasını gerektirir. Akış işinin adını ve geçerli durumunu döndürür.

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

Akış işi aşağıdaki durumlardan herhangi birine sahip olabilir:

Status Açıklama
Oluşturulma Akış işi oluşturuldu, ancak henüz başlatılmadı.
Başlatılıyor Akış işi başlangıç aşamasındadır.
Boş Akış işi çalışıyor, ancak işlenmek için giriş yok.
İşleme Akış işi çalışıyor ve girişleri işliyor. Bu durum, akış işi için iyi durumda olduğunu gösterir.
Düzeyi düşürüldü Akış işi çalışıyor, ancak giriş işlemi sırasında önemli olmayan bazı hatalar oluştu. Giriş işi çalışmaya devam eder, ancak hatalarla karşılaşan girişleri bırakır.
Durduruldu Akış işi durduruldu.
Yapılamadı Akış işi başarısız oldu. Bu genellikle işleme sırasında önemli bir hatanın göstergesidir.

Dekont

Akış işi zaman uyumsuz olarak yürütüleceğinden, iş çalışma zamanında hatalarla karşılaşabilir. Akış işi hatasını gidermek için saklı yordamı kullanın sys.sp_get_streaming_job veya Azure SQL Edge kapsayıcısından Docker günlüğünü gözden geçirin. Bu işlem akış işinden hata ayrıntılarını sağlayabilir.

Sonraki adımlar