Spring Integration : retry configuration with multi-instances


























I'm running 4 instances of a Spring Boot application based on Spring Integration, on 4 different servers.
The process is:




  1. Read XML files one by one in a shared folder.

  2. Process the file (check structure, content...), transform the data and send email.

  3. Write a report about this file in another shared folder.

  4. Delete successfully processed file.


I'm looking for a non-blocking and safe solution to process these files.

Use cases:

  • If an instance crashes while reading or processing a file (that is, before the integration chain completes), another instance must process the file, or the same instance must process it after it restarts.

  • While an instance is processing a file, the other instances must not process it.


I have built this Spring Integration XML configuration file (it includes a JDBC metadata store backed by a shared H2 database):



    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:int="http://www.springframework.org/schema/integration"
           xmlns:int-file="http://www.springframework.org/schema/integration/file"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/integration
               http://www.springframework.org/schema/integration/spring-integration.xsd
               http://www.springframework.org/schema/integration/file
               http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
               http://www.springframework.org/schema/task
               http://www.springframework.org/schema/task/spring-task.xsd">

        <int:poller default="true" fixed-rate="1000"/>
        <int:channel id="inputFilesChannel">
            <int:queue/>
        </int:channel>

        <!-- Input -->
        <int-file:inbound-channel-adapter
                id="inputFilesAdapter"
                channel="inputFilesChannel"
                directory="file:${input.files.path}"
                ignore-hidden="true"
                comparator="lastModifiedFileComparator"
                filter="compositeFilter">
            <int:poller fixed-rate="10000" max-messages-per-poll="1" task-executor="taskExecutor"/>
        </int-file:inbound-channel-adapter>

        <task:executor id="taskExecutor" pool-size="1"/>

        <!-- Metadatastore -->
        <bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
            <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
            <property name="driverClassName" value="org.h2.Driver"/>
            <property name="username" value="${database.username}"/>
            <property name="password" value="${database.password}"/>
            <property name="maxIdle" value="4"/>
        </bean>

        <bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
            <constructor-arg ref="jdbcDataSource"/>
        </bean>

        <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="jdbcDataSource"/>
        </bean>

        <bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
            <constructor-arg>
                <list>
                    <bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
                        <constructor-arg index="0" ref="jdbcMetadataStore"/>
                        <constructor-arg index="1" value="files"/>
                    </bean>
                </list>
            </constructor-arg>
        </bean>

        <!-- Workflow -->
        <int:chain input-channel="inputFilesChannel" output-channel="outputFilesChannel">
            <int:service-activator ref="fileActivator" method="fileRead"/>
            <int:service-activator ref="fileActivator" method="fileProcess"/>
            <int:service-activator ref="fileActivator" method="fileAudit"/>
        </int:chain>

        <bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>

        <int-file:outbound-channel-adapter
                id="outputFilesChannel"
                directory="file:${output.files.path}"
                filename-generator-expression="payload.name">
            <int-file:request-handler-advice-chain>
                <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                    <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
                </bean>
            </int-file:request-handler-advice-chain>
        </int-file:outbound-channel-adapter>

    </beans>


Problem:
With multiple files, when one file is successfully processed, the transaction also commits the other existing files to the metadata store (table INT_METADATA_STORE). So if the app is restarted, the other files will never be processed
(it works fine if the app crashes while the first file is being processed).
It seems the transaction only applies to reading the files, not to processing them in the integration chain. How can I roll back the transaction file by file when the JVM crashes?
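
One plausible reading of this behaviour, offered as a hedged model and not as a statement about any specific Spring Integration version: the accept-once filter is applied to the whole directory listing during a scan, so every file it sees is recorded in the store at once, while `max-messages-per-poll="1"` only limits how many of the already accepted files are emitted per poll. The plain-Java sketch below models that. `AcceptOnceModel`, `scan` and `poll` are hypothetical names, and an in-memory map stands in for the INT_METADATA_STORE table; none of this is Spring Integration code.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

final class AcceptOnceModel {
    // Hypothetical in-memory stand-in for the shared INT_METADATA_STORE table.
    final Map<String, String> store = new LinkedHashMap<>();
    // Files accepted by the filter but not yet emitted as messages.
    final Queue<String> pending = new ArrayDeque<>();

    // Models one directory scan: the filter sees the whole listing.
    void scan(List<String> listing) {
        for (String name : listing) {
            // Accept-once: only names not yet in the store pass the filter.
            if (store.putIfAbsent(name, "seen") == null) {
                pending.add(name);
            }
        }
    }

    // Models one poll that emits at most one file.
    String poll(List<String> listing) {
        if (pending.isEmpty()) {
            scan(listing);
        }
        return pending.poll(); // null when nothing is left
    }

    public static void main(String[] args) {
        AcceptOnceModel model = new AcceptOnceModel();
        String first = model.poll(List.of("a.xml", "b.xml", "c.xml"));
        System.out.println("emitted: " + first);
        System.out.println("recorded in store: " + model.store.keySet());
    }
}
```

In this model, a single poll emits one file but records all three names. After a restart the in-memory `pending` queue is gone while the store still lists the two files that were never emitted, which is consistent with those files never being picked up again.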



Any help would be much appreciated. This is driving me crazy :(



Thanks !



Edits / Notes:

  • Inspired by https://github.com/caoimhindenais/spring-integration-files/blob/master/src/main/resources/context.xml

  • I have updated my configuration following the answer from Artem Bilan, and removed the transactional block from the poller block: it caused transaction conflicts between instances (ugly table lock exceptions). The behaviour was the same either way.

  • I have unsuccessfully tested this configuration in the poller block (same behaviour):



    <int:advice-chain>
        <tx:advice id="txAdvice" transaction-manager="transactionManager">
            <tx:attributes>
                <tx:method name="file*" timeout="30000" propagation="REQUIRED"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>


  • Maybe a solution based on the Idempotent Receiver Enterprise Integration Pattern could work, but I didn't manage to configure it, and I can't find precise documentation.
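
For readers unfamiliar with the pattern mentioned in the last note, here is a minimal plain-Java sketch of the Idempotent Receiver idea: derive a key from each message, handle the message only the first time that key is seen, and discard later duplicates. `IdempotentReceiver`, `keyOf` and `delegate` are hypothetical names, and the in-memory set is an assumption standing in for a store shared between instances; this is not Spring Integration's implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

final class IdempotentReceiver<T> {
    // Keys already seen; a shared store would be needed across several JVMs.
    private final Set<String> seenKeys = ConcurrentHashMap.newKeySet();
    private final Function<T, String> keyOf;
    private final Consumer<T> delegate;

    IdempotentReceiver(Function<T, String> keyOf, Consumer<T> delegate) {
        this.keyOf = keyOf;
        this.delegate = delegate;
    }

    // Returns true if the message was handled, false if it was discarded as a duplicate.
    boolean receive(T message) {
        // Set.add is atomic here, so only one caller wins for a given key.
        if (!seenKeys.add(keyOf.apply(message))) {
            return false;
        }
        delegate.accept(message);
        return true;
    }

    public static void main(String[] args) {
        IdempotentReceiver<String> receiver =
                new IdempotentReceiver<>(name -> name, name -> System.out.println("handling " + name));
        receiver.receive("a.xml");
        receiver.receive("a.xml"); // duplicate, discarded
    }
}
```

Note the limitation with respect to the use cases above: in this sketch the key stays recorded even if `delegate` throws, so on its own the pattern does not allow a retry after a failed or interrupted run.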











      java spring spring-boot spring-integration














asked Nov 12 at 10:44 by cactuschibre (edited Nov 15 at 11:50)
























          2 Answers
































You shouldn't use a PseudoTransactionManager; use a DataSourceTransactionManager instead.

Since you use a JdbcMetadataStore, it participates in the transaction, so if the downstream flow fails, the entry in the metadata store is rolled back as well.



























          • Good to know, but the problem persists :(
            – cactuschibre
            Nov 14 at 10:36

































OK, I found a working solution. It may not be the cleanest one, but it works:

  • Multiple instances run on separate servers and share the same H2 database (on a mounted network folder). I think it should also work via remote TCP. MVCC has been activated on H2 (check its documentation).

  • The inbound-channel-adapter has the scan-each-poll option activated, so files that were previously ignored (because another instance had already started processing them) can be polled again. If another instance crashes, the file can therefore be polled and processed again without restarting this instance.

  • The defaultAutoCommit option is set to false on the data source.

  • I didn't use the FileSystemPersistentAcceptOnceFileListFilter because it was adding all the read files to the metadata store as soon as one file was successfully processed. I didn't manage to make it work in my context.

  • I wrote my own conditions and actions as expressions, through a filter and a transaction synchronization.



    <!-- Input -->
    <bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>
    <int-file:inbound-channel-adapter
            id="inputAdapter"
            channel="inputChannel"
            directory="file:${input.files.path}"
            comparator="lastModifiedFileComparator"
            scan-each-poll="true">
        <int:poller max-messages-per-poll="1" fixed-rate="5000">
            <int:transactional
                    transaction-manager="transactionManager"
                    isolation="READ_COMMITTED"
                    propagation="REQUIRED"
                    timeout="60000"
                    synchronization-factory="syncFactory"/>
        </int:poller>
    </int-file:inbound-channel-adapter>

    <!-- Continue only if the metadata store doesn't contain the file yet; putIfAbsent inserts it in that case -->
    <int:filter
            input-channel="inputChannel"
            output-channel="processChannel"
            discard-channel="nullChannel"
            throw-exception-on-rejection="false"
            expression="@jdbcMetadataStore.putIfAbsent(headers[file_name], headers[timestamp]) == null"/>

    <!-- Rollback by removing the file from the metadata store -->
    <int:transaction-synchronization-factory id="syncFactory">
        <int:after-rollback expression="@jdbcMetadataStore.remove(headers[file_name])"/>
    </int:transaction-synchronization-factory>

    <!-- Metadata store configuration -->
    <bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
        <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
        <property name="driverClassName" value="org.h2.Driver"/>
        <property name="username" value="${database.username}"/>
        <property name="password" value="${database.password}"/>
        <property name="maxIdle" value="4"/>
        <property name="defaultAutoCommit" value="false"/>
    </bean>

    <bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
        <constructor-arg ref="jdbcDataSource"/>
    </bean>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="jdbcDataSource"/>
    </bean>

    <!-- Workflow -->
    <int:chain input-channel="processChannel" output-channel="outputChannel">
        <int:service-activator ref="fileActivator" method="fileRead"/>
        <int:service-activator ref="fileActivator" method="fileProcess"/>
        <int:service-activator ref="fileActivator" method="fileAudit"/>
    </int:chain>

    <!-- Output -->
    <int-file:outbound-channel-adapter
            id="outputChannel"
            directory="file:${output.files.path}"
            filename-generator-expression="payload.name">
        <!-- Delete the source file -->
        <int-file:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
            </bean>
        </int-file:request-handler-advice-chain>
    </int-file:outbound-channel-adapter>
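
The filter and after-rollback expressions in this configuration amount to a claim and release protocol: `putIfAbsent(...) == null` claims a file, and `remove(...)` releases the claim when processing fails. The plain-Java sketch below models only that logic. `ClaimingWorker` and `handle` are hypothetical names, and a `ConcurrentHashMap` is an assumption standing in for the shared `JdbcMetadataStore`; it does not involve Spring Integration or a database.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

final class ClaimingWorker {
    // Hypothetical in-memory stand-in for the metadata store shared by all instances.
    private final ConcurrentMap<String, String> store;
    private final Consumer<String> processor;

    ClaimingWorker(ConcurrentMap<String, String> store, Consumer<String> processor) {
        this.store = store;
        this.processor = processor;
    }

    // Returns true if this worker processed the file, false if another worker had already claimed it.
    boolean handle(String fileName) {
        String claimedAt = String.valueOf(System.currentTimeMillis());
        // Same test as the filter expression: continue only if nobody has claimed the file yet.
        if (store.putIfAbsent(fileName, claimedAt) != null) {
            return false;
        }
        try {
            processor.accept(fileName);
            return true;
        } catch (RuntimeException e) {
            // Same effect as the after-rollback expression: release the claim so the file can be retried.
            store.remove(fileName);
            throw e;
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> shared = new ConcurrentHashMap<>();
        ClaimingWorker instanceA = new ClaimingWorker(shared, f -> System.out.println("A processed " + f));
        ClaimingWorker instanceB = new ClaimingWorker(shared, f -> System.out.println("B processed " + f));
        instanceA.handle("report.xml");
        instanceB.handle("report.xml"); // already claimed by A, so B skips it
    }
}
```

The sketch does not model a hard JVM crash, where no `catch` block runs. In the configuration above, that case presumably depends on the database discarding the uncommitted transaction, which is worth verifying against the H2 setup actually in use.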



          Any improvement or other solution is welcome.






          share|improve this answer





















            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














            draft saved

            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53260483%2fspring-integration-retry-configuration-with-multi-instances%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            2 Answers

            1














            Don't use a PseudoTransactionManager; use a DataSourceTransactionManager instead.

            Since you use a JdbcMetadataStore, it will participate in the transaction, and if the downstream flow fails, the entry in the metadata store will be rolled back as well.
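
A minimal sketch of that change, assuming a `DataSource` bean named `jdbcDataSource` already backs the `JdbcMetadataStore`. The bean ids `transactionManager` and `jdbcDataSource` are illustrative; the adapter attributes are copied from the question.

```xml
<!-- Assumption: jdbcDataSource is the same DataSource the JdbcMetadataStore uses -->
<bean id="transactionManager"
      class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="jdbcDataSource"/>
</bean>

<int-file:inbound-channel-adapter
        id="inputFilesAdapter"
        channel="inputFilesChannel"
        directory="file:${input.files.path}"
        filter="compositeFilter">
    <!-- The poll, and whatever runs downstream on the polling thread, joins this transaction -->
    <int:poller fixed-rate="10000" max-messages-per-poll="1">
        <int:transactional transaction-manager="transactionManager"/>
    </int:poller>
</int-file:inbound-channel-adapter>
```

Only work done on the polling thread takes part in the transaction. A hand-off through a queue channel (as `inputFilesChannel` is declared in the question) lets the transaction commit before the downstream flow runs, so a later failure would no longer roll back the metadata store entry.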







            answered Nov 12 at 14:32









            Artem Bilan


            • Good to know, but the problem persists :(
              – cactuschibre
              Nov 14 at 10:36



















            1














            OK, I found a working solution. It may not be the cleanest one, but it works:

            • Multiple instances run on separate servers and share the same H2 database (through a network folder mount). I think it should also work over remote TCP. MVCC is enabled on H2 (see the H2 documentation).

            • The inbound-channel-adapter has the scan-each-poll option enabled, so files that were previously ignored (because another instance had already started processing them) can be polled again. If that other instance crashes, the file can be picked up and processed again without restarting this instance.

            • The defaultAutoCommit option is set to false on the data source.

            • I didn't use the FileSystemPersistentAcceptOnceFileListFilter, because it was adding every file it had read to the metadata store as soon as one file was successfully processed. I couldn't get it to work in my context.

            • I wrote my own conditions and actions as expressions, using a filter and transaction synchronization.



              <!-- Input -->
              <bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>
              <int-file:inbound-channel-adapter
                  id="inputAdapter"
                  channel="inputChannel"
                  directory="file:${input.files.path}"
                  comparator="lastModifiedFileComparator"
                  scan-each-poll="true">
                  <!-- Note: Spring transaction timeouts are expressed in seconds -->
                  <int:poller max-messages-per-poll="1" fixed-rate="5000">
                      <int:transactional transaction-manager="transactionManager" isolation="READ_COMMITTED" propagation="REQUIRED" timeout="60000" synchronization-factory="syncFactory"/>
                  </int:poller>
              </int-file:inbound-channel-adapter>

              <!-- Continue only if the metadata store does not already contain the file; in that case putIfAbsent inserts it and returns null -->
              <int:filter input-channel="inputChannel" output-channel="processChannel" discard-channel="nullChannel" throw-exception-on-rejection="false" expression="@jdbcMetadataStore.putIfAbsent(headers[file_name], headers[timestamp]) == null"/>

              <!-- On rollback, remove the file's entry from the metadata store -->
              <int:transaction-synchronization-factory id="syncFactory">
                  <int:after-rollback expression="@jdbcMetadataStore.remove(headers[file_name])"/>
              </int:transaction-synchronization-factory>

              <!-- Metadata store configuration -->
              <bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
                  <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
                  <property name="driverClassName" value="org.h2.Driver"/>
                  <property name="username" value="${database.username}"/>
                  <property name="password" value="${database.password}"/>
                  <property name="maxIdle" value="4"/>
                  <property name="defaultAutoCommit" value="false"/>
              </bean>

              <bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
                  <constructor-arg ref="jdbcDataSource"/>
              </bean>

              <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
                  <property name="dataSource" ref="jdbcDataSource"/>
              </bean>

              <!-- Workflow -->
              <int:chain input-channel="processChannel" output-channel="outputChannel">
                  <int:service-activator ref="fileActivator" method="fileRead"/>
                  <int:service-activator ref="fileActivator" method="fileProcess"/>
                  <int:service-activator ref="fileActivator" method="fileAudit"/>
              </int:chain>

              <!-- Output -->
              <int-file:outbound-channel-adapter
                  id="outputChannel"
                  directory="file:${output.files.path}"
                  filename-generator-expression="payload.name">
                  <!-- Delete the source file once the report has been written successfully -->
                  <int-file:request-handler-advice-chain>
                      <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                          <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
                      </bean>
                  </int-file:request-handler-advice-chain>
              </int-file:outbound-channel-adapter>
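
For the remote TCP variant mentioned in the first bullet, probably only the data source URL would need to change. This is an untested sketch: `${database.host}` and `${database.port}` are hypothetical placeholders that do not appear in the configuration above, and it assumes an H2 TCP server is already running on that host and serving the database.

```xml
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
    <!-- H2 remote URL form: jdbc:h2:tcp://<host>[:<port>]/<path-to-database> -->
    <!-- AUTO_SERVER is left out because the server is started explicitly in this setup -->
    <!-- MVCC=TRUE is carried over from the original URL; check whether your H2 version still accepts it -->
    <property name="url" value="jdbc:h2:tcp://${database.host}:${database.port}/${database.path}/shared;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
    <property name="driverClassName" value="org.h2.Driver"/>
    <property name="username" value="${database.username}"/>
    <property name="password" value="${database.password}"/>
    <property name="maxIdle" value="4"/>
    <property name="defaultAutoCommit" value="false"/>
</bean>
```

How the path after the host is resolved depends on how the H2 server was started (for example its base directory), so the value of `${database.path}` may need adjusting.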



            Any improvements or alternative solutions are welcome.







                answered Nov 15 at 15:41









                cactuschibre
