How to Spark Stream into Azure PostgreSQL

Arik Levy 20 Reputation points
2024-11-19T16:52:06.9+00:00

I am trying to Spark-stream data into PostgreSQL via Databricks, with Delta tables and change data feed enabled. As far as I understand, I can use JDBC or the Databricks connector, as described here: https://learn.microsoft.com/en-us/azure/databricks/connect/external-systems/postgresql

(However I also managed to get it working with psycopg2.)

I need to be able to insert, update, and delete records in Postgres. As I understand it, JDBC and the connector don't allow for updates and deletes natively? There isn't much in the documentation about what the best approach is or which types of operations each method supports.

I'm wondering what the best solution for this is?

Thanks

Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.

1 answer

  1. Sina Salam 12,816 Reputation points
    2024-11-19T18:05:52.14+00:00

    Hello Arik Levy,

    Welcome to the Microsoft Q&A and thank you for posting your questions here.

    I understand that you would like to know how you can Spark-stream into Azure PostgreSQL.

    Regarding your requirement of streaming data from Databricks into PostgreSQL while supporting insert, update, and delete operations, here is an analysis of your options:

    Firstly, the JDBC connector in Spark does support writing data to PostgreSQL, but with limitations:

    By default it handles inserts effectively, but updates and deletes are not natively supported as part of the JDBC batch write operation. You can still achieve upserts, however:

    PostgreSQL's ON CONFLICT clause performs an update when an insert hits an existing key, but the Spark JDBC writer cannot emit that clause for you. The snippet below is a plain append-only write; ON CONFLICT has to be issued through SQL you run yourself (for example, via a staging table, as sketched a little further down).

      # Plain JDBC batch write: this appends (inserts) rows only
      (df.write
         .format("jdbc")
         .option("url", "jdbc:postgresql://<host>:<port>/<db>")
         .option("dbtable", "table_name")
         .option("user", "<user>")
         .option("password", "<password>")
         .option("driver", "org.postgresql.Driver")
         .mode("append")
         .option("truncate", "false")
         .save())
    

    Therefore, to handle updates, you could preprocess your data in Databricks into a DataFrame containing only the changed rows, land it in a staging table over JDBC, and then execute the UPSERT (INSERT ... ON CONFLICT, or MERGE on PostgreSQL 15+) or DELETE statements yourself against PostgreSQL, as in the sketch below.
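
    For illustration, here is a minimal sketch of that staging-table pattern. It assumes a DataFrame of changed rows called updates_df and placeholder names (staging_table, target_table, columns id and value) that are not from your setup:

    # Hypothetical staging-table upsert; table/column names and credentials are placeholders
    import psycopg2

    # 1) Land the changed rows in a staging table using the plain JDBC writer
    (updates_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://<host>:<port>/<db>")
        .option("dbtable", "staging_table")
        .option("user", "<user>")
        .option("password", "<password>")
        .option("driver", "org.postgresql.Driver")
        .mode("overwrite")   # recreate the staging table for each batch
        .save())

    # 2) Apply the changes with SQL you control, from a driver-side connection
    conn = psycopg2.connect(dbname="<db>", user="<user>", password="<password>",
                            host="<host>", port="<port>")
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO target_table (id, value)
        SELECT id, value FROM staging_table
        ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value
    """)
    conn.commit()
    cur.close()
    conn.close()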

    Secondly, Databricks offers a named connector for PostgreSQL (the format described in the page you linked), which is built to handle batch operations and transaction management. However, just like JDBC, updates and deletes are not built in; you would need to explicitly run SQL commands for those.

    The benefit is that it simplifies integration and talks to PostgreSQL directly, but for UPDATE or DELETE operations you still end up executing raw SQL.
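
    For reference, the named connector from the page you linked is used roughly like this for reads (option names per that doc; host, database, and credentials below are placeholders, and it is worth confirming the exact options for your runtime):

    # Read a PostgreSQL table with the Databricks-named connector
    remote_table = (spark.read
        .format("postgresql")
        .option("dbtable", "schema_name.table_name")
        .option("host", "<host>")
        .option("port", "5432")
        .option("database", "<db>")
        .option("user", "<user>")
        .option("password", "<password>")
        .load())

    For writes you can fall back to the JDBC writer shown above, and UPDATE/DELETE still have to be issued as SQL through a direct connection such as psycopg2.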

    Thirdly, the psycopg2 library gives you direct control over executing SQL statements, which makes it suitable for inserts, updates, and deletes. This is particularly useful when:

    • You want finer control over transactions.
    • You need to run custom SQL commands like MERGE, DELETE, or UPDATE.

    Sample usage is below:

    import psycopg2

    conn = psycopg2.connect(
        dbname="your_db",
        user="your_user",
        password="your_password",
        host="your_host",
        port="your_port"
    )
    cur = conn.cursor()

    record_id, value = 1, "example"  # placeholder values for the parameters below

    # Upsert example: insert, or update the existing row on primary-key conflict
    cur.execute(
        "INSERT INTO your_table (id, value) VALUES (%s, %s) "
        "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
        (record_id, value)
    )

    # Delete example
    cur.execute("DELETE FROM your_table WHERE id = %s", (record_id,))

    conn.commit()
    cur.close()
    conn.close()
    

    Also, since your Delta tables have change data feed enabled and capture inserts, updates, and deletes, you can leverage Change Data Capture (CDC) logic:

    • Identify new, updated, and deleted rows from Delta tables.
    • Based on the operation, push the data to PostgreSQL using either JDBC, the Databricks connector, or psycopg2; a sketch of this routing follows the list.
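
    Below is a minimal sketch of that routing, assuming change data feed is enabled on a Delta table called source_table and PostgreSQL has a matching target_table(id, value); all names, credentials, and the checkpoint path are placeholders, not something prescribed by the docs:

    from pyspark.sql import functions as F
    import psycopg2

    def apply_changes(batch_df, batch_id):
        # Drop update pre-images; keep inserts, post-images, and deletes
        changes = batch_df.filter(F.col("_change_type") != "update_preimage")
        rows = changes.select("id", "value", "_change_type").collect()  # assumes small micro-batches

        conn = psycopg2.connect(dbname="<db>", user="<user>", password="<password>",
                                host="<host>", port="<port>")
        try:
            cur = conn.cursor()
            for r in rows:
                if r["_change_type"] == "delete":
                    cur.execute("DELETE FROM target_table WHERE id = %s", (r["id"],))
                else:  # insert or update_postimage -> upsert
                    cur.execute(
                        "INSERT INTO target_table (id, value) VALUES (%s, %s) "
                        "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
                        (r["id"], r["value"]),
                    )
            conn.commit()
            cur.close()
        finally:
            conn.close()

    (spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .table("source_table")
        .writeStream
        .foreachBatch(apply_changes)
        .option("checkpointLocation", "/tmp/checkpoints/pg_sync")
        .start())

    Collecting each micro-batch to the driver keeps the example simple; for larger volumes you would stage the batch into a table first (as in the earlier sketch) or distribute the writes with foreachPartition.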

    Finally, regarding best practices for insert/update/delete operations:

    • psycopg2 gives you maximum flexibility for executing custom SQL commands and handling complex update/delete logic.
    • If you want to maintain a streaming pipeline while periodically updating the PostgreSQL database, consider Delta tables with CDC, using JDBC or the Databricks PostgreSQL connector for inserts and raw SQL or psycopg2 for updates and deletes.
    • If performance is a concern and near-real-time operation is required, lean towards Delta tables with CDC in combination with JDBC for batch inserts and psycopg2 for more complex updates/deletes.

    NOTE:

    I recommend using Delta tables with CDC in combination with JDBC for batch inserts and psycopg2 for updates and deletes. This approach gives you efficient data streaming while providing the flexibility needed for complex SQL operations.

    I hope this is helpful! Do not hesitate to let me know if you have any other questions.


    Please don't forget to close the thread by upvoting and accepting this as the answer if it was helpful.

