Hi @SriLakshmi C,
I tried what you suggested, but I still get the same error. I submitted the pipeline from a compute instance instead of my local computer. `MLClient` was instantiated with a `ManagedIdentityCredential` using the `client_id` of a user-assigned identity that is attached to the workspace and has been granted all required access. I passed the same identity to the Spark component in the pipeline, but again, nothing changed.
The error was again:

```
py4j.protocol.Py4JJavaError: An error occurred while calling z:com.microsoft.azure.synapse.tokenlibrary.TokenLibrary.getAccessToken.
: java.lang.IllegalStateException: Could not find configuration value for spark.yarn.appMasterEnv.AZUREML_OBO_SERVICE_ENDPOINT
```
After this, I tried a different Spark `runtime_version`. The error above occurs with Spark 3.4.0; with Spark 3.3.0, however, I get a different error:
```
2025-02-18 14:29:26,260 ERROR Executor [Executor task launch worker for task 12.0 in stage 3.0 (TID 9)]: Exception in task 12.0 in stage 3.0 (TID 9)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/trusted-service-user/cluster-env/env/lib/python3.10/site-packages/fsspec_wrapper/utils/token_service.py", line 63, in get_aad_credential
    access_token = token_library.getAccessToken(resource)
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling z:com.microsoft.azure.synapse.tokenlibrary.TokenLibrary.getAccessToken.
: java.util.NoSuchElementException: spark.aml.obotoken
	at org.apache.spark.SparkConf.$anonfun$get$1(SparkConf.scala:266)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.SparkConf.get(SparkConf.scala:266)
	at com.microsoft.azure.aml.tokenlibrary.TokenLibraryAML.$anonfun$getAccessTokenInternal$4(TokenLibraryAML.scala:247)
	at scala.util.Try$.apply(Try.scala:213)
	at com.microsoft.azure.aml.tokenlibrary.TokenLibraryAML.getAccessTokenInternal(TokenLibraryAML.scala:237)
	at com.microsoft.azure.aml.tokenlibrary.TokenLibraryAML$.getAccessTokenAsync(TokenLibraryAML.scala:409)
	at com.microsoft.azure.synapse.tokenlibrary.TokenLibrary.getAccessTokenAsync(TokenLibrary.scala:217)
	at com.microsoft.azure.synapse.tokenlibrary.TokenLibrary$.getAccessToken(TokenLibrary.scala:1217)
	at com.microsoft.azure.synapse.tokenlibrary.TokenLibrary.getAccessToken(TokenLibrary.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
```
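Both executor failures come down to the token library not finding a Spark configuration key: `spark.yarn.appMasterEnv.AZUREML_OBO_SERVICE_ENDPOINT` on Spark 3.4.0 and `spark.aml.obotoken` on Spark 3.3.0 (where `SparkConf.get` raises `NoSuchElementException` with the key name). As a minimal sketch for triaging executor logs, the hypothetical helper below (`missing_spark_conf_key` is not part of any Azure or Spark API) extracts the missing key from either message with plain regular expressions, assuming the messages keep the format shown here.

```python
import re
from typing import Optional

# Hypothetical triage helper; the patterns mirror the two executor errors in this post.
_MISSING_KEY_PATTERNS = [
    # Spark 3.4.0 run: IllegalStateException that names the missing key
    re.compile(r"Could not find configuration value for (\S+)"),
    # Spark 3.3.0 run: SparkConf.get raises NoSuchElementException carrying the key
    re.compile(r"java\.util\.NoSuchElementException: (\S+)"),
]


def missing_spark_conf_key(error_text: str) -> Optional[str]:
    """Return the Spark conf key named in the error text, or None if nothing matches."""
    for pattern in _MISSING_KEY_PATTERNS:
        match = pattern.search(error_text)
        if match:
            return match.group(1)
    return None


err_340 = ": java.lang.IllegalStateException: Could not find configuration value for spark.yarn.appMasterEnv.AZUREML_OBO_SERVICE_ENDPOINT"
err_330 = ": java.util.NoSuchElementException: spark.aml.obotoken"
print(missing_spark_conf_key(err_340))  # spark.yarn.appMasterEnv.AZUREML_OBO_SERVICE_ENDPOINT
print(missing_spark_conf_key(err_330))  # spark.aml.obotoken
```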
Interestingly, in this case the executor logs show that the OBO token was fetched successfully:
```
2025-02-18 14:25:37,744 INFO TokenLibraryAML [finagle/netty4-1-1]: AmlUtils: POST success
2025-02-18 14:25:37,745 INFO TokenLibraryAML [finagle/netty4-1-1]: AmlUtils: Processing response from OBO
2025-02-18 14:25:37,896 INFO TokenLibraryAML [finagle/netty4-1-1]: AmlUtils: caching obo token
2025-02-18 14:25:37,933 INFO Utilities$ [finagle/netty4-1-1]: AmlUtils: Token expiry retrieved is =>1739893702
2025-02-18 14:25:37,933 INFO TokenLibraryAML [finagle/netty4-1-1]: AmlUtils: Token expiry added in cache =>1739893702
2025-02-18 14:25:38,007 INFO AzureMLTokenBasedTokenProviderGen2 [readingParquetFooters-ForkJoinPool-1-worker-1]: AmlUtils: Successfully get access token
2025-02-18 14:25:38,008 INFO AzureMLTokenBasedTokenProviderGen2 [readingParquetFooters-ForkJoinPool-1-worker-1]: AmlUtils: Get Expiry AzureMLTokenBasedTokenProviderGen2
2025-02-18 14:25:38,011 INFO Utilities$ [readingParquetFooters-ForkJoinPool-1-worker-1]: AmlUtils: Token expiry retrieved is =>1739893702
2025-02-18 14:25:38,017 INFO Utilities$ [readingParquetFooters-ForkJoinPool-1-worker-1]: AmlUtils: Successfully get the expiry time: Tue Feb 18 15:48:22 UTC 2025
```
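The expiry in these logs is a Unix timestamp, so it can be compared with the time of the executor failure. A minimal sketch, assuming both the epoch value and the Spark log timestamps are in UTC (the logs only state UTC for the expiry itself): it converts `1739893702` and checks it against the `14:29:26,260` ERROR line from the Spark 3.3.0 run. The helper name `token_valid_at` is hypothetical.

```python
from datetime import datetime, timezone

# Values copied from the logs in this post
TOKEN_EXPIRY_EPOCH = 1739893702                  # "Token expiry retrieved is =>1739893702"
FAILURE_TIMESTAMP = "2025-02-18 14:29:26,260"    # executor ERROR line (Spark 3.3.0 run)


def token_valid_at(expiry_epoch: int, log_timestamp: str) -> bool:
    """True if a token expiring at expiry_epoch had not yet expired at log_timestamp.

    Assumes the Spark log timestamp is in UTC.
    """
    expiry = datetime.fromtimestamp(expiry_epoch, tz=timezone.utc)
    # Spark (log4j) timestamps separate the milliseconds with a comma
    failed_at = datetime.strptime(log_timestamp, "%Y-%m-%d %H:%M:%S,%f").replace(tzinfo=timezone.utc)
    return failed_at < expiry


expiry = datetime.fromtimestamp(TOKEN_EXPIRY_EPOCH, tz=timezone.utc)
print(expiry.isoformat())                                     # 2025-02-18T15:48:22+00:00
print(token_valid_at(TOKEN_EXPIRY_EPOCH, FAILURE_TIMESTAMP))  # True
```

If the time-zone assumption holds, the token still had more than an hour of validity when the write failed, which suggests that token expiry is not the cause.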
Yet the executor still cannot write the data, which is very confusing!
Once again, this is the logic that I would like to execute:
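```python
import pandas as pd
from pyspark import TaskContext

def write_feather(partition):
    # Runs on an executor: build a pandas DataFrame from this partition's rows
    pandas_df = pd.DataFrame([row.asDict() for row in partition])
    # Include the partition id so that partitions do not overwrite the same file
    part_id = TaskContext.get().partitionId()
    pandas_df.to_feather(write_path + f"data_{part_id}.feather")

df = spark.read.parquet(read_path)  # spark, read_path, write_path, col1, col2 defined earlier
df = df.repartition(col1, col2)     # repartition takes columns as varargs, not a list
df.show(5)
df.foreachPartition(write_feather)
```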
Reading the data works fine; writing at the executor level fails. Note that if I change the code to the following, everything works, which confirms that the identity has write access to the storage account:
```python
df = spark.read.parquet(read_path)
df = df.repartition(col1, col2)
df.show(5)
df.write.format("parquet").save(write_path)
```