Spark Dataframes UPSERT to Postgres Table

I am using Apache Spark DataFrames to join two data sources and get the result as another DataFrame. I want to write the result to another Postgres table. I see this option:

    myDataFrame.write.jdbc(url, table, connectionProperties)

But what I want to do is UPSERT the DataFrame into the table, based on the table's primary key. How can this be done? I am using Spark 1.6.0.

Tags: postgresql scala apache-spark apache-spark-sql spark-dataframe

asked Jan 6 '16 at 21:33 by void

4 Answers

13 votes, accepted

It is not supported. DataFrameWriter can either append to or overwrite an existing table. If your application requires more complex logic, you'll have to handle it manually.

One option is to use an action (foreach, foreachPartition) with a standard JDBC connection. Another is to write to a temporary table and handle the rest directly in the database.

answered Jan 6 '16 at 21:53 by zero323
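
The temporary-table route could be sketched roughly as follows. It assumes PostgreSQL 9.5 or later (where INSERT ... ON CONFLICT is available) and a staging table that Spark has already filled, for example with df.write.mode(SaveMode.Overwrite).jdbc(url, "staging", props). StagingMerge, mergeSql, and mergeStaging are hypothetical names used for illustration; they are not part of Spark or the JDBC driver.

```scala
import java.sql.Connection

object StagingMerge {
  // Builds a PostgreSQL statement that copies every row of `staging` into
  // `target` and updates the non-key columns when the key already exists.
  // Assumption: `keys` is covered by a primary key or unique index on `target`.
  def mergeSql(target: String, staging: String,
               columns: Seq[String], keys: Seq[String]): String = {
    val cols = columns.mkString(", ")
    val updates = columns
      .filterNot(c => keys.contains(c))
      .map(c => s"$c = EXCLUDED.$c")
    val action =
      if (updates.isEmpty) "DO NOTHING"
      else "DO UPDATE SET " + updates.mkString(", ")
    s"INSERT INTO $target ($cols) SELECT $cols FROM $staging " +
      s"ON CONFLICT (${keys.mkString(", ")}) $action"
  }

  // Runs the merge on the driver through a plain JDBC connection and
  // returns the row count reported by the database.
  def mergeStaging(conn: Connection, target: String, staging: String,
                   columns: Seq[String], keys: Seq[String]): Int = {
    val st = conn.createStatement()
    try st.executeUpdate(mergeSql(target, staging, columns, keys))
    finally st.close()
  }
}
```

If the staging table can hold several rows with the same key, they would need to be deduplicated first, since PostgreSQL rejects an ON CONFLICT DO UPDATE that would touch the same target row twice in one statement.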


• Also, how do I overwrite an existing JDBC table? The only option I can see is df.write.mode().saveAsTable(), but that doesn't seem to support JDBC tables. – void, Jan 7 '16 at 6:59

• dataframe.write.mode(SaveMode.Overwrite) – Aydin K., Feb 13 at 12:43

9 votes

KrisP has the right of it. The best way to do an upsert is not through a prepared statement. It's important to note that this method inserts one row at a time, with as many partitions as you have workers. If you want to do this in batches, you can as well:

    import java.sql.{Connection, DriverManager, PreparedStatement}

    val numWorkers = 4    // placeholder: number of partitions, i.e. concurrent connections
    val batchSize = 1000  // placeholder: rows per JDBC batch

    dataframe.coalesce(numWorkers).rdd.foreachPartition { rows =>
      // Open one connection and one prepared statement per partition.
      val dbc: Connection = DriverManager.getConnection("JDBCURL")
      val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
      try {
        rows.grouped(batchSize).foreach { batch =>
          batch.foreach { x =>
            st.setDouble(1, x.getDouble(1)) // bind each statement parameter from the row
            st.addBatch()
          }
          st.executeBatch()
        }
      } finally {
        st.close()
        dbc.close()
      }
    }

This executes the batches on each worker and then closes the DB connection. It gives you control over how many workers and how many rows per batch are used, and lets you work within those limits.

edited Jul 23 '17 at 22:12; answered Oct 6 '16 at 4:25 by jstuartmill
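
To bind every column instead of a single double, the per-partition loop could be factored into a helper along these lines. This is only a sketch under assumptions: BatchWriter and executeInBatches are hypothetical names, rows are passed as Seq[Any] (Spark's Row.toSeq yields that), and it relies on the JDBC driver mapping each value through setObject, which may not hold for every column type.

```scala
import java.sql.PreparedStatement

object BatchWriter {
  // Binds each row to the statement's parameters in order and sends the
  // rows to the database in groups of `batchSize`.
  // Returns the number of batches that were executed.
  def executeInBatches(st: PreparedStatement,
                       rows: Iterator[Seq[Any]],
                       batchSize: Int): Int = {
    var batches = 0
    rows.grouped(batchSize).foreach { batch =>
      batch.foreach { row =>
        row.zipWithIndex.foreach { case (value, i) =>
          // JDBC parameter indexes are 1-based.
          st.setObject(i + 1, value.asInstanceOf[AnyRef])
        }
        st.addBatch()
      }
      st.executeBatch()
      batches += 1
    }
    batches
  }
}
```

Inside foreachPartition it might be called as BatchWriter.executeInBatches(st, rows.map(_.toSeq), 1000), with the connection and statement opened and closed around it as in the answer's code.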


• Can an "Update" SQL statement be used here to update a single row in the database table? – User007, Aug 16 at 19:20

• Yep, the prepared statement can be an insert. – jstuartmill, Aug 18 at 4:14

8 votes

If you are going to do it manually, via option 1 mentioned by zero323, you should take a look at the Spark source code for the insert statement:

    import java.sql.{Connection, PreparedStatement}
    import org.apache.spark.sql.types.StructType

    // Builds "INSERT INTO table (c1,c2,...) VALUES (?,?,...)" from the DataFrame schema.
    def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
      val columns = rddSchema.fields.map(_.name).mkString(",")
      val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
      val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
      conn.prepareStatement(sql)
    }

PreparedStatement is part of java.sql and has methods like execute() and executeUpdate(). You still have to modify the SQL accordingly, of course.

answered Jan 6 '16 at 22:10 by KrisP
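
Modifying the SQL for an upsert might look like the sketch below. It assumes PostgreSQL 9.5 or later, where INSERT ... ON CONFLICT is available, and takes plain column names instead of a StructType so that it has no Spark dependency (with a DataFrame, df.schema.fields.map(_.name) would supply them). UpsertStatement, upsertSql, upsertStatement, and the keyColumns parameter are hypothetical names, not part of Spark.

```scala
import java.sql.{Connection, PreparedStatement}

object UpsertStatement {
  // Same shape as the INSERT above, plus a PostgreSQL ON CONFLICT clause
  // that overwrites the non-key columns of an existing row.
  def upsertSql(table: String, columns: Seq[String], keyColumns: Seq[String]): String = {
    val placeholders = columns.map(_ => "?").mkString(",")
    val updates = columns
      .filterNot(c => keyColumns.contains(c))
      .map(c => s"$c = EXCLUDED.$c")
    val onConflict =
      if (updates.isEmpty) "DO NOTHING"
      else s"DO UPDATE SET ${updates.mkString(", ")}"
    s"INSERT INTO $table (${columns.mkString(",")}) VALUES ($placeholders) " +
      s"ON CONFLICT (${keyColumns.mkString(",")}) $onConflict"
  }

  // Prepares the upsert on an existing connection; parameters are bound
  // in the same order as `columns`.
  def upsertStatement(conn: Connection, table: String,
                      columns: Seq[String], keyColumns: Seq[String]): PreparedStatement =
    conn.prepareStatement(upsertSql(table, columns, keyColumns))
}
```

The key columns must match a primary key or unique constraint on the table; otherwise PostgreSQL rejects the ON CONFLICT target.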


2 votes

To insert into a table over JDBC you can use

    dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url, table_name, connection_properties)

Also, DataFrame.write gives you a DataFrameWriter, which has some methods to insert the DataFrame:

    def insertInto(tableName: String): Unit

Inserts the content of the DataFrame to the specified table. It requires that the schema of the DataFrame is the same as the schema of the table.

Because it inserts data to an existing table, format or options will be ignored.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

There is nothing yet to update individual records out of the box from Spark, though.

answered Mar 4 '16 at 21:08 by Soumitra
