Azure Functions を使用して Azure SQL Database のレコードを更新またはマージする
現時点では、Azure Stream Analytics (ASA) は、SQL 出力 (Azure SQL Database と Azure Synapse Analytics) への行の挿入 (追加) のみをサポートしています。 この記事では、SQL データベースで更新、UPSERT、またはマージを有効にする回避策について説明し、Azure Functions を中間層として使用します。
Azure Functions の代替オプションについては、最後に説明します。
要件
テーブルへのデータの書き込みは、通常、次の方法で実行できます。
モード | 同等の T-SQL ステートメント | 必要条件 |
---|---|---|
Append | INSERT | なし |
Replace | MERGE (UPSERT) | 一意キー |
蓄積 | 複合代入演算子 (+= 、-= ...) を持つ MERGE (UPSERT) |
一意キーとアキュムレータ |
違いを確認するために、次の 2 つのレコードを取り込む場合の結果を見てみましょう。
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
追加モードでは、2 つのレコードを挿入します。 同等の T-SQL ステートメントは次のとおりです。
INSERT INTO [target] VALUES (...);
結果:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
置換モードでは、キーによって最後の値のみが取得されます。 ここでは、Device_Id をキーとして使います。同等の T-SQL ステートメントは次のとおりです。
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
結果:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
最後に、累積モードでは、複合代入演算子 (+=
) を使用して合計します。 ここでも、Device_Id をキーとして使います。
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
結果:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
パフォーマンスに関する考慮事項のため、ASA SQL データベースの出力アダプターは、現在、追加モードのみをネイティブでサポートしています。 これらのアダプターでは、一括挿入を使用してスループットを最大化し、負荷を軽減します。
この記事では、Azure Functions を使用して ASA の置換モードと累積モードを実装する方法について説明します。 関数を中間層として使用する場合、潜在的な書き込みのパフォーマンスがストリーミング ジョブに影響を与えることはありません。 この点で、Azure Functions の使用は Azure SQL に最適です。 Synapse SQL では、バルクから行単位のステートメントに切り替えると、より大きなパフォーマンス上の問題が発生する可能性があります。
Azure Functions 出力
このジョブでは、ASA SQL 出力を ASA Azure Functions 出力に置き換えます。 UPDATE、UPSERT、または MERGE の各機能が関数に実装されます。
現在、関数内の SQL Database にアクセスするには、2 つのオプションがあります。 最初は Azure SQL 出力バインドです。 現在は C# に限定されており、置換モードのみを提供しています。 2 番目は、適切な SQL ドライバー (.NET 用 Microsoft.Data.SqlClient) を使用して送信される SQL クエリを作成することです。
以下の両方のサンプルで、次のテーブル スキーマを想定しています。 バインド オプションを使用するには、対象テーブルに主キーを設定する必要があります。 SQL ドライバーを使用する場合、必須ではありませんが推奨されます。
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
ASA からの出力として使用するために、関数は次の予測を満たす必要があります。
- Azure Stream Analytics は、正常に処理されたバッチに対して Functions アプリから HTTP ステータス 200 を想定しています
- Azure Stream Analytics は、Azure 関数から 413 ("http の要求したエンティティが大きすぎる") 例外を受け取ると、Azure Functions に送信するバッチのサイズを縮小します
- テスト接続中に、Stream Analytics は空のバッチを含む POST 要求を Azure Functions に送信し、テストを検証するために HTTP ステータス 20 倍が返されることを想定しています
オプション 1: Azure 関数 SQL Binding を使用してキーで更新する
このオプションでは、Azure 関数 SQL 出力バインドを使用します。 この拡張機能では、SQL ステートメントを記述することなく、テーブル内のオブジェクトを置き換えることができます。 現時点では、複合代入演算子 (accumulations) はサポートされていません。
このサンプルは、次のものに基づいて構築されました。
- Azure Functions ランタイム バージョン 4
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
バインディングの方法をより深く理解するために、このチュートリアルに従うことをお勧めします。
まず、このチュートリアルに従って、既定の HttpTrigger 関数アプリを作成します。 次の情報が使用されます。
- 言語:
C#
- ランタイム:
.NET 6
(関数またはランタイム v4 より前) - テンプレート:
HTTP trigger
プロジェクト フォルダーにあるターミナルで次のコマンドを実行して、バインド拡張機能をインストールします。
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
local.settings.json
の Values
セクションに SqlConnectionString
項目を追加し、宛先サーバーの接続文字列を入力します。
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
次のコード スニペットを使って、関数 (プロジェクト内の .cs ファイル) 全体を置き換えます。 名前空間、クラス名、関数名を独自に更新します。
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
バインド セクションで、宛先テーブルの名前を更新します。
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
独自のスキーマに一致するように、Device
クラスとマッピング セクションを更新します。
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
これで、デバッグ (Visual Studio Code で F5 キーを押します) によってローカル関数とデータベース間の接続をテストすることができます。 SQL データベースには、コンピューターからアクセスできる必要があります。 SSMS を使用して接続を確認できます。 次に、POST 要求をローカル エンドポイントに送信します。 本文が空の要求では、http 204 が返されます。 実際のペイロードを含む要求は、宛先テーブル (置換/更新モード) で保持する必要があります。 このサンプルで使用されるスキーマに対応したサンプルのペイロードを次に示します。
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
これで、関数アプリが Azure に発行されました。 設定のアプリケーションは、SqlConnectionString
に対して設定される必要があります。 Azure SQL Server のファイアウォールでは、ライブ機能が到達できるように、Azure サービスを許可する必要があります。
その後、関数を ASA ジョブの出力として定義し、レコードを挿入する代わりに、レコードを置き換えるために使用できます。
オプション 2: カスタム SQL クエリを使用して複合割り当て (蓄積) をマージする
Note
再起動と復旧時に、ASA は既に生成された出力イベントを再送信することがあります。 これは、蓄積ロジックが失敗する原因となる可能性のある動作です (個々の値を 2 倍にします)。 これを回避するには、ネイティブ ASA SQL 出力を使用してテーブルに同じデータを出力することをお勧めします。 この制御テーブルを使用して、必要に応じて問題を検出し、累積を再同期することができます。
このオプションは、Microsoft.Data.SqlClient を使用します。 このライブラリを使用すると、SQL Database に対して SQL クエリを発行できます。
このサンプルは、次のものに基づいて構築されました。
まず、このチュートリアルに従って、既定の HttpTrigger 関数アプリを作成します。 次の情報が使用されます。
- 言語:
C#
- ランタイム:
.NET 6
(関数またはランタイム v4 より前) - テンプレート:
HTTP trigger
プロジェクト フォルダーにあるターミナルで次のコマンドを実行して、SqlClient ライブラリをインストールします。
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
local.settings.json
の Values
セクションに SqlConnectionString
項目を追加し、宛先サーバーの接続文字列を入力します。
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
次のコード スニペットを使って、関数 (プロジェクト内の .cs ファイル) 全体を置き換えます。 名前空間、クラス名、関数名を独自に更新します。
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
独自のスキーマに一致するように、sqltext
コマンドの構築セクションを更新します (更新時に +=
演算子を使用して累積がどのように達成されるかに注意してください)。
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
デバッグによってローカル関数とデータベース間の接続をテストできるようになりました (VS Code で F5 キーを押します)。 SQL データベースには、コンピューターからアクセスできる必要があります。 SSMS を使用して接続を確認できます。 次に、POST 要求をローカル エンドポイントに発行します。 本文が空の要求では、http 204 が返されます。 実際のペイロードを含む要求は、(累積/マージ モードで) 宛先テーブルに保存する必要があります。 このサンプルで使用されるスキーマに対応したサンプルのペイロードを次に示します。
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
これで、関数アプリが Azure に発行されました。 設定のアプリケーションは、SqlConnectionString
に対して設定される必要があります。 Azure SQL Server のファイアウォールでは、ライブ機能が到達できるように、Azure サービスを許可する必要があります。
その後、関数を ASA ジョブの出力として定義し、レコードを挿入する代わりに、レコードを置き換えるために使用できます。
代替
Azure Functions の外部では、予想される結果を得るために複数の方法があります。 このセクションでは、そのいくつかを説明します。
ターゲット SQL Database での後処理
バックグラウンド タスクは、標準の ASA SQL 出力を介してデータベースにデータが挿入されると動作します。
Azure SQL では、INSTEAD OF
DML トリガーを使用して、ASA によって発行された INSERT コマンドをインターセプトすることができます。
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Synapse SQL の場合、ASA はステージング テーブルに挿入できます。 その後、定期的なタスクによって、必要に応じて中間テーブルにデータを変換できます。 最後に、データは運用テーブルに移動されます。
Azure Cosmos DB での前処理
Azure Cosmos DB は、ネイティブで UPSERT をサポートします。 ここでは、追加/置換のみが可能です。 Accumulations は、Azure Cosmos DB でクライアント側で管理されている必要があります。
要件が一致する場合は、ターゲットの SQL データベースを Azure Cosmos DB インスタンスで置き換えることができます。 これを行うには、ソリューション全体のアーキテクチャに重要な変更を加える必要があります。
Synapse SQL の場合、Azure Cosmos DB は、Azure Cosmos DB のために Azure Synapse Link を使用して中間層として使用できます。 Azure Synapse Link を使用して、分析ストアを作成できます。 このデータストアは、Synapse SQL で直接照会できます。
代替手段の比較
各アプローチでは、さまざまな価値提案と機能が提供されます。
Type | オプション | モード | Azure SQL データベース | Azure Synapse Analytics |
---|---|---|---|---|
後処理 | ||||
トリガー | 置換、累積 | + | 該当なし。Synapse SQL では、トリガーを使用できません | |
ステージング | 置換、累積 | + | + | |
前処理 | ||||
Azure Functions | 置換、累積 | + | - (行ごとのパフォーマンス) | |
Azure Cosmos DB 置換 | Replace | 該当なし | 該当なし | |
Azure Cosmos DB Azure Synapse Link | Replace | 該当なし | + |
サポートを受ける
詳細については、Azure Stream Analytics に関する Microsoft Q&A 質問ページを参照してください。
次のステップ
- Azure Stream Analytics からの出力を理解する
- Azure SQL Database への Azure Stream Analytics の出力
- Azure Stream Analytics から Azure SQL Database へのスループットのパフォーマンスを向上させる
- マネージド ID を使用して Azure Stream Analytics ジョブから Azure SQL Database または Azure Synapse Analytics にアクセスする
- SQL Database からの参照データを Azure Stream Analytics ジョブに使用する
- Azure Stream Analytics ジョブで Azure Functions を実行する - Redis 出力のチュートリアル
- クイック スタート: Azure Portal を使用して Stream Analytics ジョブを作成する