Azure Functions를 사용하여 Azure SQL Database의 레코드 업데이트 또는 병합

현재 ASA(Azure Stream Analytics)는 SQL 출력(Azure SQL DatabaseAzure Synapse Analytics)에 행을 삽입(추가)만 지원합니다. 이 문서에서는 Azure Functions를 중간 계층으로 사용하여 SQL 데이터베이스에서 UPDATE, UPSERT 또는 MERGE를 사용하도록 설정하는 해결 방법에 대해 설명합니다.

Azure Functions에 대한 대체 옵션은 마지막에 제공됩니다.

요건

테이블에 데이터 쓰기는 일반적으로 다음과 같은 방식으로 수행할 수 있습니다.

모드 동등한 T-SQL 문 요구 사항
추가 INSERT 없음
바꾸기 MERGE(UPSERT) 고유 키
축적 복합 할당 연산자(+=, -=...)가 있는 MERGE(UPSERT) 고유 키 및 누적기

차이점을 설명하기 위해 다음 두 레코드를 수집할 때 발생하는 작업을 확인합니다.

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

추가 모드에서는 두 개의 레코드를 삽입합니다. 동등한 T-SQL 문은 다음과 같습니다.

INSERT INTO [target] VALUES (...);

결과는 다음과 같습니다.

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

바꾸기 모드에서는 키로 마지막 값만 가져옵니다. 여기서는 Device_Id 키로 사용합니다 . 해당하는 T-SQL 문은 다음과 같습니다.

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

결과는 다음과 같습니다.

Modified_Time Device_Key Measure_Value
10:05 A 20

마지막으로 누적 모드에서 복합 대입 연산자(+=)로 Value를 합산합니다. 여기서는 Device_Id 키로 사용합니다.

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

결과는 다음과 같습니다.

Modified_Time Device_Key Measure_Value
10:05 A 21

성능을 고려하여 ASA SQL 데이터베이스 출력 어댑터는 현재 기본적으로 추가 모드만 지원합니다. 이러한 어댑터는 대량 삽입을 사용하여 처리량을 최대화하고 배압을 제한합니다.

이 문서에서는 Azure Functions를 사용하여 ASA에 대한 바꾸기 및 누적 모드를 구현하는 방법을 보여 줍니다. 함수를 중간 계층으로 사용하는 경우 잠재적인 쓰기 성능은 스트리밍 작업에 영향을 주지 않습니다. 이와 관련하여 Azure Functions를 사용하는 것이 Azure SQL에 가장 적합합니다. Synapse SQL을 사용하면 대량에서 행 단위 문으로 전환하면 성능 문제가 더 커질 수 있습니다.

Azure 함수 출력

이 작업에서는 ASA SQL 출력을 ASA Azure Functions 출력으로 바꿉니다. UPDATE, UPSERT 또는 MERGE 기능은 함수에서 구현됩니다.

현재 함수에서 SQL Database에 액세스하는 두 가지 옵션이 있습니다. 첫 번째는 Azure SQL 출력 바인딩입니다. 현재 C#으로 제한되어 있으며 교체 모드만 제공합니다. 두 번째는 적절한 SQL 드라이버(.NET용 Microsoft.Data.SqlClient)를 통해 제출할 SQL 쿼리를 작성하는 것입니다.

다음 샘플 모두에 대해 다음 테이블 스키마를 가정합니다. 결합 옵션을 사용하려면 대상 테이블에 기본 키를 설정해야 합니다. SQL 드라이버를 사용할 때 필요하지는 않지만 권장됩니다.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

함수가 ASA의 출력으로 사용되려면 다음 기대치를 충족해야 합니다.

  • Azure Stream Analytics는 성공적으로 처리된 일괄 처리에 대해 Functions 앱의 200 HTTP 상태를 예상합니다.
  • Azure Stream Analytics는 Azure 함수에서 413("http 요청 엔터티가 너무 큼") 예외를 받으면 Azure Functions로 보내는 일괄 처리 크기를 줄입니다.
  • 테스트 연결 중에 Stream Analytics는 Azure Functions에 빈 일괄 처리가 포함된 POST 요청을 보내고 테스트의 유효성을 검사하기 위해 HTTP 상태 20x를 예상합니다.

옵션 1: Azure Function SQL 바인딩을 사용하여 키로 업데이트

이 옵션은 Azure Function SQL 출력 바인딩을 사용합니다. 이 확장은 SQL 문을 작성할 필요 없이 테이블의 개체를 대체할 수 있습니다. 현재 복합 대입 연산자(누적)는 지원하지 않습니다.

이 샘플은 다음을 기반으로 작성되었습니다.

결합 방식을 더 잘 이해하려면 이 자습서를 따르는 것이 좋습니다.

먼저 이 자습서에 따라 기본 HttpTrigger 함수 앱을 만듭니다. 다음 정보가 사용됩니다.

  • 언어: C#
  • 런타임: .NET 6(함수/런타임 v4에서)
  • 템플릿: HTTP trigger

프로젝트 폴더에 있는 터미널에서 다음 명령을 실행하여 바인딩 확장을 설치합니다.

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

대상 서버의 연결 문자열을 입력하여 local.settings.jsonValues 섹션에 SqlConnectionString 항목을 추가합니다.

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

전체 함수(프로젝트의.cs 파일)를 다음 코드 조각으로 바꿉니다. 네임스페이스, 클래스 이름 및 함수 이름을 사용자 고유의 이름으로 업데이트합니다.

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

바인딩 섹션에서 대상 테이블 이름을 업데이트합니다.

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

고유한 스키마와 일치하도록 Device 클래스 및 매핑 섹션을 업데이트합니다.

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

이제 디버깅을 통해 로컬 함수와 데이터베이스 간의 배선을 테스트할 수 있습니다(Visual Studio Code의 경우 F5). SQL 데이터베이스는 컴퓨터에서 연결할 수 있어야 합니다. SSMS를 사용하여 연결을 확인할 수 있습니다. 그런 다음 Postman과 같은 도구를 사용하여 로컬 엔드포인트에 POST 요청을 실행할 수 있습니다. 본문이 비어 있는 요청은 http 204를 반환해야 합니다. 실제 페이로드가 있는 요청은 대상 테이블에 유지되어야 합니다(교체/업데이트 모드에서). 다음은 이 샘플에 사용된 스키마에 해당하는 샘플 페이로드입니다.

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

이제 함수를 Azure에 게시할 수 있습니다. 애플리케이션 설정SqlConnectionString에 대해 설정되어야 합니다. Azure SQL 서버 방화벽은 라이브 함수가 방화벽에 도달할 수 있도록 Azure 서비스를 허용해야 합니다.

그런 다음 함수를 ASA 작업의 출력으로 정의하고 레코드를 삽입하는 대신 교체하는 데 사용할 수 있습니다.

옵션 2: 사용자 지정 SQL 쿼리를 통해 복합 할당과 병합(누적)

참고 항목

다시 시작 및 복구 시 ASA는 이미 내보낸 출력 이벤트를 다시 보낼 수 있습니다. 이는 누적 논리 실패를 야기할 수 있는 예상되는 동작입니다(개별 값 두 배로 증가). 이를 방지하려면 네이티브 ASA SQL 출력을 통해 동일한 데이터를 테이블에 출력하는 것이 좋습니다. 그런 다음 이 제어 테이블을 사용하여 문제를 탐지하고 필요할 때 누적을 다시 동기화할 수 있습니다.

이 옵션은 Microsoft.Data.SqlClient를 사용합니다. 이 라이브러리를 사용하면 SQL Database에 대한 모든 SQL 쿼리를 실행할 수 있습니다.

이 샘플은 다음을 기반으로 작성되었습니다.

먼저 이 자습서에 따라 기본 HttpTrigger 함수 앱을 만듭니다. 다음 정보가 사용됩니다.

  • 언어: C#
  • 런타임: .NET 6(함수/런타임 v4에서)
  • 템플릿: HTTP trigger

프로젝트 폴더에 있는 터미널에서 다음 명령을 실행하여 SqlClient 라이브러리를 설치합니다.

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

대상 서버의 연결 문자열을 입력하여 local.settings.jsonValues 섹션에 SqlConnectionString 항목을 추가합니다.

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

전체 함수(프로젝트의.cs 파일)를 다음 코드 조각으로 바꿉니다. 네임스페이스, 클래스 이름 및 함수 이름을 사용자 고유의 이름으로 업데이트합니다.

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

고유한 스키마와 일치하도록 sqltext 명령 빌드 섹션을 업데이트합니다(업데이트 시 += 연산자를 통해 누적되는 방식 참고).

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

이제 디버깅을 통해 로컬 함수와 데이터베이스 간의 연결을 테스트할 수 있습니다(VS Code의 경우 F5). SQL 데이터베이스는 컴퓨터에서 연결할 수 있어야 합니다. SSMS를 사용하여 연결을 확인할 수 있습니다. 그런 다음 Postman과 같은 도구를 사용하여 로컬 엔드포인트에 POST 요청을 실행할 수 있습니다. 본문이 비어 있는 요청은 http 204를 반환해야 합니다. 실제 페이로드가 있는 요청은 대상 테이블에 유지되어야 합니다(누적/병합 모드에서). 다음은 이 샘플에 사용된 스키마에 해당하는 샘플 페이로드입니다.

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

이제 함수를 Azure에 게시할 수 있습니다. 애플리케이션 설정SqlConnectionString에 대해 설정되어야 합니다. Azure SQL 서버 방화벽은 라이브 함수가 방화벽에 도달할 수 있도록 Azure 서비스를 허용해야 합니다.

그런 다음 함수를 ASA 작업의 출력으로 정의하고 레코드를 삽입하는 대신 교체하는 데 사용할 수 있습니다.

대안

Azure Functions 외부에는 예상 결과를 달성하는 여러 가지 방법이 있습니다. 이 섹션에서는 그 중 일부를 제공합니다.

대상 SQL Database의 사후 처리

백그라운드 작업은 표준 ASA SQL 출력을 통해 데이터베이스에 데이터가 삽입되면 작동합니다.

Azure SQL의 경우 INSTEAD OFDML 트리거를 사용하여 ASA에서 실행된 INSERT 명령을 가로챌 수 있습니다.

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Synapse SQL의 경우 ASA는 준비 테이블에 삽입할 수 있습니다. 되풀이 작업은 필요에 따라 데이터를 중간 테이블로 변환할 수 있습니다. 마지막으로 데이터가 프로덕션 테이블로 이동됩니다.

Azure Cosmos DB의 전처리

Azure Cosmos DB는 기본적으로 UPSERT를 지원합니다. 여기서는 추가/바꾸기만 가능합니다. 누적은 Azure Cosmos DB에서 클라이언트 쪽에서 관리해야 합니다.

요구 사항이 일치하는 경우 대상 SQL 데이터베이스를 Azure Cosmos DB 인스턴스로 바꾸는 옵션이 있습니다. 그렇게 하려면 전체 솔루션 아키텍처에 중요한 변경이 필요합니다.

Synapse SQL의 경우 Azure Cosmos DB용 Azure Synapse Link를 통해 Azure Cosmos DB를 중간 계층으로 사용할 수 있습니다. Azure Synapse Link를 사용하여 분석 저장소만들 수 있습니다. 이 데이터 저장소는 Synapse SQL에서 직접 쿼리할 수 있습니다.

대안의 비교

각 방법은 서로 다른 가치 제안과 기능을 제공합니다.

Type 옵션 모드 Azure SQL Database Azure Synapse Analytics
후처리
트리거 교체, 축적 + N/A, 트리거는 Synapse SQL에서 사용할 수 없습니다.
준비 교체, 축적 + +
전처리
Azure 기능 교체, 축적 + - (행별 성능)
Azure Cosmos DB 교체 바꾸기 해당 없음 해당 없음
Azure Cosmos DB Azure Synapse Link 바꾸기 해당 없음 +

지원 받기

추가 지원이 필요한 경우 Azure Stream Analytics용 Microsoft Q&A 질문 페이지를 참조하세요.

다음 단계