Team, I'm trying to create an API on top of the Azure ML Feature Store
so that users can pass the payload below and have their feature view created automatically.
{
"feature_view_name": "segment_predictive_performance_view_v9",
"feature_source_configs":{
"query": "SELECT id, discount, campaign, avg_loyalty_bonus, event_timestamp FROM segment_predictive_performance_view_v9",
"timestamp_field": "event_timestamp"
},
"entities": ["id"]
}
but I'm stuck in the "create_feature_set_spec" method; I'm getting the error below.
Code:
def create_feature_view(self, feature_view_name, feature_schema, entities, feature_source_configs):
    """Create an Azure ML Feature Store feature-set spec for the given feature view.

    Args:
        feature_view_name: Name of the feature view; also used for the on-disk
            transformation-code directory and module name.
        feature_schema: Currently unused by this method (TODO: wire into the spec
            or drop from the signature once callers are updated).
        entities: List of entity/index column names (each mapped to ColumnType.long).
        feature_source_configs: Dict with at least "timestamp_field"; may also
            carry "query" (the SQL used by the transformation template).

    Returns:
        The FeatureSetSpec produced by ``create_feature_set_spec``.

    NOTE(review): the traceback "ClassNotFoundException:
    org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure" means the local
    Spark session lacks the hadoop-azure/azure-storage JARs needed for wasbs://
    paths; Azure serverless Spark ships them preinstalled. When running locally,
    add them via spark.jars.packages (e.g. org.apache.hadoop:hadoop-azure plus
    com.microsoft.azure:azure-storage, versions matching your Hadoop build).
    """
    from azureml.featurestore import create_feature_set_spec
    from azureml.featurestore.contracts import (
        Column,
        ColumnType,
        DateTimeOffset,
        FeatureSource,
        SourceType,
        TimestampColumn,
        TransformationCode,
    )

    # Source parquet location and ingestion delay for the Azure ML Feature Store.
    # (Was previously a dead variable: the path used in the call below was a
    # different hard-coded literal. Unified here so there is one source of truth.)
    source_path = "wasbs://genome-data@genomemlsa.blob.core.windows.net/churn_sales_forecasting.parquet"
    source_delay = DateTimeOffset(days=0, hours=0, minutes=20)
    timestamp_field = feature_source_configs.get("timestamp_field")

    # One index column per entity; assumes entity keys are integral (long).
    index_columns = [Column(name=entity, type=ColumnType.long) for entity in entities]

    # Project root — base for template and generated transformation code paths.
    root_dir = settings.BASE_DIR
    template_path = os.path.join(
        root_dir, "featurestore/azure/transformation_template/transformation_template.py"
    )
    with open(template_path, "r") as template_file:
        template_content = template_file.read()

    # Use the caller-supplied query/table instead of the previous hard-coded
    # "segment_predictive_performance_view_v9" values, so the API actually
    # honors the request payload. Falls back to SELECT * when no query given.
    table_name = feature_view_name
    query = feature_source_configs.get("query", f"SELECT * FROM {feature_view_name}")
    formatted_content = template_content.format(table_name=table_name, query=query)

    # Materialize the formatted transformation module under a per-view folder.
    formatted_file_path = os.path.join(
        root_dir,
        f"featurestore/azure/featuresets/{feature_view_name}/transformation_code/{feature_view_name}_transform.py",
    )
    os.makedirs(os.path.dirname(formatted_file_path), exist_ok=True)
    with open(formatted_file_path, "w") as formatted_file:
        formatted_file.write(formatted_content)

    featureset_code_path = os.path.join(
        root_dir, f"featurestore/azure/featuresets/{feature_view_name}/transformation_code"
    )
    featureset_transform_class = f"{feature_view_name}_transform.CustomTransformer"

    feature_set_spec = create_feature_set_spec(
        source=FeatureSource(
            type=SourceType.parquet,
            path=source_path,
            timestamp_column=TimestampColumn(name=timestamp_field),
            source_delay=source_delay,
        ),
        transformation_code=TransformationCode(
            path=featureset_code_path,
            transformer_class=featureset_transform_class,
        ),
        index_columns=index_columns,
        source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
        temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
        infer_schema=True,
    )
    # Return the spec (previously discarded) and drop the leftover pdb.set_trace().
    return feature_set_spec
Error:
featureset_code_path /mnt/c/genome/master4/genome-demos/toolkit-backend/genome/featurestore/azure/featuresets/segment_predictive_performance_view/transformation_code
featureset_transform_class segment_predictive_performance_view_transform.CustomTransformer
/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py:145: DeprecationWarning: feature_transformation_code is deprecated, and will be removed in a future release.Please use feature_transformation instead.
warn(
24/12/01 08:14:28 WARN Utils: Your hostname, IN-3D0D5S3 resolves to a loopback address: 127.0.1.1; using 172.29.23.194 instead (on interface eth0)
24/12/01 08:14:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
24/12/01 08:14:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py", line 484, in to_spark_dataframe
df = self.source.load(
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_source/parquet_feature_source.py", line 45, in load
source_df = spark.read.parquet(self.path)
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 458, in parquet
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o23.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2595)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:834)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)
... 25 more
None
Traceback (most recent call last):
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py", line 800, in create_feature_set_spec
df = feature_set_spec.to_spark_dataframe(
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azure/ai/ml/_telemetry/activity.py", line 295, in wrapper
return f(*args, **kwargs)
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py", line 531, in to_spark_dataframe
log_and_raise_error(error=ex, debug=True)
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azure/ai/ml/_exception_helper.py", line 335, in log_and_raise_error
raise error
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_set_spec.py", line 484, in to_spark_dataframe
df = self.source.load(
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/azureml/featurestore/feature_source/parquet_feature_source.py", line 45, in load
source_df = spark.read.parquet(self.path)
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 458, in parquet
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/mnt/c/genome/master3/genome-demos/toolkit-backend/genome/pwq/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o23.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2595)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:834)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)
... 25 more
None
Q1. How can we resolve this error when creating a feature set or feature view in Azure ML Feature Store without using serverless Spark compute? When I ran this example on serverless compute inside an Azure Jupyter notebook, it worked successfully.
Q2. In this code, where can we pass the subscription_id and tenant_id to ensure that the feature view is created in our Azure cloud (e.g. via MLClient credentials when registering the feature set)?