Azure Function, calling Multiple Instance

DJ 0 Reputation points
2025-02-24T12:09:39.2733333+00:00

Hi All

This is my first post on this support forum. Can you please confirm that my account is recognized for priority support?

I would like to know how I can achieve the following.

I have a SQL query that produces a list of my IoT device IDs.

Each device ID needs to have its data processed. Instead of processing the devices one after another, how can I set up my Function so that the processing happens simultaneously for multiple devices' data?

The processing happens in a function similar to the one below, which works on some raw data, so I would like to run multiple calls in parallel.

await PrepareFlowDevice(deviceid, log);

Thank You

DJ

Azure Functions
An Azure service that provides an event-driven serverless compute platform.

2 answers

  1. Sampath 750 Reputation points Microsoft External Staff
    2025-03-04T17:39:13.9766667+00:00

    Hello @DJ, below is sample code that retrieves device IDs from Azure IoT Hub with a SQL-like query and passes each deviceId on for processing.

      // Assumes an existing RegistryManager instance named registryManager.
      var query = registryManager.CreateQuery("SELECT * FROM devices", 100);
      while (query.HasMoreResults)
      {
          var page = await query.GetNextAsTwinAsync();
          foreach (var twin in page)
          {
              string deviceId = twin.DeviceId;
              Console.WriteLine($"Processing Device ID: {deviceId}");
              // Pass deviceId to the per-device processing here.
          }
      }

    Below is a sample Azure Function with an HTTP trigger that retrieves IoT device IDs from Azure IoT Hub and processes them concurrently. Refer to this Microsoft documentation for query examples with the service SDKs.

    
    using System;
    using System.IO;
    using System.Linq;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.Devices;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    using System.Collections.Generic;

    namespace FunctionApp7
    {
        public static class ProcessIoTDevices
        {
            // Replace the TODO values with your IoT Hub connection details.
            private const string iotHubConnectionString = "HostName=TODO.azure-devices.net;SharedAccessKeyName=TODO;SharedAccessKey=TODO";

            [FunctionName("ProcessIoTDevices")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
                ILogger log)
            {
                log.LogInformation("Processing IoT devices concurrently...");

                try
                {
                    using var registryManager = RegistryManager.CreateFromConnectionString(iotHubConnectionString);

                    List<string> deviceIds = await RetrieveDeviceIds(registryManager, log);

                    if (deviceIds.Count == 0)
                    {
                        return new OkObjectResult("No devices found in IoT Hub.");
                    }

                    // Start one task per device, then wait for all of them to finish.
                    var tasks = deviceIds.Select(deviceId => PrepareFlowDevice(deviceId, log));
                    await Task.WhenAll(tasks);

                    return new OkObjectResult($"Processed {deviceIds.Count} devices in parallel.");
                }
                catch (Exception ex)
                {
                    log.LogError($"Error processing IoT devices: {ex.Message}");
                    return new StatusCodeResult(500);
                }
            }

            // Pages through the IoT Hub query results and collects the device IDs.
            private static async Task<List<string>> RetrieveDeviceIds(RegistryManager registryManager, ILogger log)
            {
                var deviceIds = new List<string>();
                var query = registryManager.CreateQuery("SELECT deviceId FROM devices", 100);

                try
                {
                    while (query.HasMoreResults)
                    {
                        var page = await query.GetNextAsJsonAsync();
                        foreach (var json in page)
                        {
                            dynamic twin = JsonConvert.DeserializeObject(json);
                            deviceIds.Add((string)twin.deviceId);
                        }
                    }

                    log.LogInformation($"Retrieved {deviceIds.Count} device IDs.");
                }
                catch (Exception ex)
                {
                    // On failure, log the error and return whatever was collected so far.
                    log.LogError($"Error retrieving device IDs: {ex.Message}");
                }

                return deviceIds;
            }

            private static async Task PrepareFlowDevice(string deviceId, ILogger log)
            {
                try
                {
                    // Placeholder for the real per-device processing.
                    await Task.Delay(500);
                    log.LogInformation($"Device {deviceId} processed.");
                }
                catch (Exception ex)
                {
                    log.LogError($"Error processing device {deviceId}: {ex.Message}");
                }
            }
        }
    }
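
The sample above starts every device task at once with Task.WhenAll. If the device list is large, it may be worth capping how many calls run at the same time. The following is only a sketch of one common way to do that with SemaphoreSlim; the class name ThrottledProcessing, the method ProcessAllAsync, and the maxParallel parameter are hypothetical names introduced for illustration and are not part of the original sample or of any Azure SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledProcessing
{
    // Hypothetical helper: runs 'work' once per device ID, with at most
    // 'maxParallel' calls in flight at any moment.
    public static async Task ProcessAllAsync(
        IEnumerable<string> deviceIds, Func<string, Task> work, int maxParallel)
    {
        using var gate = new SemaphoreSlim(maxParallel);

        var tasks = deviceIds.Select(async deviceId =>
        {
            // Wait for a free slot before starting this device.
            await gate.WaitAsync();
            try
            {
                await work(deviceId);
            }
            finally
            {
                // Free the slot even if the work throws.
                gate.Release();
            }
        }).ToList();

        // Completes once every device has been processed.
        await Task.WhenAll(tasks);
    }
}
```

In the function above, this could be called as `await ThrottledProcessing.ProcessAllAsync(deviceIds, id => PrepareFlowDevice(id, log), 10);`, where 10 is an arbitrary limit that would need tuning for the actual workload.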
    
    

    Hope this helps!

    If you found this answer helpful, please click Accept Answer and consider upvoting it.

    If you have any further questions, please click Comment.


  2. DJ 0 Reputation points
    2025-03-06T15:17:04.6766667+00:00

    @Sampath

    Yes, we are using the consumption plan, and we still wish to continue using it for this task since it is not a highly critical interactive task. However, it does take some time to analyze data in Azure Table Storage. The aim is to perform this task multiple times a day and keep the updated evaluated results ready in SQL.

    Even though I mentioned that the process takes some time, its duration decreases as we run it more frequently since we are only analyzing the most recent data.

    What I am trying to achieve is a mechanism to prevent the server from becoming overwhelmed. If the number of tasks exceeds the server’s capacity, there should be a buffer process to ensure that the next set of devices is not executed until the previous batches are finished. This would reduce the risk of any processes being missed.

    When executing a Durable Functions orchestration, is there an option to keep an instance idle until the server is freed up before executing the next batch? My aim is to structure my code for the worst-case scenario. I am also considering whether Azure Service Bus could help with this, in case Durable Functions has no built-in mechanism for it.

    Regarding maxConcurrentOrchestratorFunctions and maxConcurrentActivityFunctions, under what circumstances would we want to limit the number of active functions to 10 instead of allowing them to run at maximum capacity for scalability purposes?
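
For reference, both settings named above are configured in host.json under extensions.durableTask, and each limit applies per host instance rather than across the whole app. A minimal fragment might look like the following; the value 10 is used only because it is the figure mentioned in the question, not as a recommendation.

```json
{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 10,
      "maxConcurrentOrchestratorFunctions": 10
    }
  }
}
```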

    From your logs, does my account show that I am using a support package?

