Aktualizace nebo sloučení záznamů ve službě Azure SQL Database pomocí Azure Functions
Azure Stream Analytics (ASA) v současné době podporuje vkládání (připojování) řádků pouze do výstupů SQL (Azure SQL Database a Azure Synapse Analytics). Tento článek popisuje alternativní řešení pro povolení funkce UPDATE, UPSERT nebo MERGE v databázích SQL s Azure Functions jako zprostředkující vrstvou.
Na konci se zobrazí alternativní možnosti azure Functions.
Požadavek
Zápis dat do tabulky lze obecně provádět následujícím způsobem:
Režim | Ekvivalentní příkaz T-SQL | Požadavky |
---|---|---|
Připojit | INSERT | Nic |
Nahradit | MERGE (UPSERT) | Jedinečný klíč |
Hromadit | MERGE (UPSERT) s operátorem složeného přiřazení (+= ,-= ...) |
Jedinečný klíč a akumulátor |
Pokud chcete znázornit rozdíly, podívejte se, co se stane při ingestování následujících dvou záznamů:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 0 |
10:05 | A | 20 |
V režimu připojení vložíme dva záznamy. Ekvivalentní příkaz T-SQL je:
INSERT INTO [target] VALUES (...);
Výsledkem je:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 0 |
10:05 | A | 20 |
V režimu nahrazení získáme pouze poslední hodnotu podle klíče. Tady jako klíč používáme Device_Id. Ekvivalentní příkaz T-SQL je:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Výsledkem je:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Nakonec v režimu kumulování sečteme Value
operátorem složeného přiřazení (+=
). Tady také jako klíč používáme Device_Id:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Výsledkem je:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
Pro důležité informace o výkonu podporují výstupní adaptéry databáze ASA SQL aktuálně nativně pouze režim připojení. Tyto adaptéry používají hromadné vkládání k maximalizaci propustnosti a omezení zpětného tlaku.
V tomto článku se dozvíte, jak pomocí azure Functions implementovat režimy nahrazení a kumulování pro ASA. Pokud funkci použijete jako zprostředkující vrstvu, potenciální výkon zápisu neovlivní úlohu streamování. V tomto ohledu funguje použití Azure Functions nejlépe s Azure SQL. V případě Synapse SQL může přechod z hromadných příkazů na příkazy po řádcích způsobit vyšší problémy s výkonem.
Výstup služby Azure Functions
V naší úloze nahradíme výstup ASA SQL výstupem ASA Azure Functions. Funkce UPDATE, UPSERT nebo MERGE jsou ve funkci implementovány.
V současné době existují dvě možnosti pro přístup ke službě SQL Database ve funkci. Nejprve je výstupní vazba Azure SQL. V současné době je omezený na jazyk C# a nabízí pouze režim nahrazení. Druhý je vytvoření dotazu SQL, který se odešle přes příslušný ovladač SQL (Microsoft.Data.SqlClient pro .NET).
U obou následujících ukázek předpokládáme následující schéma tabulky. Možnost vazby vyžaduje , aby byl v cílové tabulce nastaven primární klíč . Není to nutné, ale doporučuje se při použití ovladače SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Funkce musí splňovat následující očekávání, která se mají použít jako výstup z ASA:
- Azure Stream Analytics očekává stav HTTP 200 z aplikace Functions pro dávky, které byly úspěšně zpracovány.
- Když Azure Stream Analytics obdrží výjimku 413 (http Request Entity Too Large) z funkce Azure, zmenšuje velikost dávek, které odesílá do funkce Azure Functions.
- Během testovacího připojení Stream Analytics odešle požadavek POST s prázdnou dávkou do služby Azure Functions a očekává, že stav HTTP 20x zpět ověří test.
Možnost 1: Aktualizace podle klíče pomocí vazby SQL funkce Azure Functions
Tato možnost používá výstupní vazbu SQL funkce Azure Functions. Toto rozšíření může nahradit objekt v tabulce, aniž byste museli psát příkaz SQL. V tuto chvíli nepodporuje operátory složeného přiřazení (akumulace).
Tato ukázka byla postavena na:
- Modul runtime Azure Functions verze 4
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
Pokud chcete lépe porozumět přístupu k vazbě, doporučujeme postupovat podle tohoto kurzu.
Nejprve podle tohoto kurzu vytvořte výchozí aplikaci funkcí HttpTrigger. Používají se následující informace:
- Jazyk:
C#
- Runtime:
.NET 6
(v části function/runtime v4) - Šablona:
HTTP trigger
Nainstalujte rozšíření vazby spuštěním následujícího příkazu v terminálu umístěném ve složce projektu:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
SqlConnectionString
Přidejte položku do oddíluValues
, vyplňte local.settings.json
připojovací řetězec cílového serveru:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Nahraďte celou funkci (.cs soubor v projektu) následujícím fragmentem kódu. Aktualizujte obor názvů, název třídy a název funkce vlastním názvem:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Aktualizujte název cílové tabulky v oddílu vazby:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Device
Aktualizujte oddíl třídy a mapování tak, aby odpovídaly vlastnímu schématu:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Teď můžete otestovat zapojení mezi místní funkcí a databází laděním (F5 v editoru Visual Studio Code). Databáze SQL musí být dostupná z vašeho počítače. SSMS se dá použít ke kontrole připojení. Potom odešlete požadavky POST do místního koncového bodu. Požadavek s prázdným textem by měl vrátit http 204. Požadavek se skutečnou datovou částí by měl být zachován v cílové tabulce (v režimu nahrazení nebo aktualizace). Tady je ukázková datová část odpovídající schématu použitému v této ukázce:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Funkce je teď možné publikovat do Azure. Nastavení aplikace by mělo být nastaveno pro SqlConnectionString
. Brána firewall Azure SQL Serveru by měla umožňovat, aby se k ní dostaly služby Azure, aby se k ní dostaly živé funkce.
Funkci pak můžete definovat jako výstup v úloze ASA a místo jejich vložení je použít k nahrazení záznamů.
Možnost 2: Sloučení se složeným přiřazením (kumulování) prostřednictvím vlastního dotazu SQL
Poznámka:
Po restartování a obnovení může ASA znovu odeslat výstupní události, které už byly vygenerovány. Jedná se o očekávané chování, které může způsobit selhání logiky akumulace (zdvojnásobení jednotlivých hodnot). Pokud tomu chcete zabránit, doporučujeme vytvořit výstup stejných dat v tabulce prostřednictvím nativního výstupu ASA SQL. Tuto řídicí tabulku pak můžete použít k detekci problémů a opětovné synchronizaci akumulace v případě potřeby.
Tato možnost používá Microsoft.Data.SqlClient. Tato knihovna nám umožňuje vydávat dotazy SQL do služby SQL Database.
Tato ukázka byla postavena na:
- Modul runtime Azure Functions verze 4
- .NET 6.0
- Microsoft.Data.SqlClient 4.0.0
Nejprve podle tohoto kurzu vytvořte výchozí aplikaci funkcí HttpTrigger. Používají se následující informace:
- Jazyk:
C#
- Runtime:
.NET 6
(v části function/runtime v4) - Šablona:
HTTP trigger
Nainstalujte knihovnu SqlClient spuštěním následujícího příkazu v terminálu umístěném ve složce projektu:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
SqlConnectionString
Přidejte položku do oddíluValues
, vyplňte local.settings.json
připojovací řetězec cílového serveru:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Nahraďte celou funkci (.cs soubor v projektu) následujícím fragmentem kódu. Aktualizujte obor názvů, název třídy a název funkce vlastním názvem:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
sqltext
Aktualizujte oddíl sestavení příkazu tak, aby odpovídal vašemu vlastnímu schématu (všimněte si, jak se dosahuje akumulace prostřednictvím operátoru +=
při aktualizaci):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Teď můžete otestovat zapojení mezi místní funkcí a databází laděním (F5 v editoru VS Code). Databáze SQL musí být dostupná z vašeho počítače. SSMS se dá použít ke kontrole připojení. Pak vyžádejte požadavky POST na místní koncový bod. Požadavek s prázdným textem by měl vrátit http 204. Požadavek se skutečnou datovou částí by měl být zachován v cílové tabulce (v režimu kumulování nebo sloučení). Tady je ukázková datová část odpovídající schématu použitému v této ukázce:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Funkce je teď možné publikovat do Azure. Nastavení aplikace by mělo být nastaveno pro SqlConnectionString
. Brána firewall Azure SQL Serveru by měla umožňovat, aby se k ní dostaly služby Azure, aby se k ní dostaly živé funkce.
Funkci pak můžete definovat jako výstup v úloze ASA a místo jejich vložení je použít k nahrazení záznamů.
Alternativy
Mimo Azure Functions existuje několik způsobů, jak dosáhnout očekávaného výsledku. Tato část obsahuje některé z nich.
Následné zpracování v cílové databázi SQL
Úloha na pozadí funguje, jakmile se data vloží do databáze prostřednictvím standardních výstupů ASA SQL.
Pro Azure SQL INSTEAD OF
je možné triggery DML použít k zachycení příkazů INSERT vydaných službou ASA:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
V případě Synapse SQL může ASA vložit do pracovní tabulky. Opakovaný úkol pak může podle potřeby transformovat data do zprostředkující tabulky. Nakonec se data přesunou do produkční tabulky.
Předběžné zpracování ve službě Azure Cosmos DB
Azure Cosmos DB nativně podporuje UPSERT. Tady je možné pouze připojit nebo nahradit. Akumulace musí být spravovaná na straně klienta ve službě Azure Cosmos DB.
Pokud se požadavky shodují, je možné nahradit cílovou databázi SQL instancí služby Azure Cosmos DB. To vyžaduje důležitou změnu v celkové architektuře řešení.
Pro Synapse SQL je možné službu Azure Cosmos DB použít jako zprostředkující vrstvu prostřednictvím Azure Synapse Linku pro Azure Cosmos DB. Azure Synapse Link se dá použít k vytvoření analytického úložiště. Toto úložiště dat se pak dá dotazovat přímo ve službě Synapse SQL.
Porovnání alternativ
Každý přístup nabízí jinou hodnotu a možnosti:
Typ | Možnost | Režimy | Azure SQL Database | Azure Synapse Analytics |
---|---|---|---|---|
Následné zpracování | ||||
Aktivační události | Nahradit, Kumulovat | + | Není k dispozici, triggery nejsou dostupné v Synapse SQL | |
Příprava | Nahradit, Kumulovat | + | + | |
Předběžné zpracování | ||||
Azure Functions | Nahradit, Kumulovat | + | - (výkon řádků po řádech) | |
Nahrazení služby Azure Cosmos DB | Nahradit | – | N/A | |
Azure Cosmos DB Azure Synapse Link | Nahradit | – | + |
Získání podpory
Pokud potřebujete další pomoc, vyzkoušejte naši stránku pro otázky Microsoftu pro Azure Stream Analytics.
Další kroky
- Vysvětlení výstupů z Azure Stream Analytics
- Výstup Azure Stream Analytics do služby Azure SQL Database
- Zvýšení výkonu propustnosti do služby Azure SQL Database z Azure Stream Analytics
- Použití spravovaných identit pro přístup ke službě Azure SQL Database nebo Azure Synapse Analytics z úlohy Azure Stream Analytics
- Použití referenčních dat z SQL Database pro úlohu Azure Stream Analytics
- Spuštění azure Functions v úlohách Azure Stream Analytics – kurz výstupu Redis
- Rychlý start: Vytvoření úlohy Stream Analytics pomocí webu Azure Portal