バックグラウンド ジョブでは、多くの場合、特定のオーケストレーターのインスタンスが一度に 1 つだけ実行されることを保証する必要があります。 特定のインスタンス ID を作成時にオーケストレーターに割り当てて、Durable Functions でこのようなシングルトン動作を確実に行うことができます。
次の例は、シングルトン バックグラウンド ジョブのオーケストレーションを作成する HTTP トリガー関数を示しています。 このコードは、指定されたインスタンス ID の インスタンスが 1 つだけ存在することを保証します。
[FunctionName("HttpStartSingle")]
public static async Task<HttpResponseMessage> RunSingle(
[HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = "orchestrators/{functionName}/{instanceId}")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
string functionName,
string instanceId,
ILogger log)
{
// Check if an instance with the specified ID already exists or an existing one stopped running(completed/failed/terminated).
var existingInstance = await starter.GetStatusAsync(instanceId);
if (existingInstance == null
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
dynamic eventData = await req.Content.ReadAsAsync<object>();
await starter.StartNewAsync(functionName, instanceId, eventData);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return starter.CreateCheckStatusResponse(req, instanceId);
}
else
{
// An instance with the specified ID exists or an existing one still running, don't create one.
return new HttpResponseMessage(HttpStatusCode.Conflict)
{
Content = new StringContent($"An instance with ID '{instanceId}' already exists."),
};
}
}
Note
前記の C# コードは Durable Functions 2.x 用です。 Durable Functions 1.x では、DurableClient
属性の代わりに OrchestrationClient
属性を使用する必要があります。また、IDurableOrchestrationClient
ではなく DurableOrchestrationClient
パラメーター型を使用する必要があります。 バージョン間の相違点の詳細については、Durable Functions のバージョンに関する記事を参照してください。
function.json
{
"bindings": [
{
"authLevel": "function",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"route": "orchestrators/{functionName}/{instanceId}",
"methods": ["post"]
},
{
"name": "starter",
"type": "orchestrationClient",
"direction": "in"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
]
}
index.js
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const instanceId = req.params.instanceId;
const functionName = req.params.functionName;
// Check if an instance with the specified ID already exists or an existing one stopped running(completed/failed/terminated).
const existingInstance = await client.getStatus(instanceId);
if (!existingInstance
|| existingInstance.runtimeStatus == "Completed"
|| existingInstance.runtimeStatus == "Failed"
|| existingInstance.runtimeStatus == "Terminated") {
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
const eventData = req.body;
await client.startNew(functionName, instanceId, eventData);
context.log(`Started orchestration with ID = '${instanceId}'.`);
return client.createCheckStatusResponse(req, instanceId);
} else {
// An instance with the specified ID exists or an existing one still running, don't create one.
return {
status: 409,
body: `An instance with ID '${instanceId}' already exists.`,
};
}
};
function.json
{
"bindings": [
{
"authLevel": "function",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"route": "orchestrators/{functionName}/{instanceId}",
"methods": ["post"]
},
{
"name": "starter",
"type": "orchestrationClient",
"direction": "in"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
]
}
init.py
import logging
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = req.route_params['instanceId']
function_name = req.route_params['functionName']
existing_instance = await client.get_status(instance_id)
if existing_instance.runtime_status in [df.OrchestrationRuntimeStatus.Completed, df.OrchestrationRuntimeStatus.Failed, df.OrchestrationRuntimeStatus.Terminated, None]:
event_data = req.get_body()
instance_id = await client.start_new(function_name, instance_id, event_data)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
else:
return {
'status': 409,
'body': f"An instance with ID '${existing_instance.instance_id}' already exists"
}
@FunctionName("HttpStartSingle")
public HttpResponseMessage runSingle(
@HttpTrigger(name = "req") HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = "StaticID";
DurableTaskClient client = durableContext.getClient();
// Check to see if an instance with this ID is already running
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceID, false);
if (metadata.isRunning()) {
return req.createResponseBuilder(HttpStatus.CONFLICT)
.body("An instance with ID '" + instanceID + "' already exists.")
.build();
}
// No such instance exists - create a new one. De-dupe is handled automatically
// in the storage layer if another function tries to also use this instance ID.
client.scheduleNewOrchestrationInstance("MyOrchestration", null, instanceID);
return durableContext.createCheckStatusResponse(req, instanceID);
}
既定では、インスタンス ID は、ランダムに生成される GUID です。 ただし、前記の例では、URL からルート データにインスタンス ID が渡されています。 次に、コードはオーケストレーション インスタンスのメタデータをフェッチして、指定した ID を持つインスタンスが既に実行されているかどうかを確認します。 そのようなインスタンスが実行されていない場合は、その ID で新しいインスタンスが作成されます。