Why isn’t my IoT Hub telemetry being ingested into Cosmos DB via Azure Functions?

Heeyaichen Konsam 0 Reputation points
2025-02-14T11:17:44.69+00:00

Overview: I'm building an IoT solution on Azure where devices send telemetry to IoT Hub. The telemetry is routed to IoT Hub’s built-in Event Hub–compatible endpoint, and an Azure Function (Python, V2 programming model) processes the messages and writes them to a Cosmos DB container. For the telemetry, I'm fetching air quality data via the IQAir API.

IoT Hub: Devices send telemetry successfully, which is visible in IoT Explorer and metrics.

Message Routing: I've configured a custom route (endpoint type: events) in IoT Hub to forward telemetry to the built-in endpoint.

Azure Function: I have two functions, both written in Python using the Azure Functions V2 model: a timer trigger that fetches the air quality data every 5 minutes and sends it to IoT Hub, and an Event Hub trigger that processes the messages and stores them in Cosmos DB. Both live in a single function_app.py script, which is configured as follows:

import logging
import json
import requests
import os
from dotenv import load_dotenv
import azure.functions as func
from azure.iot.device import IoTHubDeviceClient, Message
from azure.cosmos import CosmosClient  

# Load environment variables
load_dotenv()

# Azure and API credentials
API_KEY = os.getenv("AIRVISUAL_API_KEY")
IOT_HUB_DEVICE_CONNECTION_STRING = os.getenv("IOT_HUB_DEVICE_CONNECTION_STRING")


# Verify required environment variables (for local development)
if not API_KEY or not IOT_HUB_DEVICE_CONNECTION_STRING:
    raise ValueError("Missing environment variables. Check .env file")

app = func.FunctionApp()

@app.timer_trigger(schedule="0 */5 * * * *", arg_name="myTimer", run_on_startup=False,
              use_monitor=False) 
def fetch_data(myTimer: func.TimerRequest) -> None:

    """Fetch AQ data and send to IoT Hub every 5 minutes."""
    if myTimer.past_due:
        logging.info('The timer is past due!')

    logging.info('Python timer trigger function executed.')
    try:
        airvisual_url = f"http://api.airvisual.com/v2/nearest_city?key={API_KEY}"
        response = requests.get(airvisual_url)
        data = response.json()

        if response.status_code == 200:
            # Extract relevant data
            telemetry_data = {
                "city": data["data"]["city"],
                "state": data["data"]["state"],
                "country": data["data"]["country"],
                "aqi_us": data["data"]["current"]["pollution"]["aqius"],
                "main_pollutant": data["data"]["current"]["pollution"]["mainus"],
                "temperature": data["data"]["current"]["weather"]["tp"],
                "humidity": data["data"]["current"]["weather"]["hu"],
            }
            
            message = Message(json.dumps(telemetry_data, indent=2))
            message.content_encoding = "utf-8"
            message.content_type = "application/json"

            client = IoTHubDeviceClient.create_from_connection_string(IOT_HUB_DEVICE_CONNECTION_STRING)
            print("Sending telemetry data to Azure IoT Hub...")
            client.send_message(message)

            logging.info("Telemetry data successfully sent to IoT Hub.")
            client.shutdown()
        else:
            logging.error(f"Error: {response.status_code} - {response.text}")

    except Exception as e:
        logging.error(f"An error occurred: {e}")
            
        

@app.event_hub_message_trigger(arg_name="azeventhub", 
                               event_hub_name="aq-iot-events",
                               connection="EventHubConnectionString"
                               ) 

def store_in_cosmos_db(azeventhub: func.EventHubEvent):
    logging.info('Python EventHub trigger processed an event: %s',
                azeventhub.get_body().decode('utf-8'))
    try:
        # Parse telemetry data
        telemetry_data = json.loads(azeventhub.get_body().decode("utf-8"))

        # Store data in Cosmos DB
        COSMOS_ENDPOINT = os.getenv("COSMOS_ENDPOINT")
        COSMOS_KEY = os.getenv("COSMOS_KEY")
        DATABASE_NAME = "AirQualityDB"
        CONTAINER_NAME = "TelemetryData"

        client = CosmosClient(COSMOS_ENDPOINT, COSMOS_KEY)
        database = client.get_database_client(DATABASE_NAME)
        container = database.get_container_client(CONTAINER_NAME)

        container.upsert_item(telemetry_data)
        logging.info(f"Telemetry data stored in Cosmos DB: {telemetry_data}")

    except Exception as e:
        logging.error(f"Error storing data: {e}")
        raise  

Cosmos DB: I have created a Cosmos DB account with a database named "AirQualityDB" and a container "TelemetryData" (with partition key set to /city).
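
For reference, the item shape this container expects can be sketched as follows. This is only a sketch under assumptions: Cosmos DB's NoSQL API requires every item to carry a string `id`, which the telemetry payload built in the code above does not include, and the /city partition key means every item also needs a `city` field. The helper name `to_cosmos_item` is hypothetical and is not part of the function code above.

```python
import uuid
from typing import Any, Dict

# Field name derived from the container's partition key path (/city).
PARTITION_KEY_FIELD = "city"


def to_cosmos_item(telemetry: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the telemetry dict that has an 'id' and a partition key value.

    Hypothetical helper: it only shapes the document; it does not talk to Cosmos DB.
    """
    if PARTITION_KEY_FIELD not in telemetry:
        raise ValueError(
            f"telemetry is missing the partition key field '{PARTITION_KEY_FIELD}'"
        )
    item = dict(telemetry)  # copy so the caller's dict is not mutated
    # Keep an existing id if the payload already has one; otherwise generate a UUID.
    item.setdefault("id", str(uuid.uuid4()))
    return item
```

The result of `to_cosmos_item(telemetry_data)` could then be passed to `container.upsert_item(...)` in place of the raw payload. Whether a missing `id` is actually what is happening here would still need to be confirmed from the function's logs.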

The Issue: Although telemetry data appears correctly in IoT Explorer and IoT Hub metrics, nothing is showing up in the Cosmos DB container. The Azure Function deploys successfully and is listed in the Function App in the portal, but the Cosmos DB container remains empty.

What I’ve Tried:

  • Verified in IoT Explorer that IoT Hub is receiving telemetry.
  • Configured a custom route in IoT Hub message routing that sends telemetry to the built-in endpoint (default name "events").
  • Set environment variables (e.g. AIRVISUAL_API_KEY, COSMOS_ENDPOINT, COSMOS_KEY, IOT_HUB_DEVICE_CONNECTION_STRING, EventHubConnectionString) in my Function App’s Application Settings in the portal.
  • Set the EventHubConnectionString setting in my Function App to the value shown under IoT Hub -> Built-in endpoints -> Event Hub-compatible endpoint.
  • Set event_hub_name = "aq-iot-events" (the Event Hub-compatible name shown under IoT Hub -> Built-in endpoints).
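
The connection settings in the list above can be sanity-checked offline before redeploying. The following is a minimal sketch, assuming the Event Hub-compatible connection string uses the usual `Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=...` layout. The function names `parse_connection_string` and `check_event_hub_settings` are hypothetical helpers, not part of any Azure SDK.

```python
from typing import Dict, List


def parse_connection_string(conn_str: str) -> Dict[str, str]:
    """Split 'Key=Value;Key=Value' into a dict, splitting on the first '=' only."""
    parts: Dict[str, str] = {}
    for segment in conn_str.split(";"):
        if not segment.strip():
            continue
        # partition keeps any '=' padding at the end of a base64 key intact
        key, _, value = segment.partition("=")
        parts[key.strip()] = value.strip()
    return parts


def check_event_hub_settings(conn_str: str, event_hub_name: str) -> List[str]:
    """Return a list of human-readable problems; an empty list means none found."""
    parts = parse_connection_string(conn_str)
    problems: List[str] = []
    for key in ("Endpoint", "SharedAccessKeyName", "SharedAccessKey"):
        if not parts.get(key):
            problems.append(f"missing {key}")
    endpoint = parts.get("Endpoint", "")
    if endpoint and not endpoint.startswith("sb://"):
        problems.append("Endpoint does not start with sb://")
    # EntityPath is optional, but if present it should match the trigger's event_hub_name.
    entity_path = parts.get("EntityPath")
    if entity_path and entity_path != event_hub_name:
        problems.append(
            f"EntityPath '{entity_path}' differs from event_hub_name '{event_hub_name}'"
        )
    return problems
```

Calling `check_event_hub_settings(os.getenv("EventHubConnectionString"), "aq-iot-events")` would flag, for example, a `HostName=...` IoT Hub connection string pasted in by mistake. An empty list only means the two settings are consistent with each other; it does not prove that the key is valid or that messages are flowing.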

Question:

What could be preventing the telemetry data from being written to Cosmos DB? Could it be a misconfiguration in my function trigger or in my IoT Hub routing settings?

What specific diagnostic logs or metrics should I check to verify that messages are being correctly forwarded from IoT Hub to the built-in endpoint, and then picked up by my Azure Function?

Is there a recommended approach for troubleshooting this pipeline (IoT Hub → built-in Event Hub endpoint → Azure Function → Cosmos DB)?

Any help or pointers would be greatly appreciated. Thanks!
