Azure Functions를 사용하여 Azure SQL Database의 레코드 업데이트 또는 병합
현재 ASA(Azure Stream Analytics)는 SQL 출력에 행 삽입(추가)만 지원합니다(Azure SQL Databases 및 Azure Synapse Analytics). 이 문서에서는 Azure Functions를 중간 계층으로 사용하여 SQL 데이터베이스에서 UPDATE, UPSERT 또는 MERGE를 사용하도록 설정하는 해결 방법에 대해 설명합니다.
Azure Functions에 대한 대체 옵션은 마지막에 제공됩니다.
요건
테이블에 데이터 쓰기는 일반적으로 다음과 같은 방식으로 수행할 수 있습니다.
모드 | 동등한 T-SQL 문 | 요구 사항 |
---|---|---|
추가 | INSERT | 없음 |
바꾸기 | MERGE(UPSERT) | 고유 키 |
누적 | 복합 할당 연산자(+= , -= ...)가 있는 MERGE(UPSERT) |
고유 키 및 누적기 |
차이점을 설명하기 위해 다음 두 레코드를 수집할 때 발생하는 작업을 확인합니다.
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
추가 모드에서는 두 개의 레코드를 삽입합니다. 동등한 T-SQL 문은 다음과 같습니다.
INSERT INTO [target] VALUES (...);
결과는 다음과 같습니다.
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
바꾸기 모드에서는 키로 마지막 값만 가져옵니다. 여기서는 Device_Id 키로 사용합니다. 동등한 T-SQL 문은 다음과 같습니다.
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
결과는 다음과 같습니다.
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
마지막으로 누적 모드에서 복합 대입 연산자(+=
)로 Value
를 합산합니다. 여기서는 Device_Id 키로 사용합니다.
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
결과는 다음과 같습니다.
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
성능을 고려하여 ASA SQL 데이터베이스 출력 어댑터는 현재 기본적으로 추가 모드만 지원합니다. 이러한 어댑터는 대량 삽입을 사용하여 처리량을 최대화하고 배압을 제한합니다.
이 문서에서는 Azure Functions를 사용하여 ASA에 대한 바꾸기 및 누적 모드를 구현하는 방법을 보여 줍니다. 함수를 중간 계층으로 사용하는 경우 잠재적인 쓰기 성능은 스트리밍 작업에 영향을 주지 않습니다. 이와 관련하여 Azure Functions를 사용하는 것이 Azure SQL에 가장 적합합니다. Synapse SQL을 사용하면 대량에서 행 단위 문으로 전환하면 성능 문제가 더 커질 수 있습니다.
Azure 함수 출력
작업에서 ASA SQL 출력을 ASA Azure Functions 출력으로 바꿉니다. UPDATE, UPSERT 또는 MERGE 기능은 함수에서 구현됩니다.
현재 함수에서 SQL Database에 액세스하는 두 가지 옵션이 있습니다. 첫 번째는 Azure SQL 출력 바인딩입니다. 현재 C#으로 제한되어 있으며 교체 모드만 제공합니다. 두 번째는 적절한 SQL 드라이버(.NET용 Microsoft.Data.SqlClient)를 통해 제출할 SQL 쿼리를 작성하는 것입니다.
다음 샘플 모두에 대해 다음 테이블 스키마를 가정합니다. 결합 옵션을 사용하려면 대상 테이블에 기본 키를 설정해야 합니다. SQL 드라이버를 사용할 때 필요하지는 않지만 권장됩니다.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
함수가 ASA의 출력으로 사용되려면 다음 기대치를 충족해야 합니다.
- Azure Stream Analytics는 성공적으로 처리된 일괄 처리에 대해 Functions 앱의 200 HTTP 상태를 예상합니다.
- Azure Stream Analytics는 Azure 함수에서 413("http 요청 엔터티가 너무 큼") 예외를 받으면 Azure Functions로 보내는 일괄 처리 크기를 줄입니다.
- 테스트 연결 중에 Stream Analytics는 Azure Functions에 빈 일괄 처리가 포함된 POST 요청을 보내고 테스트의 유효성을 검사하기 위해 HTTP 상태 20x를 예상합니다.
옵션 1: Azure Function SQL 바인딩을 사용하여 키로 업데이트
이 옵션은 Azure Function SQL 출력 바인딩을 사용합니다. 이 확장은 SQL 문을 작성할 필요 없이 테이블의 개체를 대체할 수 있습니다. 현재 복합 대입 연산자(누적)는 지원하지 않습니다.
이 샘플은 다음을 기반으로 작성되었습니다.
- Azure Functions 런타임 버전 4
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
결합 방식을 더 잘 이해하려면 이 자습서를 따르는 것이 좋습니다.
먼저 이 자습서에 따라 기본 HttpTrigger 함수 앱을 만듭니다. 다음 정보가 사용됩니다.
- 언어:
C#
- 런타임:
.NET 6
(함수/런타임 v4에서) - 템플릿:
HTTP trigger
프로젝트 폴더에 있는 터미널에서 다음 명령을 실행하여 바인딩 확장을 설치합니다.
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
대상 서버의 연결 문자열을 입력하여 local.settings.json
의 Values
섹션에 SqlConnectionString
항목을 추가합니다.
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
전체 함수(프로젝트의.cs 파일)를 다음 코드 조각으로 바꿉니다. 네임스페이스, 클래스 이름 및 함수 이름을 사용자 고유의 이름으로 업데이트합니다.
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
바인딩 섹션에서 대상 테이블 이름을 업데이트합니다.
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
고유한 스키마와 일치하도록 Device
클래스 및 매핑 섹션을 업데이트합니다.
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
이제 디버깅을 통해 로컬 함수와 데이터베이스 간의 배선을 테스트할 수 있습니다(Visual Studio Code에서 F5). SQL 데이터베이스는 컴퓨터에서 연결할 수 있어야 합니다. SSMS를 사용하여 연결을 확인할 수 있습니다. 그런 다음 로컬 엔드포인트에 POST 요청을 보냅니다. 본문이 비어 있는 요청은 http 204를 반환해야 합니다. 실제 페이로드가 있는 요청은 대상 테이블에 유지되어야 합니다(교체/업데이트 모드에서). 다음은 이 샘플에 사용된 스키마에 해당하는 샘플 페이로드입니다.
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
이제 함수를 Azure에 게시할 수 있습니다. 애플리케이션 설정은 SqlConnectionString
에 대해 설정되어야 합니다. Azure SQL 서버 방화벽은 라이브 함수가 방화벽에 도달할 수 있도록 Azure 서비스를 허용해야 합니다.
그런 다음 함수를 ASA 작업의 출력으로 정의하고 레코드를 삽입하는 대신 교체하는 데 사용할 수 있습니다.
옵션 2: 사용자 지정 SQL 쿼리를 통해 복합 할당과 병합(누적)
참고 항목
다시 시작 및 복구 시 ASA는 이미 내보낸 출력 이벤트를 다시 보낼 수 있습니다. 이는 누적 논리 실패를 야기할 수 있는 예상되는 동작입니다(개별 값 두 배로 증가). 이를 방지하려면 네이티브 ASA SQL 출력을 통해 동일한 데이터를 테이블에 출력하는 것이 좋습니다. 그런 다음 이 제어 테이블을 사용하여 문제를 탐지하고 필요할 때 누적을 다시 동기화할 수 있습니다.
이 옵션은 Microsoft.Data.SqlClient를 사용합니다. 이 라이브러리를 사용하면 SQL Database에 대한 모든 SQL 쿼리를 실행할 수 있습니다.
이 샘플은 다음을 기반으로 작성되었습니다.
먼저 이 자습서에 따라 기본 HttpTrigger 함수 앱을 만듭니다. 다음 정보가 사용됩니다.
- 언어:
C#
- 런타임:
.NET 6
(함수/런타임 v4에서) - 템플릿:
HTTP trigger
프로젝트 폴더에 있는 터미널에서 다음 명령을 실행하여 SqlClient 라이브러리를 설치합니다.
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
대상 서버의 연결 문자열을 입력하여 local.settings.json
의 Values
섹션에 SqlConnectionString
항목을 추가합니다.
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
전체 함수(프로젝트의.cs 파일)를 다음 코드 조각으로 바꿉니다. 네임스페이스, 클래스 이름 및 함수 이름을 사용자 고유의 이름으로 업데이트합니다.
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
고유한 스키마와 일치하도록 sqltext
명령 빌드 섹션을 업데이트합니다(업데이트 시 +=
연산자를 통해 누적되는 방식 참고).
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
이제 디버깅을 통해 로컬 함수와 데이터베이스 간의 연결을 테스트할 수 있습니다(VS Code의 경우 F5). SQL 데이터베이스는 컴퓨터에서 연결할 수 있어야 합니다. SSMS를 사용하여 연결을 확인할 수 있습니다. 그런 다음 로컬 엔드포인트에 POST 요청을 실행합니다. 본문이 비어 있는 요청은 http 204를 반환해야 합니다. 실제 페이로드가 있는 요청은 대상 테이블에 유지되어야 합니다(누적/병합 모드에서). 다음은 이 샘플에 사용된 스키마에 해당하는 샘플 페이로드입니다.
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
이제 함수를 Azure에 게시할 수 있습니다. 애플리케이션 설정은 SqlConnectionString
에 대해 설정되어야 합니다. Azure SQL 서버 방화벽은 라이브 함수가 방화벽에 도달할 수 있도록 Azure 서비스를 허용해야 합니다.
그런 다음 함수를 ASA 작업의 출력으로 정의하고 레코드를 삽입하는 대신 교체하는 데 사용할 수 있습니다.
대안
Azure Functions 외부에는 예상 결과를 달성하는 여러 가지 방법이 있습니다. 이 섹션에서는 그 중 일부를 제공합니다.
대상 SQL Database의 사후 처리
백그라운드 작업은 표준 ASA SQL 출력을 통해 데이터베이스에 데이터가 삽입되면 작동합니다.
Azure SQL의 경우 INSTEAD OF
DML 트리거는 ASA에서 발급한 INSERT 명령을 가로채는데 사용할 수 있습니다.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Synapse SQL의 경우 ASA는 준비 테이블에 삽입할 수 있습니다. 되풀이 작업은 필요에 따라 데이터를 중간 테이블로 변환할 수 있습니다. 마지막으로 데이터가 프로덕션 테이블로 이동됩니다.
Azure Cosmos DB의 전처리
Azure Cosmos DB는 기본적으로 UPSERT를 지원합니다. 여기서는 추가/바꾸기만 가능합니다. 누적은 Azure Cosmos DB에서 클라이언트 쪽에서 관리해야 합니다.
요구 사항이 일치하는 경우 대상 SQL 데이터베이스를 Azure Cosmos DB 인스턴스로 바꾸는 옵션이 있습니다. 그렇게 하려면 전체 솔루션 아키텍처에 중요한 변경이 필요합니다.
Synapse SQL의 경우 Azure Cosmos DB용 Azure Synapse Link를 통해 Azure Cosmos DB를 중간 계층으로 사용할 수 있습니다. Azure Synapse Link를 사용하여 분석 저장소를 만들 수 있습니다. 이 데이터 저장소는 Synapse SQL에서 직접 쿼리할 수 있습니다.
대안의 비교
각 방법은 서로 다른 가치 제안과 기능을 제공합니다.
Type | 옵션 | 모드 | Azure SQL Database | Azure Synapse Analytics |
---|---|---|---|---|
후처리 | ||||
트리거 | 교체, 축적 | + | N/A, 트리거는 Synapse SQL에서 사용할 수 없습니다. | |
준비 | 교체, 축적 | + | + | |
전처리 | ||||
Azure 기능 | 교체, 축적 | + | - (행별 성능) | |
Azure Cosmos DB 교체 | 바꾸기 | 해당 없음 | 해당 없음 | |
Azure Cosmos DB Azure Synapse Link | 바꾸기 | 해당 없음 | + |
지원 받기
추가 지원이 필요한 경우 Azure Stream Analytics용 Microsoft Q&A 질문 페이지를 참조하세요.
다음 단계
- Azure Stream Analytics의 출력 이해
- Azure SQL Database에 Azure Stream Analytics 출력
- Azure Stream Analytics에서 Azure SQL Database에 대한 처리량 성능 향상
- 관리 ID를 사용하여 Azure Stream Analytics 작업에서 Azure SQL Database 또는 Azure Synapse Analytics에 액세스
- Azure Stream Analytics 작업에 SQL Database의 참조 데이터 사용
- Azure Stream Analytics 작업에서 Azure Functions 실행 - Redis 출력용 자습서
- 빠른 시작: Azure Portal을 사용하여 Stream Analytics 작업 만들기