I have an Azure ML pipeline that has been running successfully on a daily schedule. However, the pipeline failed during its latest run, producing the following error message:
AzureMLException: Message: Error Code: ScriptExecution.StreamAccess.Unexpected Native Error: error in streaming from input data sources StreamError(Unknown("Dataflow at inmemory://dataflow/82a5f49706292dab81a220c3f28b76e5 is not valid.", Some(DataflowInvalid("inmemory://dataflow/82a5f49706292dab81a220c3f28b76e5", VisitError(ExecutionError(ArgumentError(InvalidArgument { argument: "writer", expected: "parquet|preppy|delimited", actual: "dfd" }))))))) => Dataflow at inmemory://dataflow/82a5f49706292dab81a220c3f28b76e5 is not valid. Unknown("Dataflow at inmemory://dataflow/82a5f49706292dab81a220c3f28b76e5 is not valid.", Some(DataflowInvalid("inmemory://dataflow/82a5f49706292dab81a220c3f28b76e5", VisitError(ExecutionError(ArgumentError(InvalidArgument { argument: "writer", expected: "parquet|preppy|delimited", actual: "dfd" })))))) => Dataflow at inmemory://dataflow/82a5f49706292dab81a220c3f28b76e5 is not valid. DataflowInvalid("inmemory://dataflow/82a5f49706292dab81a220c3f28b76e5", VisitError(ExecutionError(ArgumentError(InvalidArgument { argument: "writer", expected: "parquet|preppy|delimited", actual: "dfd" })))) Error Message: Got unexpected error: Dataflow at inmemory://dataflow/82a5f49706292dab81a220c3f28b76e5 is not valid.. DataflowInvalid("inmemory://dataflow/82a5f49706292dab81a220c3f28b76e5", VisitError(ExecutionError(ArgumentError(InvalidArgument { argument: "writer", expected: "parquet|preppy|delimited", actual: "dfd" }))))| session_id=b6b7c584-5e20-4d06-9882-74bb3f22eab4 InnerException None
Pipeline Overview:
- Datasets:
  - I am using two datasets created within Azure ML itself. These datasets pull data from Azure SQL Database tables.
  - These datasets have been used successfully without issues until now.
- Pipeline Step:
  - The error occurs in the Execute Python Script step.
  - Python code in the script: below is the code used in the Execute Python Script module:
def azureml_main(dataframe1=None, dataframe2=None):
    import subprocess
    import sys

    def install(package):
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

    install('sentence-transformers')
    install('numpy')
    install('scikit-learn')
    install('pandas')

    import pandas as pd
    import re
    from sentence_transformers import SentenceTransformer
    from sklearn.metrics.pairwise import cosine_similarity
    import numpy as np

    def exactmatch_preprocess_text(text):
        text = re.sub(r'\W', ' ', text)
        text = re.sub(r'\s+', ' ', text)
        return text

    def exact_matching(text_df_match, label_df_match, cln_text_col, cln_label_col, label_col):
        any_matches_found = False
        for index, row in label_df_match.iterrows():
            keyword = row[cln_label_col]
            is_match = row['IsMatch']
            matches = pd.Series([False] * len(text_df_match), index=text_df_match.index)
            if is_match == 1:
                pattern = r'^\b' + re.escape(keyword) + r'\b$'
                matches = text_df_match[cln_text_col].apply(lambda x: bool(re.fullmatch(pattern, x)))
            if matches.any():
                any_matches_found = True
                text_df_match.loc[matches, "ContainsKeyword"] = True
                text_df_match.loc[matches, "MatchingKeywords"] += row[label_col]
        return text_df_match

    text_df_match = dataframe1[['Desc']].rename(columns={'Desc': 'text'})
    text_df_match['cleaned_text'] = text_df_match['text'].apply(exactmatch_preprocess_text)

    label_df_match = dataframe2.rename(columns={'Keyword': 'label'})
    label_df_match['cleaned_keyword'] = label_df_match['label'].apply(exactmatch_preprocess_text)
    label_df_match = label_df_match[label_df_match['IsMatch'] == 1].reset_index(drop=True)

    text_df_match['ContainsKeyword'] = False
    text_df_match['MatchingKeywords'] = ''
    text_df_match = exact_matching(text_df_match, label_df_match, 'cleaned_text', 'cleaned_keyword', 'label')

    exact_match_condition = (text_df_match["ContainsKeyword"] == True)
    exact_match_df = text_df_match.loc[exact_match_condition, ['text', 'MatchingKeywords']]
    exact_match_df.reset_index(inplace=True, drop=True)
    exact_match_df.rename(columns={'MatchingKeywords': 'assigned_labels'}, inplace=True)
    exact_match_df['confidence_score'] = 1.0

    exact_match_df = exact_match_df.astype({
        'text': 'string',
        'assigned_labels': 'string',
        'confidence_score': 'float'
    })
    return exact_match_df,
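
For anyone who wants to exercise the matching logic outside Azure ML, here is a minimal local sketch. It is a condensed re-implementation of the steps in the script above, not the module itself: it skips the pip installs and the unused imports, and the sample rows in `dataframe1` and `dataframe2` are hypothetical stand-ins for the two Azure SQL-backed datasets (only the column names `Desc`, `Keyword`, and `IsMatch` come from the script).

```python
import re

import pandas as pd


def preprocess(text):
    # Same two substitutions as exactmatch_preprocess_text in the script.
    text = re.sub(r'\W', ' ', text)
    return re.sub(r'\s+', ' ', text)


# Hypothetical sample data; the real inputs come from Azure SQL tables.
dataframe1 = pd.DataFrame({'Desc': ['late fee', 'late fee charged', 'refund!']})
dataframe2 = pd.DataFrame({
    'Keyword': ['late fee', 'refund', 'chargeback'],
    'IsMatch': [1, 1, 0],
})

text_df = dataframe1[['Desc']].rename(columns={'Desc': 'text'})
text_df['cleaned_text'] = text_df['text'].apply(preprocess)

label_df = dataframe2.rename(columns={'Keyword': 'label'})
label_df['cleaned_keyword'] = label_df['label'].apply(preprocess)
label_df = label_df[label_df['IsMatch'] == 1].reset_index(drop=True)

text_df['ContainsKeyword'] = False
text_df['MatchingKeywords'] = ''

for _, row in label_df.iterrows():
    # Whole-string match against the cleaned keyword, as in exact_matching.
    pattern = r'^\b' + re.escape(row['cleaned_keyword']) + r'\b$'
    matches = text_df['cleaned_text'].apply(
        lambda x: bool(re.fullmatch(pattern, x))
    )
    if matches.any():
        text_df.loc[matches, 'ContainsKeyword'] = True
        text_df.loc[matches, 'MatchingKeywords'] += row['label']

exact_match_df = (
    text_df.loc[text_df['ContainsKeyword'], ['text', 'MatchingKeywords']]
    .rename(columns={'MatchingKeywords': 'assigned_labels'})
    .reset_index(drop=True)
)
exact_match_df['confidence_score'] = 1.0
print(exact_match_df)
```

With this sample data only the `late fee` row is returned. The `refund!` row is not matched because the preprocessing turns the punctuation into a trailing space, which the whole-string pattern does not accept. Since none of this touches the dataset readers or writers, it only helps confirm whether the matching logic itself behaves as expected.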