@Ashish Kumar
The error indicates that the connection string requires either a SharedAccessKey or a SharedAccessSignature. When using Managed Service Identity (MSI), you don't need to include these in the connection string. Instead, use EventHubProducerClient with the credential parameter directly.
Let's adjust the code to use MSI properly:
Step 1: Install Required Libraries
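Ensure the necessary libraries are available in your Synapse notebook. The configuration below loads the JVM packages (the Spark connector and the Java identity library); the Python imports in Step 2 also need the azure-identity and azure-eventhub Python packages installed on the Spark pool: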
%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18,com.azure:azure-identity:1.4.1"
    }
}
Step 2: Set Up Event Hubs Configuration
Define the necessary configurations for connecting to your Event Hub using MSI.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventData
# Initialize Spark session
spark = SparkSession.builder.appName("EventHubMSI").getOrCreate()
# Define Event Hub parameters
event_hub_namespace = "<Your Event Hubs Namespace>"
event_hub_name = "<Your Event Hub Name>"
# Create a token credential using Managed Identity
credential = DefaultAzureCredential()
# Create Event Hub producer client
producer = EventHubProducerClient(
    fully_qualified_namespace=f"{event_hub_namespace}.servicebus.windows.net",
    eventhub_name=event_hub_name,
    credential=credential
)
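The f-string above assumes event_hub_namespace holds only the bare namespace name. If the value might already be a full host name or an sb:// endpoint, a small normalizing helper avoids a doubled suffix. This is only a sketch: to_fully_qualified_namespace is a hypothetical helper, not part of the Azure SDK, and the default suffix assumes the public Azure cloud.

```python
def to_fully_qualified_namespace(namespace: str, suffix: str = "servicebus.windows.net") -> str:
    """Return '<name>.<suffix>' for a bare name, a full host name, or an endpoint URL.

    Hypothetical helper for illustration; the default suffix assumes the
    public Azure cloud.
    """
    host = namespace.strip()
    # Drop a scheme such as 'sb://' or 'https://' if one is present
    if "://" in host:
        host = host.split("://", 1)[1]
    # Drop a trailing slash or path
    host = host.split("/", 1)[0]
    if not host:
        raise ValueError("namespace must not be empty")
    # Leave an already-qualified host name unchanged
    if host.lower().endswith("." + suffix):
        return host
    return f"{host}.{suffix}"
```

The result can then be passed as fully_qualified_namespace when constructing EventHubProducerClient.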
Step 3: Write Data to Event Hubs
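Send the DataFrame rows with the MSI-authenticated producer client from Step 2. The sample DataFrame is a batch DataFrame, so writeStream cannot be used on it, and the Spark connector's eventhubs.connectionString option would again require a SharedAccessKey or SharedAccessSignature.
# Sample data to write to Event Hubs
data = ["Hello, Event Hubs!"]
# Create DataFrame (a single string column named "value")
df = spark.createDataFrame(data, StringType())
# Write the data to Event Hubs through the producer client
# collect() brings the rows to the driver, so this suits small DataFrames
with producer:
    batch = producer.create_batch()
    for row in df.collect():
        batch.add(EventData(row["value"]))
    producer.send_batch(batch)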
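When more rows go through the producer client from Step 2 than fit into one batch, the batch's add() raises ValueError once the batch is full. The sketch below shows one way to handle that by sending the full batch and starting a new one. send_in_batches and its make_event parameter are hypothetical names used for illustration; the function only relies on the producer exposing create_batch() and send_batch().

```python
from typing import Callable, Iterable


def send_in_batches(producer, payloads: Iterable[str], make_event: Callable = lambda p: p) -> int:
    """Send payloads through `producer`, starting a new batch whenever one fills up.

    Assumes batch.add() raises ValueError when the batch is full, as the
    azure-eventhub EventDataBatch does. Returns the number of batches sent.
    """
    batch = producer.create_batch()
    pending = 0  # events added to the current, unsent batch
    sent = 0     # batches sent so far
    for payload in payloads:
        event = make_event(payload)
        try:
            batch.add(event)
        except ValueError:
            if pending == 0:
                # A single event does not fit even into an empty batch
                raise
            # Current batch is full: send it and start a fresh one
            producer.send_batch(batch)
            sent += 1
            batch = producer.create_batch()
            batch.add(event)
            pending = 0
        pending += 1
    if pending:
        # Send whatever is left in the last, partially filled batch
        producer.send_batch(batch)
        sent += 1
    return sent
```

With the objects from Step 2 this could be called as send_in_batches(producer, (row["value"] for row in df.toLocalIterator()), EventData), which streams rows to the driver instead of collecting them all at once.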
Step 4: Verify and Monitor
Ensure that your data is being written to the Event Hub by monitoring the Event Hub metrics in the Azure portal.