How to Set Up AZUREML_OBO_SERVICE_ENDPOINT for a Spark Component in Azure ML

Abdelkhalek Hamdi 40 Reputation points
2025-02-16T18:29:40.1166667+00:00

I am running an Azure ML pipeline that includes a component on serverless Spark compute. The component fetches data from Azure Data Lake, repartitions it, and then writes it back to Azure Data Lake in Feather format using pandas. The goal is to leverage Spark's parallelism to write all partitions in parallel, as illustrated below:

import pandas as pd

def write_feather(partition):
    pandas_df = pd.DataFrame([row.asDict() for row in partition])
    pandas_df.to_feather(write_path + "data.feather")

df = spark.read.parquet(read_path)
df = df.repartition(col1, col2)  # repartition takes columns as separate arguments, not a list
df.foreachPartition(write_feather)

Data is read and repartitioned successfully; however, when the job reaches the writing step using pandas, it throws the following error:

java.lang.IllegalStateException: Could not find configuration value for spark.yarn.appMasterEnv.AZUREML_OBO_SERVICE_ENDPOINT

The error suggests that executors failed to obtain an access token to write data back to Azure Data Lake. I attempted to set the missing variable using the following approach, but the same error persists:

spark = SparkSession.builder.\
    config("spark.yarn.appMasterEnv.AZUREML_OBO_SERVICE_ENDPOINT", "https://login.microsoftonline.com/

1 answer

  1. JAYA SHANKAR G S 485 Reputation points Microsoft Vendor
    2025-02-19T06:25:35.2833333+00:00

    Hello Abdelkhalek Hamdi,

    Whatever Spark configuration you set applies only to the Spark context; it is not used by pandas. To write data with pandas, you need to pass the storage account credentials directly.

    Below are the ways you can pass them:

    
    # Service principal credentials
    st = {'tenant_id': 'tenant_id_value', 'client_id': 'client_id_value', 'client_secret': 'client_secret_value'}
    # or st = {'account_key': '<account_key>'}
    # or st = {'sas_token': 'sas_token_value'}
    # or st = {'connection_string': 'connection_string_value'}

    def write_feather(partition):
        pandas_df = pd.DataFrame([row.asDict() for row in partition])
        pandas_df.to_feather(write_path + "data.feather", storage_options=st)

    df.foreachPartition(write_feather)
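
    Note that in the snippet above every partition writes to the same file name, so partitions will overwrite one another. A minimal sketch of one way to avoid that (the uuid suffix is my own addition; it assumes write_path is an abfss:// URL ending in "/" and that adlfs is installed so pandas/fsspec can resolve it):

    import uuid
    import pandas as pd

    def write_feather(partition):
        rows = [row.asDict() for row in partition]
        if not rows:
            return  # nothing to write for an empty partition
        pandas_df = pd.DataFrame(rows)
        # Unique file name per partition so executors do not overwrite each other
        pandas_df.to_feather(f"{write_path}data_{uuid.uuid4().hex}.feather", storage_options=st)

    df.foreachPartition(write_feather)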
    
    

    I would recommend using a service principal or user identity.

    If you are using a service principal, get the tenant_id, client_id, and client_secret from Key Vault as shown here:

    
    from pyspark.sql import SparkSession

    sc = SparkSession.builder.getOrCreate()
    token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary

    # Set up service principal tenant ID, client ID and secret from Azure Key Vault
    client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
    tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
    client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
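
    These secrets can then be plugged straight into the storage_options dict from the earlier snippet, for example:

    # Build storage_options for pandas from the Key Vault secrets above
    st = {
        'tenant_id': tenant_id,
        'client_id': client_id,
        'client_secret': client_secret,
    }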
    
    

    Make sure you have granted the Contributor and Storage Blob Data Contributor roles to your service principal or identity.

    Also, according to this documentation, there are two mechanisms to access ADLS Gen2:

    • User identity passthrough
    • Service principal-based data access

    However, user identity passthrough does not seem to work with the pandas API, so for now the workaround is to use a service principal. I will update later regarding user identity passthrough usage with pandas.

    Output:

    [screenshot of the output]

    In ADLS Gen2:

    [screenshot of the files written to ADLS Gen2]

    If you have any further queries, do let me know. If the solution worked for you, please accept the answer and give feedback by clicking Yes.

