你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Durable Functions 中的监视场景 - 天气观察程序示例

监视模式是工作流中灵活的重复过程 - 例如,反复轮询,直到满足特定的条件为止。 本文介绍使用 Durable Functions 实现监视的示例。

先决条件

方案概述

此示例监视某个地点的当前天气状况,如果是晴天,则通过短信通知用户。 可以使用常规的计时器触发函数来检查天气和发送提醒。 但是,此方法存在生存期管理方面的问题。 如果只应发送一条提醒,则在检测到晴天后,监视器需要禁用自身。 监视模式可以结束自身的执行,同时还具有其他优点:

  • 监视器按时间间隔而不是计划运行:计时器触发器每隔一小时运行;监视器等待一小时,然后执行下一项操作。 除非已指定,否则监视器的操作不会重叠,这对于长时间运行的任务可能很重要。
  • 监视器可以使用动态时间间隔:可以根据某种条件更改等待时间。
  • 监视器可以在满足某种条件时终止,或者由其他进程终止。
  • 监视器可以采用参数。 此示例演示如何将同一个天气监视进程应用到任何请求的地点和电话号码。
  • 监视器可缩放。 由于每个监视器是一个业务流程实例,因此可以创建多个监视器,而无需创建新函数或定义更多的代码。
  • 监视器可轻松集成到更大的工作流。 监视器可以是更复杂业务流程函数或子业务流程的一部分。

配置

配置 Twilio 集成

此示例涉及使用 Twilio 服务向移动电话发送短信。 Azure Functions 已通过 Twilio 绑定提供对 Twilio 的支持,此示例使用了这一功能。

首先,需要一个 Twilio 帐户。 可以通过 https://www.twilio.com/try-twilio 免费创建一个帐户。 创建帐户以后,向函数应用添加以下三个应用设置

应用设置名称 值说明
TwilioAccountSid Twilio 帐户的 SID
TwilioAuthToken Twilio 帐户的身份验证令牌
TwilioPhoneNumber 与 Twilio 帐户关联的电话号码。 此电话号码用于发送短信。

配置 Weather Underground 集成

此示例涉及到使用 Weather Underground API 来检查某个地点的当前天气状况。

首先需要创建一个 Weather Underground 帐户。 可以通过 https://www.wunderground.com/signup 免费创建一个帐户。 创建帐户后,需要获取 API 密钥。 可以访问 https://www.wunderground.com/weather/api,然后选择“密钥设置”来获取此密钥。 Stratus Developer 计划是免费的,足以用于运行此示例。

获取 API 密钥后,将以下应用设置添加到函数应用。

应用设置名称 值说明
WeatherUndergroundApiKey Weather Underground API 密钥。

函数

本文介绍示例应用中的以下函数:

  • E3_Monitor:一个业务流程协调程序函数,它定期调用 E3_GetIsClear。 如果 E3_GetIsClear 返回 true,则此函数会调用 E3_SendGoodWeatherAlert
  • E3_GetIsClear:检查某个地点当前天气状况的活动函数
  • E3_SendGoodWeatherAlert:通过 Twilio 发送短信的活动函数。

E3_Monitor 业务流程协调程序函数

[FunctionName("E3_Monitor")]
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext monitorContext, ILogger log)
{
    MonitorRequest input = monitorContext.GetInput<MonitorRequest>();
    if (!monitorContext.IsReplaying) { log.LogInformation($"Received monitor request. Location: {input?.Location}. Phone: {input?.Phone}."); }

    VerifyRequest(input);

    DateTime endTime = monitorContext.CurrentUtcDateTime.AddHours(6);
    if (!monitorContext.IsReplaying) { log.LogInformation($"Instantiating monitor for {input.Location}. Expires: {endTime}."); }

    while (monitorContext.CurrentUtcDateTime < endTime)
    {
        // Check the weather
        if (!monitorContext.IsReplaying) { log.LogInformation($"Checking current weather conditions for {input.Location} at {monitorContext.CurrentUtcDateTime}."); }

        bool isClear = await monitorContext.CallActivityAsync<bool>("E3_GetIsClear", input.Location);

        if (isClear)
        {
            // It's not raining! Or snowing. Or misting. Tell our user to take advantage of it.
            if (!monitorContext.IsReplaying) { log.LogInformation($"Detected clear weather for {input.Location}. Notifying {input.Phone}."); }

            await monitorContext.CallActivityAsync("E3_SendGoodWeatherAlert", input.Phone);
            break;
        }
        else
        {
            // Wait for the next checkpoint
            var nextCheckpoint = monitorContext.CurrentUtcDateTime.AddMinutes(30);
            if (!monitorContext.IsReplaying) { log.LogInformation($"Next check for {input.Location} at {nextCheckpoint}."); }

            await monitorContext.CreateTimer(nextCheckpoint, CancellationToken.None);
        }
    }

    log.LogInformation($"Monitor expiring.");
}

[Deterministic]
private static void VerifyRequest(MonitorRequest request)
{
    if (request == null)
    {
        throw new ArgumentNullException(nameof(request), "An input object is required.");
    }

    if (request.Location == null)
    {
        throw new ArgumentNullException(nameof(request.Location), "A location input is required.");
    }

    if (string.IsNullOrEmpty(request.Phone))
    {
        throw new ArgumentNullException(nameof(request.Phone), "A phone number input is required.");
    }
}

该业务流程协调程序需要一个要监视的地点,还需要一个电话号码,以便在该地点的天气变得晴朗时向此号码发送消息。 此数据会作为强类型 MonitorRequest 对象传递到该业务流程协调程序。

此业务流程协调程序函数执行以下操作:

  1. 获取 MonitorRequest,其中包括要监视的地点,以及要将短信通知发送到的电话号码。
  2. 确定监视器的过期时间。 为简便起见,本示例使用了硬编码值。
  3. 调用 E3_GetIsClear 来确定请求的地点是否为晴天。
  4. 如果是晴天,则调用 E3_SendGoodWeatherAlert 将短信通知发送到请求的电话号码。
  5. 创建一个持久计时器,以便在下一个轮询间隔恢复业务流程。 为简便起见,本示例使用了硬编码值。
  6. 持续运行,直至当前 UTC 时间超过监视器的过期时间,或者发送了短信警报。

通过多次调用业务流程协调程序函数,多个业务流程协调程序实例可以同时运行。 可以指定要监视的地点,以及要将短信提醒发送到的电话号码。 最后,请务必注意,业务流程协调程序函数在等待计时器时是不运行的,因此不会产生费用。

E3_GetIsClear 活动函数

与其他示例一样,帮助器活动函数是使用 activityTrigger 触发器绑定的正则函数。 E3_GetIsClear 函数使用 Weather Underground API 获取当前天气状况并确定是否为晴天。

[FunctionName("E3_GetIsClear")]
public static async Task<bool> GetIsClear([ActivityTrigger] Location location)
{
    var currentConditions = await WeatherUnderground.GetCurrentConditionsAsync(location);
    return currentConditions.Equals(WeatherCondition.Clear);
}

E3_SendGoodWeatherAlert 活动函数

E3_SendGoodWeatherAlert 函数使用 Twilio 绑定来发送短信,告知最终用户目前的天气适合散步。

    [FunctionName("E3_SendGoodWeatherAlert")]
    public static void SendGoodWeatherAlert(
        [ActivityTrigger] string phoneNumber,
        ILogger log,
        [TwilioSms(AccountSidSetting = "TwilioAccountSid", AuthTokenSetting = "TwilioAuthToken", From = "%TwilioPhoneNumber%")]
            out CreateMessageOptions message)
    {
        message = new CreateMessageOptions(new PhoneNumber(phoneNumber));
        message.Body = $"The weather's clear outside! Go take a walk!";
    }

internal class WeatherUnderground
{
    private static readonly HttpClient httpClient = new HttpClient();
    private static IReadOnlyDictionary<string, WeatherCondition> weatherMapping = new Dictionary<string, WeatherCondition>()
    {
        { "Clear", WeatherCondition.Clear },
        { "Overcast", WeatherCondition.Clear },
        { "Cloudy", WeatherCondition.Clear },
        { "Clouds", WeatherCondition.Clear },
        { "Drizzle", WeatherCondition.Precipitation },
        { "Hail", WeatherCondition.Precipitation },
        { "Ice", WeatherCondition.Precipitation },
        { "Mist", WeatherCondition.Precipitation },
        { "Precipitation", WeatherCondition.Precipitation },
        { "Rain", WeatherCondition.Precipitation },
        { "Showers", WeatherCondition.Precipitation },
        { "Snow", WeatherCondition.Precipitation },
        { "Spray", WeatherCondition.Precipitation },
        { "Squall", WeatherCondition.Precipitation },
        { "Thunderstorm", WeatherCondition.Precipitation },
    };

    internal static async Task<WeatherCondition> GetCurrentConditionsAsync(Location location)
    {
        var apiKey = Environment.GetEnvironmentVariable("WeatherUndergroundApiKey");
        if (string.IsNullOrEmpty(apiKey))
        {
            throw new InvalidOperationException("The WeatherUndergroundApiKey environment variable was not set.");
        }

        var callString = string.Format("http://api.wunderground.com/api/{0}/conditions/q/{1}/{2}.json", apiKey, location.State, location.City);
        var response = await httpClient.GetAsync(callString);
        var conditions = await response.Content.ReadAsAsync<JObject>();

        JToken currentObservation;
        if (!conditions.TryGetValue("current_observation", out currentObservation))
        {
            JToken error = conditions.SelectToken("response.error");

            if (error != null)
            {
                throw new InvalidOperationException($"API returned an error: {error}.");
            }
            else
            {
                throw new ArgumentException("Could not find weather for this location. Try being more specific.");
            }
        }

        return MapToWeatherCondition((string)(currentObservation as JObject).GetValue("weather"));
    }

    private static WeatherCondition MapToWeatherCondition(string weather)
    {
        foreach (var pair in weatherMapping)
        {
            if (weather.Contains(pair.Key))
            {
                return pair.Value;
            }
        }

        return WeatherCondition.Other;
    }
}

注意

需要安装 Microsoft.Azure.WebJobs.Extensions.Twilio Nuget 包才能运行示例代码。

运行示例

使用示例中包含的 HTTP 触发型函数,可以通过发送以下 HTTP POST 请求来启动业务流程:

POST https://{host}/orchestrators/E3_Monitor
Content-Length: 77
Content-Type: application/json

{ "location": { "city": "Redmond", "state": "WA" }, "phone": "+1425XXXXXXX" }
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Location: https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={SystemKey}
RetryAfter: 10

{"id": "f6893f25acf64df2ab53a35c09d52635", "statusQueryGetUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "sendEventPostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/raiseEvent/{eventName}?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "terminatePostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason={text}&taskHub=SampleHubVS&connection=Storage&code={systemKey}"}

E3_Monitor 实例启动,并查询请求位置的当前天气状况。 如果天气为晴,则调用某个活动函数来发送提醒;否则将设置计时器。 当计时器过期时,业务流程将会恢复。

可以通过在 Azure Functions 门户中查看函数日志,来了解业务流程的活动。

2018-03-01T01:14:41.649 Function started (Id=2d5fcadf-275b-4226-a174-f9f943c90cd1)
2018-03-01T01:14:42.741 Started orchestration with ID = '1608200bb2ce4b7face5fc3b8e674f2e'.
2018-03-01T01:14:42.780 Function completed (Success, Id=2d5fcadf-275b-4226-a174-f9f943c90cd1, Duration=1111ms)
2018-03-01T01:14:52.765 Function started (Id=b1b7eb4a-96d3-4f11-a0ff-893e08dd4cfb)
2018-03-01T01:14:52.890 Received monitor request. Location: Redmond, WA. Phone: +1425XXXXXXX.
2018-03-01T01:14:52.895 Instantiating monitor for Redmond, WA. Expires: 3/1/2018 7:14:52 AM.
2018-03-01T01:14:52.909 Checking current weather conditions for Redmond, WA at 3/1/2018 1:14:52 AM.
2018-03-01T01:14:52.954 Function completed (Success, Id=b1b7eb4a-96d3-4f11-a0ff-893e08dd4cfb, Duration=189ms)
2018-03-01T01:14:53.226 Function started (Id=80a4cb26-c4be-46ba-85c8-ea0c6d07d859)
2018-03-01T01:14:53.808 Function completed (Success, Id=80a4cb26-c4be-46ba-85c8-ea0c6d07d859, Duration=582ms)
2018-03-01T01:14:53.967 Function started (Id=561d0c78-ee6e-46cb-b6db-39ef639c9a2c)
2018-03-01T01:14:53.996 Next check for Redmond, WA at 3/1/2018 1:44:53 AM.
2018-03-01T01:14:54.030 Function completed (Success, Id=561d0c78-ee6e-46cb-b6db-39ef639c9a2c, Duration=62ms)

该业务流程在超时时间已到或检测到晴天时完成。 也可以在另一函数中使用 terminate API,或调用上面的 202 响应中引用的 terminatePostUri HTTP POST Webhook。 若要使用该 Webhook,请将 {text} 替换为提前终止的原因。 HTTP POST URL 大致如下所示:

POST https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason=Because&taskHub=SampleHubVS&connection=Storage&code={systemKey}

后续步骤

本示例演示了如何使用 Durable Functions,通过持久性计时器和条件逻辑来监视外部源的状态。 下一个示例演示如何使用外部事件和持久计时器处理人工交互。