在 Durable Functions (Azure Functions) 中處理外部事件
本文內容
協調器函式能夠等候和接聽外部事件。 Durable Functions 的這項功能通常有助於處理人為互動或其他外部觸發程序。
注意
外部事件都是單向的非同步作業。 不適用於傳送事件的用戶端需要來自協調器函式的同步回應的狀況。
等候事件
協調器觸發程序繫結 的 wait-for-external-event API 可讓協調器函數以非同步方式等候和接聽外部用戶端所傳遞的事件。 接聽協調器函式會宣告事件的「名稱」 和預期收到的「資料形式」 。
[FunctionName("BudgetApproval")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
bool approved = await context.WaitForExternalEvent<bool>("Approval");
if (approved)
{
// approval granted - do the approved action
}
else
{
// approval denied - send a notification
}
}
注意
先前的 C# 程式碼適用於 Durable Functions 2.x。 針對 Durable Functions 1.x,您必須使用 DurableOrchestrationContext
而不是 IDurableOrchestrationContext
。 如需版本差異的詳細資訊,請參閱 Durable Functions 版本 一文。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const approved = yield context.df.waitForExternalEvent("Approval");
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
approved = yield context.wait_for_external_event('Approval')
if approved:
# approval granted - do the approved action
else:
# approval denied - send a notification
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$approved = Start-DurableExternalEventListener -EventName "Approval"
if ($approved) {
# approval granted - do the approved action
} else {
# approval denied - send a notification
}
@FunctionName("WaitForExternalEvent")
public void waitForExternalEvent(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
}
上述範例會接聽特定單一事件,並於收到該事件時採取動作。
您可以同時接聽多個事件,例如,下列範例中會等候三個可能的事件通知之一。
[FunctionName("Select")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var event1 = context.WaitForExternalEvent<float>("Event1");
var event2 = context.WaitForExternalEvent<bool>("Event2");
var event3 = context.WaitForExternalEvent<int>("Event3");
var winner = await Task.WhenAny(event1, event2, event3);
if (winner == event1)
{
// ...
}
else if (winner == event2)
{
// ...
}
else if (winner == event3)
{
// ...
}
}
注意
先前的 C# 程式碼適用於 Durable Functions 2.x。 針對 Durable Functions 1.x,您必須使用 DurableOrchestrationContext
而不是 IDurableOrchestrationContext
。 如需版本差異的詳細資訊,請參閱 Durable Functions 版本 一文。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const event1 = context.df.waitForExternalEvent("Event1");
const event2 = context.df.waitForExternalEvent("Event2");
const event3 = context.df.waitForExternalEvent("Event3");
const winner = yield context.df.Task.any([event1, event2, event3]);
if (winner === event1) {
// ...
} else if (winner === event2) {
// ...
} else if (winner === event3) {
// ...
}
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
event1 = context.wait_for_external_event('Event1')
event2 = context.wait_for_external_event('Event2')
event3 = context.wait_for_external_event('Event3')
winner = yield context.task_any([event1, event2, event3])
if winner == event1:
# ...
elif winner == event2:
# ...
elif winner == event3:
# ...
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$event1 = Start-DurableExternalEventListener -EventName "Event1" -NoWait
$event2 = Start-DurableExternalEventListener -EventName "Event2" -NoWait
$event3 = Start-DurableExternalEventListener -EventName "Event3" -NoWait
$winner = Wait-DurableTask -Task @($event1, $event2, $event3) -Any
if ($winner -eq $event1) {
# ...
} else if ($winner -eq $event2) {
# ...
} else if ($winner -eq $event3) {
# ...
}
@FunctionName("Select")
public void selectOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
Task<Void> event1 = ctx.waitForExternalEvent("Event1");
Task<Void> event2 = ctx.waitForExternalEvent("Event2");
Task<Void> event3 = ctx.waitForExternalEvent("Event3");
Task<?> winner = ctx.anyOf(event1, event2, event3).await();
if (winner == event1) {
// ...
} else if (winner == event2) {
// ...
} else if (winner == event3) {
// ...
}
}
前一個範例會接聽多個事件中的「任何」 事件。 也可以等候「所有」 事件。
[FunctionName("NewBuildingPermit")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string applicationId = context.GetInput<string>();
var gate1 = context.WaitForExternalEvent("CityPlanningApproval");
var gate2 = context.WaitForExternalEvent("FireDeptApproval");
var gate3 = context.WaitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
await Task.WhenAll(gate1, gate2, gate3);
await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}
注意
先前的程式碼適用於 Durable Functions 2.x。 針對 Durable Functions 1.x,您必須使用 DurableOrchestrationContext
而不是 IDurableOrchestrationContext
。 如需版本差異的詳細資訊,請參閱 Durable Functions 版本 一文。
在 .NET 中,如果事件裝載無法轉換為預期的類型 T
,則會擲回例外狀況。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const applicationId = context.df.getInput();
const gate1 = context.df.waitForExternalEvent("CityPlanningApproval");
const gate2 = context.df.waitForExternalEvent("FireDeptApproval");
const gate3 = context.df.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
yield context.df.Task.all([gate1, gate2, gate3]);
yield context.df.callActivity("IssueBuildingPermit", applicationId);
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
application_id = context.get_input()
gate1 = context.wait_for_external_event('CityPlanningApproval')
gate2 = context.wait_for_external_event('FireDeptApproval')
gate3 = context.wait_for_external_event('BuildingDeptApproval')
yield context.task_all([gate1, gate2, gate3])
yield context.call_activity('IssueBuildingPermit', application_id)
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$applicationId = $Context.Input
$gate1 = Start-DurableExternalEventListener -EventName "CityPlanningApproval" -NoWait
$gate2 = Start-DurableExternalEventListener -EventName "FireDeptApproval" -NoWait
$gate3 = Start-DurableExternalEventListener -EventName "BuildingDeptApproval" -NoWait
Wait-DurableTask -Task @($gate1, $gate2, $gate3)
Invoke-ActivityFunction -FunctionName 'IssueBuildingPermit' -Input $applicationId
@FunctionName("NewBuildingPermit")
public void newBuildingPermit(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String applicationId = ctx.getInput(String.class);
Task<Void> gate1 = ctx.waitForExternalEvent("CityPlanningApproval");
Task<Void> gate2 = ctx.waitForExternalEvent("FireDeptApproval");
Task<Void> gate3 = ctx.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
ctx.allOf(List.of(gate1, gate2, gate3)).await();
ctx.callActivity("IssueBuildingPermit", applicationId).await();
}
wait-for-external-event API 會無限期地等候某個輸入。 等候時可以安心卸載函式應用程式。 當此協調流程執行個體有事件抵達時,就會自動甦醒並立即處理事件。
注意
如果函數應用程式使用取用方案,則協調器函數等候外部事件工作時,不論等多久都不會產生計費費用。
如同活動函式,外部事件至少具有一 次 傳遞保證。 這表示,在特定情況下(例如重新啟動、調整、當機等),您的應用程式可能會收到相同外部事件的重複專案。 因此,我們建議外部事件包含某種標識碼,可讓它們以手動方式在協調器中取消複製。
傳送事件
您可以使用協調流程用戶端 繫結所定義的 raise-event API,將外部事件傳送至協調流程。 您也可以使用內建引發事件 HTTP API ,將外部事件傳送至協調流程。
引發的事件包括「執行個體識別碼」 、eventName 和 eventData 作為參數。 協調器函數會使用 wait-for-external-event API 來處理這些事件。 eventName 必須同時符合傳送端和接收端,才能處理事件。 事件資料也必須是 JSON 可序列化。
在內部,raise-event 機制會將訊息加入佇列,供等候中的協調器函數取用。 如果執行個體不是在等候指定的「事件名稱」 ,事件訊息就會新增至記憶體內部佇列。 如果協調流程執行個體稍後開始接聽該「事件名稱」 ,它將會檢查佇列是否有事件訊息。
注意
如果沒有具有指定「執行個體識別碼」 的協調流程執行個體,則會捨棄事件訊息。
以下的範例佇列觸發函式會將「核准」事件傳送至協調器函式執行個體。 協調流程執行個體識別碼來自佇列訊息的本文。
[FunctionName("ApprovalQueueProcessor")]
public static async Task Run(
[QueueTrigger("approval-queue")] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
await client.RaiseEventAsync(instanceId, "Approval", true);
}
注意
先前的 C# 程式碼適用於 Durable Functions 2.x。 針對 Durable Functions 1.x,您必須使用 OrchestrationClient
屬性 (而非 DurableClient
屬性),而且必須使用 DurableOrchestrationClient
參數類型 (而非 IDurableOrchestrationClient
)。 如需版本差異的詳細資訊,請參閱 Durable Functions 版本 一文。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
await client.raiseEvent(instanceId, "Approval", true);
};
import azure.functions as func
import azure.durable_functions as df
async def main(instance_id:str, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
await client.raise_event(instance_id, 'Approval', True)
param($instanceId)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "Approval"
@FunctionName("ApprovalQueueProcessor")
public void approvalQueueProcessor(
@QueueTrigger(name = "instanceID", queueName = "approval-queue") String instanceID,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
durableContext.getClient().raiseEvent(instanceID, "Approval", true);
}
在內部,raise-event API 會將訊息加入佇列,供等候中的協調器函數取用。 如果執行個體不是在等候指定的「事件名稱」 ,則會將事件訊息新增至記憶體內部緩衝區。 如果協調流程執行個體稍後開始接聽該「事件名稱」 ,則會檢查事件訊息的緩衝區,並觸發正在等候它的工作。
注意
如果沒有具有指定「執行個體識別碼」 的協調流程執行個體,則會捨棄事件訊息。
HTTP
以下範例說明將「核准」事件引發至協調流程執行個體的 HTTP 要求。
POST /runtime/webhooks/durabletask/instances/MyInstanceId/raiseEvent/Approval&code=XXX
Content-Type: application/json
"true"
在此情況下,執行個體識別碼會硬式編碼為 MyInstanceId 。
下一步