Co to Durable Functions?
Durable Functions to funkcja usługi Azure Functions, która umożliwia pisanie funkcji stanowych w bezserwerowym środowisku obliczeniowym. Rozszerzenie umożliwia definiowanie stanowych przepływów pracy przez pisanie funkcji orkiestratora i jednostek stanowych przez pisanie funkcji jednostek przy użyciu modelu programowania usługi Azure Functions. W tle rozszerzenie zarządza stanem, punktami kontrolnymi i ponownymi uruchomieniami, co pozwala skupić się na logice biznesowej.
Obsługiwane języki
Rozszerzenie Durable Functions jest przeznaczone do pracy ze wszystkimi językami programowania usługi Azure Functions, ale może mieć różne minimalne wymagania dla każdego języka. W poniższej tabeli przedstawiono minimalne obsługiwane konfiguracje aplikacji:
Stos języka | Wersje środowiska uruchomieniowego usługi Azure Functions | Wersja procesu roboczego języka | Minimalna wersja pakietów |
---|---|---|---|
.NET/C# /F# | Funkcje w wersji 1.0 lub nowszej | Proces Poza procesem |
nie dotyczy |
JavaScript/TypeScript (model w wersji 3) | Funkcje w wersji 2.0 lub nowszej | Węzeł 8+ | Pakiety 2.x |
JavaScript/TypeScript (model progów w wersji 4) | Funkcje 4.25+ | Węzeł 18+ | 3.15+ pakiety |
Python | Funkcje w wersji 2.0 lub nowszej | Python 3.7+ | Pakiety 2.x |
Python (model progów w wersji 2) | Funkcje w wersji 4.0 lub nowszej | Python 3.7+ | 3.15+ pakiety |
PowerShell | Funkcje w wersji 3.0 lub nowszej | PowerShell 7+ | Pakiety 2.x |
Java | Funkcje w wersji 4.0 lub nowszej | Java 8+ | Pakiety 4.x |
Ważne
W tym artykule są używane karty do obsługi wielu wersji modelu programowania Node.js. Model w wersji 4 jest ogólnie dostępny i ma bardziej elastyczne i intuicyjne środowisko dla deweloperów języka JavaScript i Języka TypeScript. Aby uzyskać więcej informacji na temat sposobu działania modelu w wersji 4, zapoznaj się z przewodnikiem dewelopera dotyczącym usługi Azure Functions Node.js. Aby dowiedzieć się więcej o różnicach między wersjami 3 i v4, zapoznaj się z przewodnikiem migracji.
Ważne
W tym artykule są używane karty do obsługi wielu wersji modelu programowania w języku Python. Model w wersji 2 jest ogólnie dostępny i ma na celu zapewnienie bardziej skoncentrowanego na kodzie sposobu tworzenia funkcji za pośrednictwem dekoratorów. Aby uzyskać więcej informacji na temat sposobu działania modelu w wersji 2, zapoznaj się z przewodnikiem dla deweloperów języka Python usługi Azure Functions.
Podobnie jak usługa Azure Functions, istnieją szablony ułatwiające opracowywanie rozszerzenia Durable Functions przy użyciu programu Visual Studio, programu Visual Studio Code i witryny Azure Portal.
Wzorce aplikacji
Podstawowym zastosowaniem rozszerzenia Durable Functions jest uproszczenie złożonych wymagań związanych z koordynacją stanową w aplikacjach bezserwerowych. W poniższych sekcjach opisano typowe wzorce aplikacji, które mogą korzystać z rozszerzenia Durable Functions:
- Łączenie łańcuchów funkcji
- Model fan-out/fan-in
- Interfejsy API protokołu HTTP Async
- Monitorowanie
- Interakcja z użytkownikami
- Agregator (jednostki stanowe)
Wzorzec nr 1: Tworzenie łańcucha funkcji
We wzorcu tworzenia łańcucha funkcji sekwencja funkcji jest wykonywana w określonej kolejności. W tym wzorcu dane wyjściowe jednej funkcji są stosowane do danych wejściowych innej funkcji. Użycie kolejek między każdą funkcją gwarantuje, że system pozostanie trwały i skalowalny, mimo że istnieje przepływ kontroli z jednej funkcji do następnej.
Za pomocą rozszerzenia Durable Functions można zaimplementować zwięzły wzorzec tworzenia łańcucha funkcji, jak pokazano w poniższym przykładzie.
W tym przykładzie wartości F1
, F2
, F3
i F4
są nazwami innych funkcji w tej samej aplikacji funkcji. Przepływ sterowania można zaimplementować przy użyciu normalnych konstrukcji kodowania imperatywnego. Kod jest wykonywany z góry w dół. Kod może obejmować istniejącą semantykę przepływu sterowania językiem, na przykład warunkowe i pętle. Logikę obsługi błędów można uwzględnić w try
//catch
finally
blokach.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Można użyć parametru context
do wywoływania innych funkcji według nazwy, przekazywania parametrów i zwracania danych wyjściowych funkcji. Za każdym razem, gdy kod wywołuje await
element , platforma Durable Functions wskazuje postęp bieżącego wystąpienia funkcji. Jeśli proces lub maszyna wirtualna są odtwarzane w połowie wykonywania, wystąpienie funkcji zostanie wznowione z poprzedniego wywołania await
. Aby uzyskać więcej informacji, zobacz następną sekcję Pattern #2: Fan out/fan in.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
Obiekt umożliwia context.df
wywoływanie innych funkcji według nazwy, przekazywania parametrów i zwracania danych wyjściowych funkcji. Za każdym razem, gdy kod wywołuje yield
element , platforma Durable Functions wskazuje postęp bieżącego wystąpienia funkcji. Jeśli proces lub maszyna wirtualna są odtwarzane w połowie wykonywania, wystąpienie funkcji zostanie wznowione z poprzedniego wywołania yield
. Aby uzyskać więcej informacji, zobacz następną sekcję Pattern #2: Fan out/fan in.
Uwaga
Obiekt context
w języku JavaScript reprezentuje cały kontekst funkcji. Uzyskaj dostęp do kontekstu rozszerzenia Durable Functions przy użyciu df
właściwości w kontekście głównym.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
Obiekt umożliwia context
wywoływanie innych funkcji według nazwy, przekazywania parametrów i zwracania danych wyjściowych funkcji. Za każdym razem, gdy kod wywołuje yield
element , platforma Durable Functions wskazuje postęp bieżącego wystąpienia funkcji. Jeśli proces lub maszyna wirtualna są odtwarzane w połowie wykonywania, wystąpienie funkcji zostanie wznowione z poprzedniego wywołania yield
. Aby uzyskać więcej informacji, zobacz następną sekcję Pattern #2: Fan out/fan in.
Uwaga
context
Obiekt w języku Python reprezentuje kontekst orkiestracji. Uzyskaj dostęp do głównego kontekstu usługi Azure Functions przy użyciu function_context
właściwości w kontekście aranżacji.
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
Możesz użyć Invoke-DurableActivity
polecenia , aby wywołać inne funkcje według nazwy, przekazać parametry i zwrócić dane wyjściowe funkcji. Za każdym razem, gdy kod wywołuje Invoke-DurableActivity
bez przełącznika NoWait
, platforma Durable Functions wskazuje postęp bieżącego wystąpienia funkcji. Jeśli proces lub maszyna wirtualna są odtwarzane w połowie wykonywania, wystąpienie funkcji zostanie wznowione z poprzedniego wywołania Invoke-DurableActivity
. Aby uzyskać więcej informacji, zobacz następną sekcję Pattern #2: Fan out/fan in.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
Obiekt umożliwia ctx
wywoływanie innych funkcji według nazwy, przekazywania parametrów i zwracania danych wyjściowych funkcji. Dane wyjściowe tych metod to obiekt, w którym V
jest typem Task<V>
danych zwracanych przez wywołaną funkcję. Za każdym razem, gdy wywołujesz Task<V>.await()
metodę , platforma Durable Functions wskazuje postęp bieżącego wystąpienia funkcji. Jeśli proces nieoczekiwanie odtwarza się w połowie wykonywania, wystąpienie funkcji zostanie wznowione z poprzedniego wywołania Task<V>.await()
. Aby uzyskać więcej informacji, zobacz następną sekcję Pattern #2: Fan out/fan in.
Wzorzec nr 2: Wentylator na wyprzedaj/wentylator w
We wzorcu wyprzedaj/wentylator wykonujesz wiele funkcji równolegle, a następnie czekasz na zakończenie wszystkich funkcji. Często niektóre prace agregacji są wykonywane na wynikach zwracanych z funkcji.
Dzięki normalnym funkcjom można wymyślić możliwość wysyłania wielu komunikatów do kolejki przez funkcję . Fanning z powrotem w jest znacznie trudniejsze. Aby zasymać działanie, w normalnej funkcji piszesz kod do śledzenia po zakończeniu funkcji wyzwalanych przez kolejkę, a następnie przechowywać dane wyjściowe funkcji.
Rozszerzenie Durable Functions obsługuje ten wzorzec za pomocą stosunkowo prostego kodu:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
Praca nad wentylatorem jest dystrybuowana do wielu wystąpień F2
funkcji. Praca jest śledzona przy użyciu dynamicznej listy zadań. Task.WhenAll
jest wywoływana, aby poczekać na zakończenie wszystkich wywoływanych funkcji. F2
Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i przekazywane do F3
funkcji.
Automatyczne tworzenie punktów kontrolnych, które odbywa się przy await
wywołaniu Task.WhenAll
, gwarantuje, że potencjalna awaria lub ponowny rozruch w połowie drogi nie wymaga ponownego uruchomienia już ukończonego zadania.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
Praca nad wentylatorem jest dystrybuowana do wielu wystąpień F2
funkcji. Praca jest śledzona przy użyciu dynamicznej listy zadań. context.df.Task.all
Interfejs API jest wywoływany, aby poczekać na zakończenie wszystkich wywoływanych funkcji. F2
Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i przekazywane do F3
funkcji.
Automatyczne tworzenie punktów kontrolnych, które odbywa się przy yield
wywołaniu context.df.Task.all
, gwarantuje, że potencjalna awaria lub ponowny rozruch w połowie drogi nie wymaga ponownego uruchomienia już ukończonego zadania.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
Praca nad wentylatorem jest dystrybuowana do wielu wystąpień F2
funkcji. Praca jest śledzona przy użyciu dynamicznej listy zadań. context.task_all
Interfejs API jest wywoływany, aby poczekać na zakończenie wszystkich wywoływanych funkcji. F2
Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i przekazywane do F3
funkcji.
Automatyczne tworzenie punktów kontrolnych, które odbywa się przy yield
wywołaniu context.task_all
, gwarantuje, że potencjalna awaria lub ponowny rozruch w połowie drogi nie wymaga ponownego uruchomienia już ukończonego zadania.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
Praca nad wentylatorem jest dystrybuowana do wielu wystąpień F2
funkcji. Zwróć uwagę na NoWait
użycie przełącznika F2
w wywołaniu funkcji: ten przełącznik umożliwia orkiestratorowi kontynuowanie wywoływania F2
bez oczekiwania na ukończenie działania. Praca jest śledzona przy użyciu dynamicznej listy zadań. Polecenie Wait-ActivityFunction
jest wywoływane, aby poczekać na zakończenie wszystkich wywoływanych funkcji. F2
Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i przekazywane do F3
funkcji.
Automatyczne tworzenie punktów kontrolnych, które odbywa się podczas Wait-ActivityFunction
wywołania, gwarantuje, że potencjalna awaria lub ponowny rozruch w połowie drogi nie wymaga ponownego uruchomienia już ukończonego zadania.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
Praca nad wentylatorem jest dystrybuowana do wielu wystąpień F2
funkcji. Praca jest śledzona przy użyciu dynamicznej listy zadań. ctx.allOf(parallelTasks).await()
jest wywoływana, aby poczekać na zakończenie wszystkich wywoływanych funkcji. F2
Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i zwracane jako dane wyjściowe funkcji orkiestratora.
Automatyczne tworzenie punktów kontrolnych, które odbywa się podczas .await()
wywołania, ctx.allOf(parallelTasks)
gwarantuje, że nieoczekiwany proces recyklingu nie wymaga ponownego uruchomienia żadnych już ukończonych zadań.
Uwaga
W rzadkich okolicznościach możliwe jest, że awaria może wystąpić w oknie po zakończeniu funkcji działania, ale przed jego ukończeniem zostanie zapisana w historii aranżacji. W takim przypadku funkcja działania zostanie ponownie uruchomiona od początku po odzyskaniu procesu.
Wzorzec nr 3: asynchroniczne interfejsy API HTTP
Wzorzec asynchronicznego interfejsu API HTTP rozwiązuje problem koordynowania stanu długotrwałych operacji z klientami zewnętrznymi. Typowym sposobem zaimplementowania tego wzorca jest posiadanie punktu końcowego HTTP wyzwalającego długotrwałą akcję. Następnie przekieruj klienta do punktu końcowego stanu, który klient sonduje, aby dowiedzieć się, kiedy operacja zostanie zakończona.
Rozszerzenie Durable Functions zapewnia wbudowaną obsługę tego wzorca, upraszczając lub nawet usuwając kod, który należy napisać w celu interakcji z długotrwałymi wykonywaniami funkcji. Na przykład przykłady szybkiego startu rozszerzenia Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell i Java) pokazują proste polecenie REST, którego można użyć do uruchamiania nowych wystąpień funkcji orkiestratora. Po uruchomieniu wystąpienia rozszerzenie uwidacznia interfejsy API HTTP elementu webhook, które wysyłają zapytanie do stanu funkcji orkiestratora.
W poniższym przykładzie przedstawiono polecenia REST, które uruchamiają koordynatora i wysyłają zapytania dotyczące jego stanu. W celu zapewnienia przejrzystości niektóre szczegóły protokołu zostaną pominięte w przykładzie.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Ponieważ środowisko uruchomieniowe Durable Functions zarządza stanem, nie trzeba implementować własnego mechanizmu śledzenia stanu.
Rozszerzenie Durable Functions uwidacznia wbudowane interfejsy API HTTP, które zarządzają długotrwałymi aranżacjami. Możesz też samodzielnie zaimplementować ten wzorzec przy użyciu własnych wyzwalaczy funkcji (takich jak HTTP, kolejka lub Usługa Azure Event Hubs) oraz trwałe powiązanie klienta. Na przykład możesz użyć komunikatu kolejki do wyzwolenia zakończenia. Możesz też użyć wyzwalacza HTTP chronionego przez zasady uwierzytelniania entra firmy Microsoft zamiast wbudowanych interfejsów API HTTP, które używają wygenerowanego klucza do uwierzytelniania.
Aby uzyskać więcej informacji, zobacz artykuł funkcje PROTOKOŁU HTTP, w którym wyjaśniono, jak można uwidaczniać asynchroniczne, długotrwałe procesy za pośrednictwem protokołu HTTP przy użyciu rozszerzenia Durable Functions.
Wzorzec nr 4: Monitorowanie
Wzorzec monitora odnosi się do elastycznego, cyklicznego procesu w przepływie pracy. Przykładem jest sondowanie do momentu spełnienia określonych warunków. Wyzwalacz czasomierza regularnego umożliwia rozwiązanie podstawowego scenariusza, takiego jak okresowe zadanie oczyszczania, ale jego interwał jest statyczny i zarządzanie okresami istnienia wystąpień staje się złożone. Za pomocą rozszerzenia Durable Functions można tworzyć elastyczne interwały cyklu, zarządzać okresami istnienia zadań i tworzyć wiele procesów monitorowania na podstawie jednej aranżacji.
Przykładem wzorca monitora jest odwrócenie wcześniejszego scenariusza asynchronicznego interfejsu API HTTP. Zamiast uwidaczniać punkt końcowy dla klienta zewnętrznego w celu monitorowania długotrwałej operacji, długotrwały monitor zużywa zewnętrzny punkt końcowy, a następnie czeka na zmianę stanu.
W kilku wierszach kodu można użyć rozszerzenia Durable Functions, aby utworzyć wiele monitorów, które obserwują dowolne punkty końcowe. Monitory mogą zakończyć wykonywanie po spełnieniu warunku lub inna funkcja może użyć klienta trwałej aranżacji w celu zakończenia monitorów. Interwał monitora wait
można zmienić na podstawie określonego warunku (na przykład wycofywania wykładniczego).
Poniższy kod implementuje podstawowy monitor:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
Po odebraniu żądania zostanie utworzone nowe wystąpienie orkiestracji dla tego identyfikatora zadania. Wystąpienie sonduje stan do momentu spełnienia warunku lub wygaśnięcia limitu czasu. Trwały czasomierz kontroluje interwał sondowania. Następnie można wykonać więcej pracy lub orkiestracja może zakończyć się.
Wzorzec nr 5: Interakcja z człowiekiem
Wiele zautomatyzowanych procesów obejmuje jakąś interakcję człowieka. Zaangażowanie ludzi w zautomatyzowany proces jest trudne, ponieważ ludzie nie są tak wysoko dostępne i reagują jak usługi w chmurze. Zautomatyzowany proces może zezwalać na tę interakcję przy użyciu limitów czasu i logiki kompensacji.
Proces zatwierdzania to przykład procesu biznesowego, który obejmuje interakcję człowieka. Zatwierdzenie przez menedżera może być wymagane dla raportu wydatków, który przekracza określoną kwotę w dolarach. Jeśli menedżer nie zatwierdzi raportu wydatków w ciągu 72 godzin (może być menedżer udał się na wakacje), proces eskalacji rozpoczyna się, aby uzyskać zgodę od kogoś innego (być może menedżer menedżera).
Wzorzec w tym przykładzie można zaimplementować przy użyciu funkcji orkiestratora. Orkiestrator używa trwałego czasomierza do żądania zatwierdzenia. Orkiestrator eskaluje, jeśli wystąpi przekroczenie limitu czasu. Orkiestrator czeka na zdarzenie zewnętrzne, takie jak powiadomienie wygenerowane przez interakcję człowieka.
Te przykłady umożliwiają utworzenie procesu zatwierdzania w celu zademonstrowania wzorca interakcji człowieka:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
Aby utworzyć trwały czasomierz, wywołaj metodę context.CreateTimer
. Powiadomienie jest odbierane przez element context.WaitForExternalEvent
. Następnie jest wywoływana, aby zdecydować, Task.WhenAny
czy eskalować (przekroczenie limitu czasu nastąpi najpierw) lub przetworzyć zatwierdzenie (zatwierdzenie jest odbierane przed przekroczeniem limitu czasu).
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
Aby utworzyć trwały czasomierz, wywołaj metodę context.df.createTimer
. Powiadomienie jest odbierane przez element context.df.waitForExternalEvent
. Następnie jest wywoływana, aby zdecydować, context.df.Task.any
czy eskalować (przekroczenie limitu czasu nastąpi najpierw) lub przetworzyć zatwierdzenie (zatwierdzenie jest odbierane przed przekroczeniem limitu czasu).
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
Aby utworzyć trwały czasomierz, wywołaj metodę context.create_timer
. Powiadomienie jest odbierane przez element context.wait_for_external_event
. Następnie jest wywoływana, aby zdecydować, context.task_any
czy eskalować (przekroczenie limitu czasu nastąpi najpierw) lub przetworzyć zatwierdzenie (zatwierdzenie jest odbierane przed przekroczeniem limitu czasu).
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
Aby utworzyć trwały czasomierz, wywołaj metodę Start-DurableTimer
. Powiadomienie jest odbierane przez element Start-DurableExternalEventListener
. Następnie jest wywoływana, aby zdecydować, Wait-DurableTask
czy eskalować (przekroczenie limitu czasu nastąpi najpierw) lub przetworzyć zatwierdzenie (zatwierdzenie jest odbierane przed przekroczeniem limitu czasu).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
Wywołanie ctx.waitForExternalEvent(...).await()
metody wstrzymuje aranżację do momentu odebrania zdarzenia o nazwie ApprovalEvent
, które ma boolean
ładunek. Jeśli zdarzenie zostanie odebrane, zostanie wywołana funkcja działania, aby przetworzyć wynik zatwierdzenia. Jeśli jednak takie zdarzenie nie zostanie odebrane przed timeout
wygaśnięciem (72 godziny), zostanie zgłoszone, a TaskCanceledException
Escalate
funkcja działania zostanie wywołana.
Uwaga
Nie są naliczane opłaty za czas oczekiwania na zdarzenia zewnętrzne podczas uruchamiania w planie Zużycie.
Klient zewnętrzny może dostarczyć powiadomienie o zdarzeniu do funkcji oczekującej orkiestratora przy użyciu wbudowanych interfejsów API PROTOKOŁU HTTP:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
Zdarzenie można również wywołać przy użyciu klienta trwałej aranżacji z innej funkcji w tej samej aplikacji funkcji:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");
module.exports = async function (context) {
const client = df.getClient(context);
const isApproved = true;
await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
Wzorzec nr 6: Agregator (jednostki stanowe)
Szósty wzorzec dotyczy agregowania danych zdarzeń w danym okresie do pojedynczej, adresowalnej jednostki. W tym wzorcu agregowane dane mogą pochodzić z wielu źródeł, mogą być dostarczane w partiach lub mogą być rozproszone w długich okresach czasu. Agregator może wymagać podjęcia działań dotyczących danych zdarzenia w miarę ich nadejścia, a klienci zewnętrzni mogą wymagać wykonania zapytań dotyczących zagregowanych danych.
Trudną rzeczą o próbie zaimplementowania tego wzorca z normalnymi funkcjami bezstanowymi jest to, że kontrola współbieżności staje się ogromnym wyzwaniem. Nie tylko musisz martwić się o wiele wątków modyfikujących te same dane w tym samym czasie. Należy również martwić się o to, aby agregator był uruchamiany tylko na jednej maszynie wirtualnej jednocześnie.
Za pomocą jednostek Durable można łatwo zaimplementować ten wzorzec jako pojedynczą funkcję.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Jednostki trwałe można również modelować jako klasy na platformie .NET. Ten model może być przydatny, jeśli lista operacji jest stała i staje się duża. Poniższy przykład to równoważna implementacja Counter
jednostki przy użyciu klas i metod platformy .NET.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
Uwaga
Jednostki trwałe nie są obecnie obsługiwane w programie PowerShell.
Uwaga
Jednostki trwałe nie są obecnie obsługiwane w języku Java.
Klienci mogą umieścić operacje kolejkowania dla funkcji jednostki (znanej również jako "sygnalizowanie") przy użyciu powiązania klienta jednostki.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Uwaga
Dynamicznie generowane serwery proxy są również dostępne na platformie .NET na potrzeby sygnalizowania jednostek w bezpieczny sposób. Oprócz sygnalizowania klienci mogą również wykonywać zapytania dotyczące stanu funkcji jednostki przy użyciu metod bezpiecznych typu w powiązaniu klienta aranżacji.
const df = require("durable-functions");
const { app } = require("@azure/functions");
module.exports = async function (context) {
const client = df.getClient(context);
const entityId = new df.EntityId("Counter", "myCounter");
await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
Funkcje jednostek są dostępne w rozszerzeniach Durable Functions w wersji 2.0 lub nowszej dla języków C#, JavaScript i Python.
Technologia
W tle rozszerzenie Durable Functions jest oparte na strukturze Durable Task Framework— bibliotece typu open source w usłudze GitHub używanej do tworzenia przepływów pracy w kodzie. Podobnie jak usługa Azure Functions to bezserwerowa ewolucja zadań WebJob platformy Azure, rozszerzenie Durable Functions to bezserwerowa ewolucja platformy Durable Task Framework. Firma Microsoft i inne organizacje intensywnie używają platformy Durable Task Framework do automatyzowania procesów o znaczeniu krytycznym. Jest to naturalne rozwiązanie dla bezserwerowego środowiska usługi Azure Functions.
Ograniczenia kodu
Aby zapewnić niezawodne i długotrwałe gwarancje wykonywania, funkcje orkiestratora mają zestaw reguł kodowania, które muszą być przestrzegane. Aby uzyskać więcej informacji, zobacz artykuł Dotyczący ograniczeń kodu funkcji programu Orchestrator.
Rozliczenia
Opłaty za rozszerzenie Durable Functions są naliczane tak samo, jak w przypadku usługi Azure Functions. Aby uzyskać więcej informacji, zobacz Cennik usługi Azure Functions. Podczas wykonywania funkcji orkiestratora w planie zużycie usługi Azure Functions należy pamiętać o pewnych zachowaniach rozliczeniowych. Aby uzyskać więcej informacji na temat tych zachowań, zobacz artykuł dotyczący rozliczeń rozszerzenia Durable Functions.
Błyskawicznie rozpocznij pracę
Ukończ jeden z tych samouczków Szybki start dotyczących poszczególnych języków, aby rozpocząć korzystanie z rozszerzenia Durable Functions w niecałe 10 minut:
- C# przy użyciu programu Visual Studio 2019
- JavaScript w programie Visual Studio Code
- TypeScript przy użyciu programu Visual Studio Code
- Język Python korzystający z programu Visual Studio Code
- Program PowerShell przy użyciu programu Visual Studio Code
- Język Java korzystający z narzędzia Maven
W tych przewodnikach Szybki start utworzysz lokalnie i przetestujesz trwałą funkcję "hello world". Kod funkcji zostanie następnie opublikowany na platformie Azure. Utworzona przez Ciebie funkcja aranżuje i łączy w łańcuchy wywołania do innych funkcji.
Publikacje
Rozszerzenie Durable Functions jest opracowywane we współpracy z firmą Microsoft Research. W związku z tym zespół Durable Functions aktywnie tworzy dokumenty badawcze i artefakty; należą do nich:
- Durable Functions: Semantyka dla bezserwerowych stanowych (OOPSLA'21)
- Bezserwerowe przepływy pracy z rozszerzeniami Durable Functions i netherite (drukowanie wstępne)
Dowiedz się więcej
Poniższy klip wideo prezentuje zalety rozszerzenia Durable Functions:
Ponieważ Durable Functions to zaawansowane rozszerzenie usługi Azure Functions, nie jest odpowiednie dla wszystkich aplikacji. Aby zapoznać się z porównaniem technologii orkiestracji dostępnych na platformie Azure, zobacz Porównanie usług Azure Functions i Azure Logic Apps.