Records in Azure SQL Database bijwerken of samenvoegen met Azure Functions
Op dit moment ondersteunt Azure Stream Analytics (ASA) alleen het invoegen (toevoegen) van rijen aan SQL-uitvoer (Azure SQL Databases en Azure Synapse Analytics). In dit artikel worden tijdelijke oplossingen besproken voor het inschakelen van UPDATE, UPSERT of MERGE op SQL-databases, met Azure Functions als intermediaire laag.
Alternatieve opties voor Azure Functions worden aan het einde weergegeven.
Vereiste
Het schrijven van gegevens in een tabel kan over het algemeen op de volgende manier worden uitgevoerd:
Modus | Equivalente T-SQL-instructie | Vereisten |
---|---|---|
Toevoegen | INSERT | Geen |
Replace | SAMENVOEGEN (UPSERT) | Unieke sleutel |
Ophopen | MERGE (UPSERT) met samengestelde toewijzingsoperator (+= , -= ...) |
Unieke sleutel en accumulator |
Bekijk wat er gebeurt wanneer u de volgende twee records opneemt om de verschillen te illustreren:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00:00 | V | 1 |
10:05 | A | 20 |
In de toevoegmodus voegen we twee records in. De equivalente T-SQL-instructie is:
INSERT INTO [target] VALUES (...);
Resulteert in:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00:00 | V | 1 |
10:05 | A | 20 |
In de vervangingsmodus krijgen we alleen de laatste waarde per sleutel. Hier gebruiken we Device_Id als sleutel. De equivalente T-SQL-instructie is:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Resulteert in:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Ten slotte wordt in de cumulatieve modus opgeteld Value
met een samengestelde toewijzingsoperator (+=
). Hier gebruiken we ook Device_Id als sleutel:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Resulteert in:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
Voor prestatieoverwegingen ondersteunen de UITVOERadapters van de ASA SQL-database momenteel alleen de toevoegmodus. Deze adapters gebruiken bulksgewijs invoegen om de doorvoer te maximaliseren en de terugdruk te beperken.
In dit artikel wordt beschreven hoe u Azure Functions gebruikt om de modi Replace and Accumulate voor ASA te implementeren. Wanneer u een functie als intermediaire laag gebruikt, hebben de mogelijke schrijfprestaties geen invloed op de streamingtaak. In dit opzicht werkt het gebruik van Azure Functions het beste met Azure SQL. Met Synapse SQL kan het overschakelen van bulksgewijs naar rij-per-rij-instructies grotere prestatieproblemen opleveren.
Uitvoer van Azure Functions
In onze taak vervangen we de ASA SQL-uitvoer door de ASA Azure Functions-uitvoer. De mogelijkheden UPDATE, UPSERT of MERGE worden geïmplementeerd in de functie.
Er zijn momenteel twee opties voor toegang tot een SQL Database in een functie. Eerst is de Azure SQL-uitvoerbinding. Het is momenteel beperkt tot C# en biedt alleen de vervangingsmodus. Ten tweede bestaat uit het opstellen van een SQL-query die moet worden verzonden via het juiste SQL-stuurprogramma (Microsoft.Data.SqlClient voor .NET).
Voor beide voorbeelden gaan we uit van het volgende tabelschema. Voor de bindingsoptie moet een primaire sleutel worden ingesteld in de doeltabel. Het is niet nodig, maar wordt aanbevolen bij het gebruik van een SQL-stuurprogramma.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Een functie moet voldoen aan de volgende verwachtingen die moeten worden gebruikt als uitvoer van ASA:
- Azure Stream Analytics verwacht HTTP-status 200 van de Functions-app voor batches die zijn verwerkt
- Wanneer Azure Stream Analytics een 413(http-aanvraagentiteit te groot) ontvangt van een Azure-functie, vermindert deze de grootte van de batches die worden verzonden naar De Azure-functie
- Tijdens de testverbinding verzendt Stream Analytics een POST-aanvraag met een lege batch naar Azure Functions en verwacht de HTTP-status 20x terug om de test te valideren
Optie 1: Bijwerken op sleutel met de Azure Function SQL-binding
Deze optie maakt gebruik van de Azure Function SQL-uitvoerbinding. Deze extensie kan een object in een tabel vervangen zonder dat u een SQL-instructie hoeft te schrijven. Op dit moment worden samengestelde toewijzingsoperatoren (accumulaties) niet ondersteund.
Dit voorbeeld is gebaseerd op:
- Runtimeversie 4 van Azure Functions
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
Als u meer inzicht wilt krijgen in de bindingsbenadering, wordt u aangeraden deze zelfstudie te volgen.
Maak eerst een standaard-HttpTrigger-functie-app door deze zelfstudie te volgen. De volgende informatie wordt gebruikt:
- Taal:
C#
- Runtime:
.NET 6
(onder function/runtime v4) - Sjabloon:
HTTP trigger
Installeer de bindingsextensie door de volgende opdracht uit te voeren in een terminal die zich in de projectmap bevindt:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Voeg het SqlConnectionString
item toe in de Values
sectie van uwlocal.settings.json
, vul de verbindingsreeks van de doelserver in:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Vervang de volledige functie (.cs bestand in het project) door het volgende codefragment. Werk de naamruimte, de klassenaam en de functienaam zelf bij:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Werk de naam van de doeltabel in de bindingssectie bij:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Werk de Device
sectie Klasse en toewijzing bij zodat deze overeenkomen met uw eigen schema:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
U kunt nu de bedrading tussen de lokale functie en de database testen door foutopsporing (F5 in Visual Studio Code). De SQL-database moet bereikbaar zijn vanaf uw computer. SSMS kan worden gebruikt om de connectiviteit te controleren. Verzend vervolgens POST-aanvragen naar het lokale eindpunt. Een aanvraag met een lege hoofdtekst moet http 204 retourneren. Een aanvraag met een werkelijke nettolading moet worden bewaard in de doeltabel (in de vervang-/updatemodus). Hier volgt een voorbeeldpayload die overeenkomt met het schema dat in dit voorbeeld wordt gebruikt:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
De functie kan nu worden gepubliceerd naar Azure. Er moet een toepassingsinstelling worden ingesteld voor SqlConnectionString
. De Azure SQL Server-firewall moet toestaan dat Azure-services in de live-functie deze bereiken.
De functie kan vervolgens worden gedefinieerd als uitvoer in de ASA-taak en wordt gebruikt om records te vervangen in plaats van ze in te voegen.
Optie 2: Samenvoegen met samengestelde toewijzing (cumulatie) via een aangepaste SQL-query
Notitie
Bij het opnieuw opstarten en herstellen kan ASA uitvoergebeurtenissen die al zijn verzonden, opnieuw verzenden. Dit is een verwacht gedrag dat ertoe kan leiden dat de accumulatielogica mislukt (het verdubbelen van afzonderlijke waarden). U kunt dit voorkomen door dezelfde gegevens in een tabel uit te voeren via de systeemeigen ASA SQL-uitvoer. Deze besturingstabel kan vervolgens worden gebruikt om problemen te detecteren en de accumulatie zo nodig opnieuw te synchroniseren.
Deze optie maakt gebruik van Microsoft.Data.SqlClient. Met deze bibliotheek kunnen sql-query's worden uitgevoerd naar een SQL Database.
Dit voorbeeld is gebaseerd op:
Maak eerst een standaard-HttpTrigger-functie-app door deze zelfstudie te volgen. De volgende informatie wordt gebruikt:
- Taal:
C#
- Runtime:
.NET 6
(onder function/runtime v4) - Sjabloon:
HTTP trigger
Installeer de SqlClient-bibliotheek door de volgende opdracht uit te voeren in een terminal die zich in de projectmap bevindt:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Voeg het SqlConnectionString
item toe in de Values
sectie van uwlocal.settings.json
, vul de verbindingsreeks van de doelserver in:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Vervang de volledige functie (.cs bestand in het project) door het volgende codefragment. Werk de naamruimte, de klassenaam en de functienaam zelf bij:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Werk de sectie van het sqltext
opdrachtgebouw bij zodat deze overeenkomt met uw eigen schema (u ziet hoe accumulatie wordt bereikt via de operator bij de +=
update):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
U kunt nu de bedrading tussen de lokale functie en de database testen door foutopsporing (F5 in VS Code). De SQL-database moet bereikbaar zijn vanaf uw computer. SSMS kan worden gebruikt om de connectiviteit te controleren. Verzend vervolgens POST-aanvragen naar het lokale eindpunt. Een aanvraag met een lege hoofdtekst moet http 204 retourneren. Een aanvraag met een werkelijke nettolading moet worden opgeslagen in de doeltabel (in de samengevoegde/samenvoegmodus). Hier volgt een voorbeeldpayload die overeenkomt met het schema dat in dit voorbeeld wordt gebruikt:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
De functie kan nu worden gepubliceerd naar Azure. Er moet een toepassingsinstelling worden ingesteld voor SqlConnectionString
. De Azure SQL Server-firewall moet toestaan dat Azure-services in de live-functie deze bereiken.
De functie kan vervolgens worden gedefinieerd als uitvoer in de ASA-taak en wordt gebruikt om records te vervangen in plaats van ze in te voegen.
Alternatieven
Buiten Azure Functions zijn er meerdere manieren om het verwachte resultaat te bereiken. In deze sectie vindt u een aantal van deze secties.
Naverwerking in de doel-SQL Database
Een achtergrondtaak werkt zodra de gegevens in de database worden ingevoegd via de standaard-ASA SQL-uitvoer.
Voor Azure SQL INSTEAD OF
kunnen DML-triggers worden gebruikt om de INSERT-opdrachten te onderscheppen die zijn uitgegeven door ASA:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Voor Synapse SQL kan ASA worden ingevoegd in een faseringstabel. Een terugkerende taak kan vervolgens de gegevens naar behoefte transformeren in een tussenliggende tabel. Ten slotte worden de gegevens verplaatst naar de productietabel.
Voorverwerking in Azure Cosmos DB
Azure Cosmos DB biedt systeemeigen ondersteuning voor UPSERT. Hier is alleen toevoegen/vervangen mogelijk. Accumulaties moeten worden beheerd aan de clientzijde in Azure Cosmos DB.
Als de vereisten overeenkomen, kunt u de doel-SQL-database vervangen door een Azure Cosmos DB-exemplaar. Hiervoor is een belangrijke wijziging in de algehele oplossingsarchitectuur vereist.
Voor Synapse SQL kan Azure Cosmos DB worden gebruikt als intermediaire laag via Azure Synapse Link voor Azure Cosmos DB. Azure Synapse Link kan worden gebruikt om een analytische opslag te maken. Dit gegevensarchief kan vervolgens rechtstreeks in Synapse SQL worden opgevraagd.
Vergelijking van de alternatieven
Elke benadering biedt verschillende waardeproposities en mogelijkheden:
Type | Optie | Modi | Azure SQL Database | Azure Synapse Analytics |
---|---|---|---|---|
Naverwerking | ||||
Triggers | Vervangen, verzamelen | + | N.v.t., triggers zijn niet beschikbaar in Synapse SQL | |
Staging | Vervangen, verzamelen | + | + | |
Voorverwerking | ||||
Azure Functions | Vervangen, verzamelen | + | - (prestaties van rij per rij) | |
Vervanging van Azure Cosmos DB | Replace | N.v.t. | N.v.t. | |
Azure Cosmos DB Azure Synapse Link | Replace | N.v.t. | + |
Ondersteuning krijgen
Probeer onze microsoft Q&A-vragenpagina voor Azure Stream Analytics voor meer hulp.
Volgende stappen
- Uitvoer van Azure Stream Analytics begrijpen
- Uitvoer van Azure Stream Analytics naar Azure SQL Database
- Doorvoerprestaties verhogen naar Azure SQL Database vanuit Azure Stream Analytics
- Beheerde identiteiten gebruiken voor toegang tot Azure SQL Database of Azure Synapse Analytics vanuit een Azure Stream Analytics-taak
- Referentiegegevens uit een SQL Database gebruiken voor een Azure Stream Analytics-taak
- Azure Functions uitvoeren in Azure Stream Analytics-taken - Zelfstudie voor Redis-uitvoer
- Quickstart: Een Stream Analytics-taak maken met behulp van Azure Portal