Why isn’t my IoT Hub telemetry being ingested into Cosmos DB via Azure Functions?
Overview: I'm building an IoT solution on Azure where devices send telemetry to IoT Hub. The telemetry is routed to IoT Hub’s built-in Event Hub–compatible endpoint, and an Azure Function (written in Python using the V2 programming model) processes the messages and writes them to a Cosmos DB container. The telemetry itself is air quality data fetched from the IQAir API.
IoT Hub: Devices send telemetry successfully, which is visible in IoT Explorer and metrics.
Message Routing: I've configured a custom route (endpoint type: events) in IoT Hub to forward telemetry to the built-in endpoint.
Azure Function: I have two functions: a timer trigger that fetches the telemetry data every 5 minutes and sends it to IoT Hub, and an Event Hub trigger that processes the messages and stores them in Cosmos DB. Both are written in Python using the Azure Functions V2 model and live in a single function_app.py script, which is configured as follows:
import logging
import json
import requests
import os
from dotenv import load_dotenv
import azure.functions as func
from azure.iot.device import IoTHubDeviceClient, Message
from azure.cosmos import CosmosClient

# Load environment variables
load_dotenv()

# Azure and API credentials
API_KEY = os.getenv("AIRVISUAL_API_KEY")
IOT_HUB_DEVICE_CONNECTION_STRING = os.getenv("IOT_HUB_DEVICE_CONNECTION_STRING")

# Verify required environment variables (for local development)
if not API_KEY or not IOT_HUB_DEVICE_CONNECTION_STRING:
    raise ValueError("Missing environment variables. Check .env file")

app = func.FunctionApp()


@app.timer_trigger(schedule="0 */5 * * * *", arg_name="myTimer", run_on_startup=False,
                   use_monitor=False)
def fetch_data(myTimer: func.TimerRequest) -> None:
    """Fetch AQ data and send to IoT Hub every 5 minutes."""
    if myTimer.past_due:
        logging.info('The timer is past due!')

    logging.info('Python timer trigger function executed.')

    try:
        airvisual_url = f"http://api.airvisual.com/v2/nearest_city?key={API_KEY}"
        response = requests.get(airvisual_url)
        data = response.json()

        if response.status_code == 200:
            # Extract relevant data
            telemetry_data = {
                "city": data["data"]["city"],
                "state": data["data"]["state"],
                "country": data["data"]["country"],
                "aqi_us": data["data"]["current"]["pollution"]["aqius"],
                "main_pollutant": data["data"]["current"]["pollution"]["mainus"],
                "temperature": data["data"]["current"]["weather"]["tp"],
                "humidity": data["data"]["current"]["weather"]["hu"],
            }

            message = Message(json.dumps(telemetry_data, indent=2))
            message.content_encoding = "utf-8"
            message.content_type = "application/json"

            client = IoTHubDeviceClient.create_from_connection_string(IOT_HUB_DEVICE_CONNECTION_STRING)
            print("Sending telemetry data to Azure IoT Hub...")
            client.send_message(message)
            logging.info("Telemetry data successfully sent to IoT Hub.")
            client.shutdown()
        else:
            logging.error(f"Error: {response.status_code} - {response.text}")
    except Exception as e:
        logging.error(f"An error occurred: {e}")


@app.event_hub_message_trigger(arg_name="azeventhub",
                               event_hub_name="aq-iot-events",
                               connection="EventHubConnectionString")
def store_in_cosmos_db(azeventhub: func.EventHubEvent):
    logging.info('Python EventHub trigger processed an event: %s',
                 azeventhub.get_body().decode('utf-8'))
    try:
        # Parse telemetry data
        telemetry_data = json.loads(azeventhub.get_body().decode("utf-8"))

        # Store data in Cosmos DB
        COSMOS_ENDPOINT = os.getenv("COSMOS_ENDPOINT")
        COSMOS_KEY = os.getenv("COSMOS_KEY")
        DATABASE_NAME = "AirQualityDB"
        CONTAINER_NAME = "TelemetryData"

        client = CosmosClient(COSMOS_ENDPOINT, COSMOS_KEY)
        database = client.get_database_client(DATABASE_NAME)
        container = database.get_container_client(CONTAINER_NAME)
        container.upsert_item(telemetry_data)
        logging.info(f"Telemetry data stored in Cosmos DB: {telemetry_data}")
    except Exception as e:
        logging.error(f"Error storing data: {e}")
        raise
Cosmos DB: I have created a Cosmos DB account with a database named "AirQualityDB" and a container "TelemetryData" (with the partition key set to /city).
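To rule out problems on the Cosmos DB side (endpoint, key, database/container names, partition key), a quick standalone check is to upsert a test item using the same SDK calls the function uses. This is only a sketch with made-up values, run outside the Function App; note that the test item carries an explicit id, which Cosmos DB expects on every item:

# Standalone check: write a test item straight to the container (sketch only,
# assumes the same COSMOS_ENDPOINT / COSMOS_KEY environment variables; the
# item values below are made up).
import os
import uuid
from azure.cosmos import CosmosClient

client = CosmosClient(os.getenv("COSMOS_ENDPOINT"), os.getenv("COSMOS_KEY"))
container = client.get_database_client("AirQualityDB").get_container_client("TelemetryData")

test_item = {
    "id": str(uuid.uuid4()),  # Cosmos DB items need an 'id' property
    "city": "TestCity",       # matches the /city partition key path
    "aqi_us": 42,
}
container.upsert_item(test_item)
print("Upsert succeeded:", test_item["id"])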
The Issue: Although telemetry data appears correctly in IoT Explorer and IoT Hub metrics, nothing is showing up in the Cosmos DB container. The Azure Function deploys successfully and is listed in the Function App in the portal, but the Cosmos DB container remains empty.
What I’ve Tried:
- Verified in IoT Explorer that IoT Hub is receiving telemetry.
- Configured a custom route to the built-in endpoint in IoT Hub message routing (default endpoint name "events").
- Set the environment variables (AIRVISUAL_API_KEY, COSMOS_ENDPOINT, COSMOS_KEY, IOT_HUB_DEVICE_CONNECTION_STRING, EventHubConnectionString) in my Function App’s Application Settings in the portal.
- Copied the value from IoT Hub built-in endpoints -> Event Hub-compatible endpoint into the EventHubConnectionString setting in my Function App.
- Set event_hub_name = "aq-iot-events" (the Event Hub-compatible name shown under IoT Hub built-in endpoints). A quick way to double-check the endpoint itself is the standalone consumer sketch after this list.
Questions:
What could be causing the telemetry data not to be written to Cosmos DB? Could there be a misconfiguration in my function trigger or in the IoT Hub routing settings?
What specific diagnostic logs or metrics should I check to verify that messages are being correctly forwarded from IoT Hub to the built-in endpoint, and then picked up by my Azure Function?
Is there a recommended approach for troubleshooting this pipeline (IoT Hub → built-in Event Hub endpoint → Azure Function → Cosmos DB)?
Any help or pointers would be greatly appreciated! Thanks!