How can we create a feature view in Azure ML Feature Store from a local machine without using serverless Spark compute?

Puneet 0 Reputation points
2024-12-01T14:45:07.21+00:00

Team, I'm trying to create an API on top of Azure ML Feature Store so that a user can pass the payload below and have their feature view created automatically.

{
    "feature_view_name": "segment_predictive_performance_view_v9",
    "feature_source_configs": {
        "query": "SELECT id, discount, campaign, avg_loyalty_bonus, event_timestamp FROM segment_predictive_performance_view_v9",
        "timestamp_field": "event_timestamp"
    },
    "entities": ["id"]
}

but I'm stuck at the "create_feature_set_spec" method; I'm getting the error below.

Code:

def create_feature_view(self, feature_view_name, feature_schema, entities, feature_source_configs):
            import os  # needed for the path handling below (assumed not already imported at module level)

            from azureml.featurestore import create_feature_set_spec
            from azureml.featurestore.contracts import (
                DateTimeOffset, FeatureSource, TransformationCode,
                Column, ColumnType, SourceType, TimestampColumn,
            )
            # Logic to create the feature view (feature set spec) in Azure ML managed feature store

            # Hardcoded source_path and delay (note: source_path is not actually used below; the FeatureSource path is hardcoded separately)
            source_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet"
            source_delay = DateTimeOffset(days=0, hours=0, minutes=20)
            timestamp_field = feature_source_configs.get("timestamp_field")

            # Dynamically create index columns from entities
            index_columns = [Column(name=entity, type=ColumnType.long) for entity in entities]

            # Get the base directory (project root)
            root_dir = settings.BASE_DIR
            # Define the paths for the template and formatted file
            template_path = os.path.join(root_dir, "featurestore/azure/transformation_template/transformation_template.py")
            # Read and format the transformation template
            with open(template_path, "r") as template_file:
                template_content = template_file.read()
            
            table_name = "segment_predictive_performance_view_v9"
            query = "SELECT * FROM segment_predictive_performance_view_v9"
            # Format the template with the query and table name
            formatted_content = template_content.format(table_name=table_name, query=query)

            # Save the formatted transformation logic to a temporary file
            formatted_file_path = os.path.join(root_dir, f"featurestore/azure/featuresets/{feature_view_name}/transformation_code/{feature_view_name}_transform.py")
            # Ensure the directory exists
            os.makedirs(os.path.dirname(formatted_file_path), exist_ok=True)  # Create all missing directories
            with open(formatted_file_path, "w") as formatted_file:
                formatted_file.write(formatted_content)

            featureset_code_path = os.path.join(root_dir, f"featurestore/azure/featuresets/{feature_view_name}/transformation_code")
            featureset_transform_class = f"{feature_view_name}_transform.CustomTransformer"
            print("featureset_code_path", featureset_code_path)
            print("featureset_transform_class", featureset_transform_class)
            transactions_featureset_spec = create_feature_set_spec(
                source=FeatureSource(
                    type=SourceType.parquet,
                    path="wasbs://genome-data@genomemlsa.blob.core.windows.net/churn_sales_forecasting.parquet",
                    timestamp_column=TimestampColumn(name=timestamp_field),
                    source_delay=source_delay,
                ),
                transformation_code=TransformationCode(
                    path=featureset_code_path,
                    transformer_class=featureset_transform_class,
                ),
                index_columns=index_columns,
                source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
                temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
                infer_schema=True,
            )
            import pdb
            pdb.set_trace()
Error: 

featureset_code_path /mnt/c/genome/master4/genome-demos/toolkit-backend/genome/featurestore/azure/featuresets/segment_predictive_performance_view/transformation_code
featureset_transform_class segment_predictive_performance_view_transform.CustomTransformer
/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py:145: DeprecationWarning: feature_transformation_code is deprecated, and will be removed in a future release.Please use feature_transformation instead.
  warn(
24/12/01 08:14:28 WARN Utils: Your hostname, IN-3D0D5S3 resolves to a loopback address: 127.0.1.1; using 172.29.23.194 instead (on interface eth0)
24/12/01 08:14:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
24/12/01 08:14:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py", line 484, in to_spark_dataframe
    df = self.source.load(
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_source/parquet_feature_source.py", line 45, in load
    source_df = spark.read.parquet(self.path)
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 458, in parquet
    return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o23.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2595)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
        at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:834)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)
        ... 25 more
None
Traceback (most recent call last):
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py", line 800, in create_feature_set_spec
    df = feature_set_spec.to_spark_dataframe(
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azure/ai/ml/_telemetry/activity.py", line 295, in wrapper
    return f(*args, **kwargs)
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py", line 531, in to_spark_dataframe
    log_and_raise_error(error=ex, debug=True)
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azure/ai/ml/_exception_helper.py", line 335, in log_and_raise_error
    raise error
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py", line 484, in to_spark_dataframe
    df = self.source.load(
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_source/parquet_feature_source.py", line 45, in load
    source_df = spark.read.parquet(self.path)
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 458, in parquet
    return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o23.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
        ... (same Java stack trace as in the first traceback above)
None

Q1. How can we resolve this error when creating a feature set or feature view in Azure ML Feature Store without using serverless Spark compute? When I ran the same example on serverless Spark compute inside an Azure ML notebook, it worked successfully.

Q2. In this code, where can we pass the subscription_id and tenant_id to ensure that the feature view is created in our Azure subscription?

Azure Machine Learning

1 answer

  1. Amira Bedhiafi 27,436 Reputation points
    2024-12-01T17:48:22.2366667+00:00

    From the error, I can say that your local environment lacks the Azure-specific Hadoop/Spark libraries needed to access Azure Blob Storage over wasbs://.

    The missing class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure ships in the hadoop-azure connector JAR (together with its com.microsoft.azure:azure-storage dependency). These are Java artifacts, not pip packages, so installing Python packages will not fix the ClassNotFoundException; the JARs have to be on Spark's classpath. Either use a Spark/Hadoop installation that already bundles the Azure connectors, or pull them in when the Spark session is created.
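
    For example (a minimal sketch; the connector version below is matched to the Hadoop 3.2 jars that ship with the PySpark 3.1.1 visible in your stack trace, so adjust it to your installation), you can pass the Maven coordinate when launching PySpark or spark-submit, and the azure-storage dependency is pulled in transitively:

    pyspark --packages org.apache.hadoop:hadoop-azure:3.2.0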

    If you create the Spark session from Python instead, add the connector package and the Blob Storage credentials to the session configuration (account name and key are placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("AzureFeatureStore") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.2.0") \
        .config("fs.azure.account.key.<storage-account-name>.blob.core.windows.net", "<storage-account-key>") \
        .getOrCreate()

    Note that spark.jars.packages only takes effect if it is set before the first Spark session is created in the process.

    If serverless Spark is not an option, ensure your local Spark cluster can connect to Azure Blob Storage by testing a simple read operation:

    df = spark.read.parquet("wasbs://container@account.blob.core.windows.net/path")
    df.show()
    

    Regarding where to pass the subscription_id and tenant_id so that the feature view is created in your Azure subscription: they go into the client and the credential you authenticate with. Since your code uses the v2 azureml-featurestore package, the natural place is an azure.ai.ml MLClient pointed at your feature store workspace (the tenant_id is resolved by the credential, not by the client):

    from azure.identity import DefaultAzureCredential
    from azure.ai.ml import MLClient

    credential = DefaultAzureCredential()  # resolves tenant_id / client_id / secret from the environment or az login
    fs_client = MLClient(
        credential,
        subscription_id="<your-subscription-id>",
        resource_group_name="<your-resource-group>",
        workspace_name="<your-feature-store-name>",
    )
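
    If you also want to register the spec returned by create_feature_set_spec as a feature set in that feature store, a rough sketch with the v2 SDK looks like this (FeatureSet and FeatureSetSpecification come from azure.ai.ml.entities; the entity reference azureml:id:1 and the folder layout are illustrative and assume you have already registered an "id" entity and created the feature store itself):

    from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

    # write the spec built in your function to a folder
    spec_folder = f"featurestore/azure/featuresets/{feature_view_name}/spec"
    transactions_featureset_spec.dump(spec_folder)

    # register it as a feature set asset in the feature store
    fset = FeatureSet(
        name=feature_view_name,
        version="1",
        description="Created from the user-supplied payload",
        entities=["azureml:id:1"],  # assumes the 'id' entity already exists in the feature store
        specification=FeatureSetSpecification(path=spec_folder),
        stage="Development",
    )
    poller = fs_client.feature_sets.begin_create_or_update(fset)
    print(poller.result())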
    

    As a best practice, you can store your Azure credentials in environment variables to avoid hardcoding sensitive details:

    export AZURE_SUBSCRIPTION_ID="<your-subscription-id>"
    export AZURE_TENANT_ID="<your-tenant-id>"
    export AZURE_CLIENT_ID="<your-client-id>"
    export AZURE_CLIENT_SECRET="<your-client-secret>"
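
    DefaultAzureCredential then picks the tenant and service principal up automatically (its chain includes EnvironmentCredential, which reads those variables), so only the subscription, resource group and feature store name remain in code:

    import os
    from azure.identity import DefaultAzureCredential

    # AZURE_TENANT_ID / AZURE_CLIENT_ID / AZURE_CLIENT_SECRET are read from the environment
    credential = DefaultAzureCredential()
    subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]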
    
