PySpark notebook comparing schemas is not naming the parquet file in the correct format. It defaults instead of using the given name

Wurm, Yvonne L 0 Reputation points
2025-01-07T17:34:36.94+00:00

This code compares the schemas and updates them if needed. It writes to a new directory but does not keep the same file name. I have tried everything; how can I get it to rename the output to xyzYYYYMMDD.parquet?

for year in range(2024, 2025):  # Adjust years as needed
    for month in range(5, 6):  # Adjust months as needed
        for day in range(7, 8):  # Adjust days as needed
            if not is_valid_date(year, month, day):
                continue
            file_path = f"abfss://abcdefg/{year}/{month:02d}/{day:02d}/*.parquet"
            try:
                df = spark.read.parquet(file_path)

                # If schema update is needed, apply it
                if df.schema != new_schema:
                    updated_df = spark.createDataFrame(df.rdd, schema=new_schema)
                    updated_df.coalesce(1).write.mode("overwrite").parquet(
                        f"abfss://abcdefg/Updated/{year}/{month:02d}/{day:02d}"
                    )

                    # Remove _SUCCESS file if created
                    delete_success_files(f"abcdefg/Updated/{year}/{month:02d}/{day:02d}/")
                    print(f"Schema updated for file: {file_path}")
                else:
                    print(f"No schema update needed for file: {file_path}")
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")

1 answer

  1. Chandra Boorla 6,370 Reputation points Microsoft Vendor
    2025-01-08T20:26:37.4466667+00:00

    Hi @Wurm, Yvonne L

    Greetings & Welcome to Microsoft Q&A forum! Thanks for posting your query!

    Spark always writes Parquet output as a directory of part files, so you cannot pass a file name like xyzYYYYMMDD.parquet directly to the .parquet() writer. Instead, write the data with coalesce(1), locate the single part file it produces, and rename that file to the desired name. Here is the modified code, assuming you are using ADLS Gen2 for storage:

    for year in range(2024, 2025):
      for month in range(5, 6):
        for day in range(7, 8):
          if not is_valid_date(year, month, day):
            continue
    
          file_path = f"abfss://abcdefg/{year}/{month:02d}/{day:02d}/*.parquet"
    
          try:
            df = spark.read.parquet(file_path)
    
            if df.schema != new_schema:
              updated_df = spark.createDataFrame(df.rdd, schema=new_schema)
    
              # Write with overwrite mode and coalesce to single file
              updated_df.coalesce(1).write.mode("overwrite").parquet(f"abfss://abcdefg/Updated/{year}/{month:02d}/{day:02d}")
    
              # Locate the single part file written by coalesce(1)
              output_dir = f"abfss://abcdefg/Updated/{year}/{month:02d}/{day:02d}"
              part_file = next(f.path for f in mssparkutils.fs.ls(output_dir)
                               if f.name.startswith("part-") and f.name.endswith(".parquet"))
    
              # Rename the part file to the desired name
              new_file_name = f"xyz{year}{month:02d}{day:02d}.parquet"
              mssparkutils.fs.mv(part_file, f"{output_dir}/{new_file_name}")
    
              # Remove _SUCCESS file if created
              delete_success_files(output_dir)
    
              print(f"Schema updated for file: {file_path}")
            else:
              print(f"No schema update needed for file: {file_path}")
    
          except Exception as e:
            print(f"Error processing file {file_path}: {e}")
    

    Additional Notes - File-system utilities such as mssparkutils.fs (available by default in Synapse notebooks) can be used for the listing and rename steps against ADLS Gen2. Consider adding error handling for potential issues such as missing files or schema mismatches.

    With this approach, the DataFrame is written out and the resulting part file is renamed to follow the desired naming convention.
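
    Outside Spark, the same write-then-rename pattern (write to a directory, pick out the lone part file, rename it) can be sketched with plain Python against a local filesystem. This is only an illustration of the pattern; the helper name and file names below are made up for the example:

    import os
    import tempfile

    def rename_single_part_file(output_dir: str, new_name: str) -> str:
        """Find the lone part-*.parquet file in output_dir and rename it.

        Mirrors the post-write rename step: Spark's coalesce(1) leaves one
        part file plus bookkeeping files (e.g. _SUCCESS) in the directory.
        """
        part_files = [f for f in os.listdir(output_dir)
                      if f.startswith("part-") and f.endswith(".parquet")]
        if len(part_files) != 1:
            raise RuntimeError(f"Expected exactly one part file, found {part_files}")
        src = os.path.join(output_dir, part_files[0])
        dst = os.path.join(output_dir, new_name)
        os.rename(src, dst)
        return dst

    # Simulate what a coalesce(1) write leaves behind
    out = tempfile.mkdtemp()
    for name in ("part-00000-abc123.snappy.parquet", "_SUCCESS"):
        open(os.path.join(out, name), "w").close()

    renamed = rename_single_part_file(out, "xyz20240507.parquet")
    print(os.path.basename(renamed))  # xyz20240507.parquet

    Raising when more than one part file is present guards against a write that was not coalesced to a single file, which would otherwise silently rename an arbitrary part.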

    Kindly consider upvoting the comment if the information provided is helpful. This can assist other community members in resolving similar issues.
    I hope this information helps. Please do let us know if you have any further queries.

    Thank you.

