Spring-Integration: Config of TaskExecutors in the Pollers for Splitter and Routers to increae the concurrent processing -


enter image description herethis spring-integration-config have:

jdbc-inbound-adapter --> 1st queuechannel --> splitter --> router --> serviceactivator -->outbound-channel-adapter 

when execute load of 6000messages in table picked jdbc-inbound-adapter takes 34mins these messages processed.

and observed takes lot of time message move splitter router. decrease time can use below configuration: (please see poller fixed-delay , receive-timeout)

<si:splitter input-channel="firstqueuechannel" output-channel="splitqueuechannel" auto-startup="true" send-timeout="1">     <si:poller fixed-delay="50" time-unit="milliseconds" receive-timeout="5000" max-messages-per-poll="200" task-executor="messagetypesplittertaskexecutor"/> </si:splitter>  <task:executor id="messagetypesplittertaskexecutor" pool-size="2-20" queue-capacity="10" keep-alive="1" rejection-policy="caller_runs"/>  <si:router input-channel="splitqueuechannel" ref="messagetyperouter" method ="routemessage" auto-startup="true" >     <si:poller fixed-delay="50" time-unit="milliseconds" receive-timeout="5000" max-messages-per-poll="200" task-executor="messagetyperoutertaskexecutor"/>      </si:router>  <task:executor id="messagetyperoutertaskexecutor" pool-size="2-20" queue-capacity="10" keep-alive="1" rejection-policy="caller_runs"/> 

my question here is: should use fixed-rate or fixed-delay on poller , should receive-timeout should 0 suggested gary in link: spring integration - queue/poller seems exhaust threadpool without action

it great if 1 can point me in right direction. load of 400k messages on 30% load

update: jdbc-inbound-adapter config , adapter uses puts messages on 'firstqueuechannel':

 <si-jdbc:inbound-channel-adapter id="jdbcinboundadapter"     channel="firstqueuechannel" data-source="mydatasource"     auto-startup="true"     query="select * stage_table status='waiting'  update skip locked"     update="update stage_table set status='in_progress' id in (:id)"     max-rows-per-poll="100" row-mapper="rowmapper"     update-per-row="true">      <si:poller fixed-rate="5000">         <si:advice-chain>              <ref bean="txadvice"/>              <ref bean="inboundadapterconfiguration"/>          </si:advice-chain>     </si:poller> </si-jdbc:inbound-channel-adapter> 

update2: high-level details:

  jdbcinboundadapter  --> splitter ---> router ---> service-activators---> router (routed result category) ---> service activators --> router (routed result type) ---> service activators ---> aggregator  --> outboundchannel adapter 

below complete configuration:

<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans"     xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"     xmlns:si="http://www.springframework.org/schema/integration"     xmlns:task="http://www.springframework.org/schema/task"     xmlns:si-jdbc="http://www.springframework.org/schema/integration/jdbc"     xmlns:tx="http://www.springframework.org/schema/tx"     xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">  <!--this channel linked inbound-adapter -->  <si:channel id="firstqueuechannel">     <si:queue capacity="200"/> </si:channel>  <bean id="mapperutil" class="com.foo.service.process.mapper.common.mapperutil"></bean>  <bean id="rowmapperforhighpriority" class="com.foo.service.process.mapper.highpriorityresultstatusmapper"></bean>  <bean id="inboundadapterpollingconfiguration" class="com.foo.service.download.retriever.impl.inboundadapterpollingconfigurationimpl">     <property name="channel" ref="firstqueuechannel"/>     <property name="jdbcinboundadapter" ref="jdbcinboundadapter"/>       </bean>    <!--#################1. jdbc-inbound-adapter: pushes messages channel ###########################-->  <si-jdbc:inbound-channel-adapter id="jdbcinboundadapter"     channel="firstqueuechannel" data-source="mydatasource"     auto-startup="true"     query="select * stage_table status='waiting'  update skip locked"     update="update stage_table set status='in_progress' id in (:id)"     max-rows-per-poll="100" row-mapper="rowmapper"     update-per-row="true">      <si:poller fixed-rate="5000">         <si:advice-chain>              <ref bean="txadvice"/>              <ref bean="inboundadapterconfiguration"/>          </si:advice-chain>     </si:poller> </si-jdbc:inbound-channel-adapter>  <tx:advice id="txadvice">     <tx:attributes>         <tx:method name="get*" read-only="false"/>         <tx:method name="*"/>     </tx:attributes> </tx:advice>  <!--#################2. splits payload of list individual payload ################################-->  <si:splitter input-channel="firstqueuechannel" output-channel="splitqueuechannel" auto-startup="true" send-timeout="1">     <si:poller fixed-delay="50" time-unit="milliseconds" receive-timeout="5000" max-messages-per-poll="200" task-executor="messagetypesplittertaskexecutor"/> </si:splitter>  <si:channel id="splitqueuechannel">     <si:queue capacity="200"/> </si:channel>  <task:executor id="messagetypesplittertaskexecutor" pool-size="2-20" queue-capacity="10" keep-alive="1" rejection-policy="caller_runs"/>  <!--##################################### 3. here 2 types of messages status, result , result_set router route messages depending on message type  #####################################-->   <si:router input-channel="splitqueuechannel" ref="messagetyperouter" method ="routemessage" auto-startup="true" >     <si:poller fixed-delay="50" time-unit="milliseconds" receive-timeout="5000" max-messages-per-poll="200" task-executor="messagetyperoutertaskexecutor"/>      </si:router>  <bean id="messagetypehighrouter" class="com.foo.service.process.router.messagetypehighrouter"/>  <task:executor id="messagetyperoutertaskexecutor" pool-size="2-20" queue-capacity="10" keep-alive="1" rejection-policy="caller_runs"/>   <!-- status messages go channel -->  <si:channel id="statushighchannel">     <si:queue capacity="50"/>  </si:channel>  <!--#####################################4.  service activator process status-messages #####################################--> <si:service-activator input-channel="statushighchannel" ref="statusconsumeservice" method="consumestatus" output-channel="ackhighchannel">     <si:poller task-executor="statusmessagehightaskexecutor" fixed-delay="1000"/> </si:service-activator> <task:executor id="statusmessagehightaskexecutor" pool-size="2-15" queue-capacity="5" keep-alive="1" rejection-policy="caller_runs"/>  <!-- acknowledge channel --> <si:channel id="ackhighchannel"/>   <!--#####################################5.  processing results  #####################################--> <!-- have router route results according ordertype on ordertypehandler channels       , on each channel have service activator process results accordingly   -->  <!-- below channel definitions result , result_set --> <si:channel id="highsimpleorderresultchannel">     <si:queue capacity="50"/> </si:channel>  <si:channel id="resulthighchannel">     <si:queue capacity="50"/> </si:channel>  <si:channel id="resultordersethighchannel">     <si:queue capacity="50"/> </si:channel>  <si:channel id="resultordersetsplithighchannel">     <si:queue capacity="50"/> </si:channel>     <!--##################################### 6. service-activator result_set object convert list<result> , returns #####################################--> <si:service-activator input-channel="resultordersethighchannel"     output-channel="resultordersetsplithighchannel" ref="resultordersetsplitter"     method="splitordersetresulttoorderresult">     <si:poller task-executor="resultmessagesethightaskexecutor"         fixed-delay="1000" /> </si:service-activator>   <task:executor id="resultmessagesethightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!--##################################### 7. bean converts result object list<result> object , list size 1 #####################################--> <bean id="simpleorderresultmapper" class="com.foo.service.process.transformer.simplemissionresulttransformer"/>  <si:transformer id="simpleorderresulthightransformer" input-channel="highsimpleorderresultchannel" method="transform" output-channel="resultordersetsplithighchannel" ref="simpleorderresultmapper"/>   <bean id="splitterbean" class="com.foo.service.process.impl.messagesplitter" />  <!--#####################################8. splitter : list<result> result #####################################--> <si:chain input-channel="resultordersetsplithighchannel" output-channel="resulthighchannel">     <si:splitter ref="splitterbean"/>  </si:chain>   <!--##################################### 9. service update ordertypecategory attribute in payload #####################################--> <si:service-activator input-channel="resulthighchannel"     output-channel="processresulthighchannel" ref="resultassigncategoryservice"     method="processcategoryresults">     <si:poller task-executor="resultmessagehightaskexecutor"         fixed-delay="1000" /> </si:service-activator>  <task:executor id="resultmessagehightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!--#####################################10. router uses ordertypecategory attribute route below channels #####################################--> <si:router input-channel="processresulthighchannel"  ref="ordertypecategoryrouter" method ="routehighmessage"/>   <!-- order-type-category channels --> <si:channel id="orderhighchannel">     <si:queue capacity="50"/> </si:channel> <si:channel id="notification_software_updatehighchannel">     <si:queue capacity="50"/> </si:channel> <si:channel id="notification_userhighchannel">     <si:queue capacity="50"/> </si:channel> <si:channel id="alerthighchannel">     <si:queue capacity="50"/> </si:channel> <si:channel id="file_watcherhighchannel">     <si:queue capacity="50"/> </si:channel> <si:channel id="errorhighchannel">     <si:queue capacity="50"/> </si:channel>  <si:transformer id="hightransformer" input-channel="errorhighchannel" method="transform" output-channel="ackhighchannel" ref="orderresultsmessagetransformer"/>  <!--11. service-activator update ordertype attribute in payload--> <si:service-activator input-channel="orderhighchannel" output-channel="processorderhighchannel" ref="resultassigncategoryservice" method="processorderresults" >     <si:poller task-executor="resultorderhightaskexecutor" fixed-delay="1000" />         </si:service-activator>  <task:executor id="resultorderhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!--#######12. router use ordertype attribute route different channels , service-activators on channels process messages#####--> <si:router input-channel="processorderhighchannel" ref ="ordertyperouter" method ="routehighmessage"/>  <!--############13. below list of channels , service-activators process messages when reach respective channels###############-->      <!-- get_properties handler   --> <si:channel id="get_propertieshighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="get_propertieshighchannel" ref="getpropertieshandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="getpropertieshandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="getpropertieshandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>   <!-- set_properties handler   --> <si:channel id="set_propertieshighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="set_propertieshighchannel" ref="updateresultstatushandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="setpropertieshandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="setpropertieshandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- add_properties handler   -->  <si:channel id="add_agent_propertieshighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="add_agent_propertieshighchannel" ref="updateresultstatushandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="addagentpropertieshandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="addagentpropertieshandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- file_download_ handler   -->     <si:channel id="file_download_highchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="file_download_highchannel" ref="updateresultstatushandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="filedownloadhandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="filedownloadhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- groovy_script_execution handler   -->   <si:channel id="groovy_script_executionhighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="groovy_script_executionhighchannel" ref="updateresultstatushandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="groovyscriptexecutionhandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="groovyscriptexecutionhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- shell_script_execution handler   -->    <si:channel id="shell_script_executionhighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="shell_script_executionhighchannel" ref="updateresultstatushandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="shellscriptexecutionhandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="shellscriptexecutionhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- notify_device handler   -->     <si:channel id="notify_devicehighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="notify_devicehighchannel" ref="updateresultstatushandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="notifydevicehandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="notifydevicehandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- agent_registration handler   -->    <si:channel id="agent_registrationhighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="registrationhighchannel" ref="updateresultstatushandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="registrationhandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="registrationhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- file_watcher handler   -->  <si:channel id="file_watcherhighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="file_watcherhighchannel" ref="updateresultstatushandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="filewatcherhandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="filewatcherhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- configure_fast_polling handler   -->    <si:channel id="configure_pollinghighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="configure_pollinghighchannel" ref="updateresultstatushandler" method="handlerawmissionresult" output-channel="prepareresultshighchannel">     <si:poller task-executor="configurepollinghandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="configurepollinghandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- file_upload_to_enterprise handler, murthy's service   --> <si:channel id="file_upload_highchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="file_upload_highchannel" expression="@filetransferhandler.handlemissionresult(payload,false,headers)" output-channel="prepareresultshighchannel">     <si:poller task-executor="fileuploadtoenterprisehandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="fileuploadhandlerhightaskexecutor" pool-size="10-30" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>   <!-- db_script_execution handler   -->   <si:channel id="db_script_executionhighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="db_script_executionhighchannel" expression="@filetransferhandler.handlemissionresult(payload,false,headers)" output-channel="prepareresultshighchannel">     <si:poller task-executor="dbscriptexecutionhandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="dbscriptexecutionhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- file_search handler   -->   <si:channel id="file_searchhighchannel">     <si:queue capacity="50"/> </si:channel>  <si:service-activator input-channel="file_searchhighchannel" expression="@filesearchhandler.handlemissionresult(payload,headers)" output-channel="prepareresultshighchannel">     <si:poller task-executor="filesearchhandlerhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="filesearchhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>   <!--category handlers   -->  <!-- directory_watcher handler   --> <si:channel id="file_watcher_categoryhighchannel">     <si:queue capacity="50"/> </si:channel>   <si:service-activator input-channel="file_watcher_categoryhighchannel" expression="@filetransferhandler.handlemissionresult(payload,true,headers)" output-channel="prepareresultshighchannel">     <si:poller task-executor="filewatcherhandlercategoryhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <task:executor id="filewatcherhandlercategoryhightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- below channels required aggregation --> <si:channel id="prepareresultshighchannel">     <si:queue capacity="500"/> </si:channel> <si:channel id="aggregatehighchannel">     <si:queue capacity="500"/> </si:channel> <si:channel id="aggregatefilesearchhighchannel">     <si:queue capacity="500"/> </si:channel> <si:channel id="aggregatecommonhighchannel">     <si:queue capacity="500"/> </si:channel> <si:channel id="discardhighchannel">     <si:queue capacity="500"/> </si:channel>  <!--######################################14.aggregator result messages######################################################################-->   <bean id="resultaggregatorbean" class="com.foo.service.consume.result.resultaggregator" />  <si:aggregator input-channel="prepareresultshighchannel" output-channel="aggregatehighchannel" ref="resultaggregatorbean" method="send" release-strategy="resultaggregatorbean" release-strategy-method="release" correlation-strategy="resultaggregatorbean" correlation-strategy-method="correlate" send-partial-result-on-expiry="false" send-timeout="300000" discard-channel="discardhighchannel"/>  <!--#####################################15 router - send different activators depending on type of result#############################################################################--> <bean id="aggregatorrouter" class="com.foo.service.process.router.aggregatorrouter"/>    <si:router input-channel="aggregatehighchannel"  ref="aggregatorrouter" method ="routehighmessage"/>       <bean id="aggregatorutil" class="com.foo.model.process.aggregate.aggregatorutil"/>  <bean id="aggregatefilesearchresultservice" class="com.foo.model.process.aggregate.impl.aggregatefilesearchresultserviceimpl">     <property name="missionresultdao" ref="missionresultdao"/>     <property name="aggregatorutil" ref="aggregatorutil"/> </bean>   <!-- service-activator process message on high priority, default or advance file-search-channel   --> <si:service-activator input-channel="aggregatefilesearchhighchannel"  ref="aggregatefilesearchresultservice" method="processaggregatorresponse" output-channel="ackhighchannel">     <si:poller task-executor="aggregateresulthightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <!-- task executor process message on on high priority file-search-channel   --> <task:executor id="aggregateresulthightaskexecutor" pool-size="2-15" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!-- bean process message on common-channel   --> <bean id="aggregatecommonresultservice" class="com.foo.model.process.aggregate.impl.aggregatecommonresultserviceimpl">     <property name="missionresultdao" ref="missionresultdao"/>     <property name="aggregatorutil" ref="aggregatorutil"/> </bean>   <!-- service-activator process message on common-channel   --> <si:service-activator input-channel="aggregatecommonhighchannel"  ref="aggregatecommonresultservice" method="processaggregatorresponse" output-channel="ackhighchannel">     <si:poller task-executor="aggregatecommonhightaskexecutor" fixed-delay="1000"/> </si:service-activator>  <!-- task executor process message on common-channel   --> <task:executor id="aggregatecommonhightaskexecutor" pool-size="2-30" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>   <bean id="evaluateresultservice" class="com.foo.model.process.aggregate.impl.evaluateresultserviceimpl">     <property name="missionresultdao" ref="missionresultdao"/> </bean>  <bean id="discardresultdao" class="com.foo.dao.mission.result.impl.discardresultdaoimpl">     <property name="sessionfactory" ref="mysessionfactory" />     <property name="missionresultdao" ref="missionresultdao"/> </bean>  <!-- bean process message on discard-channel   --> <bean id="discardresultservice" class="com.foo.model.process.aggregate.impl.discardresultserviceimpl">     <property name="discardresultdao" ref="discardresultdao"/>     <property name="aggregatorutil" ref="aggregatorutil"/> </bean>       <!-- service-activator process message on discard-channel   --> <si:service-activator input-channel="discardhighchannel"  ref="discardresultservice" method="applyfailurestatustomessagesondiscardchannel" output-channel="ackhighchannel">     <si:poller task-executor="discardhightaskexecutor" fixed-delay="1000"/> </si:service-activator>       <!-- task executor process message on discard-channel   --> <task:executor id="discardhightaskexecutor" pool-size="2-30" queue-capacity="2" keep-alive="1" rejection-policy="caller_runs"/>  <!--###################################16.  dispatch result###########################################################--> <!-- final element update staging table message status sucess or failure --> <si:outbound-channel-adapter ref="acknowledgementservice" method="receivemessage" channel="ackhighchannel" order="1">        </si:outbound-channel-adapter>  </beans> 

why many queue channels? need 1 handoff point multi-thread; try invoking splitter on jdbc adapter poller's thread.

edit:

what trying achieve having pretty every channel queuechannel? causes incredible amount of thread context switching.

i suggest starting out having channels directchannels except splitqueuechannel , add further concurrency downstream if find need it. having pretty every channel queue channel excessive unless there specific trying achieve not seeing.


Comments

Popular posts from this blog

apache - Remove .php and add trailing slash in url using htaccess not loading css -

javascript - jQuery show full size image on click -