Spring Integration: configuring TaskExecutors in the pollers for splitters and routers to increase concurrent processing
This Spring Integration configuration has the following flow:
jdbc-inbound-adapter --> 1st QueueChannel --> splitter --> router --> service-activator --> outbound-channel-adapter
When I run a load of 6,000 messages in the table picked up by the jdbc-inbound-adapter, it takes 34 minutes for these messages to be processed.
I observed that it takes a lot of time for a message to move from the splitter to the router. To decrease that time I am using the configuration below (please see the poller's fixed-delay and receive-timeout):
<si:splitter input-channel="firstqueuechannel" output-channel="splitqueuechannel"
             auto-startup="true" send-timeout="1">
  <si:poller fixed-delay="50" time-unit="MILLISECONDS" receive-timeout="5000"
             max-messages-per-poll="200" task-executor="messagetypesplittertaskexecutor"/>
</si:splitter>
<task:executor id="messagetypesplittertaskexecutor" pool-size="2-20" queue-capacity="10"
               keep-alive="1" rejection-policy="CALLER_RUNS"/>

<si:router input-channel="splitqueuechannel" ref="messagetyperouter" method="routemessage"
           auto-startup="true">
  <si:poller fixed-delay="50" time-unit="MILLISECONDS" receive-timeout="5000"
             max-messages-per-poll="200" task-executor="messagetyperoutertaskexecutor"/>
</si:router>
<task:executor id="messagetyperoutertaskexecutor" pool-size="2-20" queue-capacity="10"
               keep-alive="1" rejection-policy="CALLER_RUNS"/>
My question here is: should I use fixed-rate or fixed-delay on the poller, and should receive-timeout be 0, as suggested by Gary in this link: Spring Integration - Queue/Poller seems to exhaust threadpool without action
It would be great if someone could point me in the right direction. We will have a load of 400k messages at 30% load.
Update: here is the jdbc-inbound-adapter config; the adapter puts messages on 'firstqueuechannel':
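For reference, the receive-timeout="0" variant mentioned in the linked answer could look like the sketch below. It reuses the channel and executor names from this question and changes only the receive-timeout attribute. Whether it shortens the 34 minutes is an assumption that would have to be measured. The reasoning: with a task-executor on the poller, each poll runs on a pool thread, and a positive receive-timeout lets that thread block on an empty queue for up to that long, whereas 0 makes an empty poll return immediately.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd">

  <si:channel id="firstqueuechannel">
    <si:queue capacity="200"/>
  </si:channel>
  <si:channel id="splitqueuechannel">
    <si:queue capacity="200"/>
  </si:channel>

  <!-- Same splitter as in the question; only receive-timeout differs (0 instead of 5000),
       so a poll that finds the queue empty returns at once instead of holding a pool thread. -->
  <si:splitter input-channel="firstqueuechannel" output-channel="splitqueuechannel"
               auto-startup="true" send-timeout="1">
    <si:poller fixed-delay="50" time-unit="MILLISECONDS" receive-timeout="0"
               max-messages-per-poll="200" task-executor="messagetypesplittertaskexecutor"/>
  </si:splitter>

  <task:executor id="messagetypesplittertaskexecutor" pool-size="2-20" queue-capacity="10"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>
</beans>
```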
<si-jdbc:inbound-channel-adapter id="jdbcinboundadapter" channel="firstqueuechannel"
    data-source="mydatasource" auto-startup="true"
    query="select * from stage_table where status='waiting' for update skip locked"
    update="update stage_table set status='in_progress' where id in (:id)"
    max-rows-per-poll="100" row-mapper="rowmapper" update-per-row="true">
  <si:poller fixed-rate="5000">
    <si:advice-chain>
      <ref bean="txadvice"/>
      <ref bean="inboundadapterconfiguration"/>
    </si:advice-chain>
  </si:poller>
</si-jdbc:inbound-channel-adapter>
Update 2: high-level details:
jdbcinboundadapter --> splitter --> router --> service-activators --> router (routes by result category) --> service-activators --> router (routes by result type) --> service-activators --> aggregator --> outbound-channel-adapter
Below is the complete configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:si-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
           http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.2.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

  <!-- this channel is linked to the inbound-adapter -->
  <si:channel id="firstqueuechannel">
    <si:queue capacity="200"/>
  </si:channel>

  <bean id="mapperutil" class="com.foo.service.process.mapper.common.mapperutil"/>
  <bean id="rowmapperforhighpriority" class="com.foo.service.process.mapper.highpriorityresultstatusmapper"/>
  <bean id="inboundadapterpollingconfiguration"
        class="com.foo.service.download.retriever.impl.inboundadapterpollingconfigurationimpl">
    <property name="channel" ref="firstqueuechannel"/>
    <property name="jdbcinboundadapter" ref="jdbcinboundadapter"/>
  </bean>

  <!-- ##### 1. jdbc-inbound-adapter: pushes messages to the channel ##### -->
  <si-jdbc:inbound-channel-adapter id="jdbcinboundadapter" channel="firstqueuechannel"
      data-source="mydatasource" auto-startup="true"
      query="select * from stage_table where status='waiting' for update skip locked"
      update="update stage_table set status='in_progress' where id in (:id)"
      max-rows-per-poll="100" row-mapper="rowmapper" update-per-row="true">
    <si:poller fixed-rate="5000">
      <si:advice-chain>
        <ref bean="txadvice"/>
        <ref bean="inboundadapterconfiguration"/>
      </si:advice-chain>
    </si:poller>
  </si-jdbc:inbound-channel-adapter>

  <tx:advice id="txadvice">
    <tx:attributes>
      <tx:method name="get*" read-only="false"/>
      <tx:method name="*"/>
    </tx:attributes>
  </tx:advice>

  <!-- ##### 2. splits the payload of a list into individual payloads ##### -->
  <si:splitter input-channel="firstqueuechannel" output-channel="splitqueuechannel"
               auto-startup="true" send-timeout="1">
    <si:poller fixed-delay="50" time-unit="MILLISECONDS" receive-timeout="5000"
               max-messages-per-poll="200" task-executor="messagetypesplittertaskexecutor"/>
  </si:splitter>
  <si:channel id="splitqueuechannel">
    <si:queue capacity="200"/>
  </si:channel>
  <task:executor id="messagetypesplittertaskexecutor" pool-size="2-20" queue-capacity="10"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- ##### 3. here there are 2 types of messages: status, result, result_set.
       The router routes messages depending on message type ##### -->
  <si:router input-channel="splitqueuechannel" ref="messagetyperouter" method="routemessage"
             auto-startup="true">
    <si:poller fixed-delay="50" time-unit="MILLISECONDS" receive-timeout="5000"
               max-messages-per-poll="200" task-executor="messagetyperoutertaskexecutor"/>
  </si:router>
  <bean id="messagetypehighrouter" class="com.foo.service.process.router.messagetypehighrouter"/>
  <task:executor id="messagetyperoutertaskexecutor" pool-size="2-20" queue-capacity="10"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- status messages go to this channel -->
  <si:channel id="statushighchannel">
    <si:queue capacity="50"/>
  </si:channel>

  <!-- ##### 4. service activator to process status messages ##### -->
  <si:service-activator input-channel="statushighchannel" ref="statusconsumeservice"
                        method="consumestatus" output-channel="ackhighchannel">
    <si:poller task-executor="statusmessagehightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="statusmessagehightaskexecutor" pool-size="2-15" queue-capacity="5"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- acknowledge channel -->
  <si:channel id="ackhighchannel"/>

  <!-- ##### 5. processing results ##### -->
  <!-- we have a router to route results according to ordertype onto the ordertypehandler channels,
       and on each channel we have a service activator to process results accordingly -->
  <!-- below are the channel definitions for result and result_set -->
  <si:channel id="highsimpleorderresultchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:channel id="resulthighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:channel id="resultordersethighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:channel id="resultordersetsplithighchannel">
    <si:queue capacity="50"/>
  </si:channel>

  <!-- ##### 6. service-activator: takes the result_set object, converts it to list<result>, and returns it ##### -->
  <si:service-activator input-channel="resultordersethighchannel"
                        output-channel="resultordersetsplithighchannel"
                        ref="resultordersetsplitter" method="splitordersetresulttoorderresult">
    <si:poller task-executor="resultmessagesethightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="resultmessagesethightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- ##### 7. bean converts the result object to a list<result> object, with list size 1 ##### -->
  <bean id="simpleorderresultmapper" class="com.foo.service.process.transformer.simplemissionresulttransformer"/>
  <si:transformer id="simpleorderresulthightransformer" input-channel="highsimpleorderresultchannel"
                  method="transform" output-channel="resultordersetsplithighchannel"
                  ref="simpleorderresultmapper"/>
  <bean id="splitterbean" class="com.foo.service.process.impl.messagesplitter"/>

  <!-- ##### 8. splitter: list<result> to result ##### -->
  <si:chain input-channel="resultordersetsplithighchannel" output-channel="resulthighchannel">
    <si:splitter ref="splitterbean"/>
  </si:chain>

  <!-- ##### 9. service to update the ordertypecategory attribute in the payload ##### -->
  <si:service-activator input-channel="resulthighchannel" output-channel="processresulthighchannel"
                        ref="resultassigncategoryservice" method="processcategoryresults">
    <si:poller task-executor="resultmessagehightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="resultmessagehightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- ##### 10. router uses the ordertypecategory attribute to route to the channels below ##### -->
  <si:router input-channel="processresulthighchannel" ref="ordertypecategoryrouter" method="routehighmessage"/>

  <!-- order-type-category channels -->
  <si:channel id="orderhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:channel id="notification_software_updatehighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:channel id="notification_userhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:channel id="alerthighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:channel id="file_watcherhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:channel id="errorhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:transformer id="hightransformer" input-channel="errorhighchannel" method="transform"
                  output-channel="ackhighchannel" ref="orderresultsmessagetransformer"/>

  <!-- 11. service-activator to update the ordertype attribute in the payload -->
  <si:service-activator input-channel="orderhighchannel" output-channel="processorderhighchannel"
                        ref="resultassigncategoryservice" method="processorderresults">
    <si:poller task-executor="resultorderhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="resultorderhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- ##### 12. router uses the ordertype attribute to route to different channels,
       and service-activators on those channels process the messages ##### -->
  <si:router input-channel="processorderhighchannel" ref="ordertyperouter" method="routehighmessage"/>

  <!-- ##### 13. below is the list of channels and service-activators that process messages
       when they reach the respective channels ##### -->

  <!-- get_properties handler -->
  <si:channel id="get_propertieshighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="get_propertieshighchannel" ref="getpropertieshandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="getpropertieshandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="getpropertieshandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- set_properties handler -->
  <si:channel id="set_propertieshighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="set_propertieshighchannel" ref="updateresultstatushandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="setpropertieshandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="setpropertieshandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- add_properties handler -->
  <si:channel id="add_agent_propertieshighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="add_agent_propertieshighchannel" ref="updateresultstatushandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="addagentpropertieshandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="addagentpropertieshandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- file_download_ handler -->
  <si:channel id="file_download_highchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="file_download_highchannel" ref="updateresultstatushandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="filedownloadhandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="filedownloadhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- groovy_script_execution handler -->
  <si:channel id="groovy_script_executionhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="groovy_script_executionhighchannel" ref="updateresultstatushandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="groovyscriptexecutionhandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="groovyscriptexecutionhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- shell_script_execution handler -->
  <si:channel id="shell_script_executionhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="shell_script_executionhighchannel" ref="updateresultstatushandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="shellscriptexecutionhandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="shellscriptexecutionhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- notify_device handler -->
  <si:channel id="notify_devicehighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="notify_devicehighchannel" ref="updateresultstatushandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="notifydevicehandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="notifydevicehandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- agent_registration handler -->
  <si:channel id="agent_registrationhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="registrationhighchannel" ref="updateresultstatushandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="registrationhandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="registrationhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- file_watcher handler -->
  <si:channel id="file_watcherhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="file_watcherhighchannel" ref="updateresultstatushandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="filewatcherhandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="filewatcherhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- configure_fast_polling handler -->
  <si:channel id="configure_pollinghighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="configure_pollinghighchannel" ref="updateresultstatushandler"
                        method="handlerawmissionresult" output-channel="prepareresultshighchannel">
    <si:poller task-executor="configurepollinghandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="configurepollinghandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- file_upload_to_enterprise handler, murthy's service -->
  <si:channel id="file_upload_highchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="file_upload_highchannel"
                        expression="@filetransferhandler.handlemissionresult(payload,false,headers)"
                        output-channel="prepareresultshighchannel">
    <si:poller task-executor="fileuploadtoenterprisehandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="fileuploadhandlerhightaskexecutor" pool-size="10-30" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- db_script_execution handler -->
  <si:channel id="db_script_executionhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="db_script_executionhighchannel"
                        expression="@filetransferhandler.handlemissionresult(payload,false,headers)"
                        output-channel="prepareresultshighchannel">
    <si:poller task-executor="dbscriptexecutionhandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="dbscriptexecutionhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- file_search handler -->
  <si:channel id="file_searchhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="file_searchhighchannel"
                        expression="@filesearchhandler.handlemissionresult(payload,headers)"
                        output-channel="prepareresultshighchannel">
    <si:poller task-executor="filesearchhandlerhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="filesearchhandlerhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- category handlers -->
  <!-- directory_watcher handler -->
  <si:channel id="file_watcher_categoryhighchannel">
    <si:queue capacity="50"/>
  </si:channel>
  <si:service-activator input-channel="file_watcher_categoryhighchannel"
                        expression="@filetransferhandler.handlemissionresult(payload,true,headers)"
                        output-channel="prepareresultshighchannel">
    <si:poller task-executor="filewatcherhandlercategoryhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <task:executor id="filewatcherhandlercategoryhightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- the channels below are required for aggregation -->
  <si:channel id="prepareresultshighchannel">
    <si:queue capacity="500"/>
  </si:channel>
  <si:channel id="aggregatehighchannel">
    <si:queue capacity="500"/>
  </si:channel>
  <si:channel id="aggregatefilesearchhighchannel">
    <si:queue capacity="500"/>
  </si:channel>
  <si:channel id="aggregatecommonhighchannel">
    <si:queue capacity="500"/>
  </si:channel>
  <si:channel id="discardhighchannel">
    <si:queue capacity="500"/>
  </si:channel>

  <!-- ##### 14. aggregator for result messages ##### -->
  <bean id="resultaggregatorbean" class="com.foo.service.consume.result.resultaggregator"/>
  <si:aggregator input-channel="prepareresultshighchannel" output-channel="aggregatehighchannel"
                 ref="resultaggregatorbean" method="send"
                 release-strategy="resultaggregatorbean" release-strategy-method="release"
                 correlation-strategy="resultaggregatorbean" correlation-strategy-method="correlate"
                 send-partial-result-on-expiry="false" send-timeout="300000"
                 discard-channel="discardhighchannel"/>

  <!-- ##### 15. router: sends to different activators depending on the type of result ##### -->
  <bean id="aggregatorrouter" class="com.foo.service.process.router.aggregatorrouter"/>
  <si:router input-channel="aggregatehighchannel" ref="aggregatorrouter" method="routehighmessage"/>

  <bean id="aggregatorutil" class="com.foo.model.process.aggregate.aggregatorutil"/>
  <bean id="aggregatefilesearchresultservice"
        class="com.foo.model.process.aggregate.impl.aggregatefilesearchresultserviceimpl">
    <property name="missionresultdao" ref="missionresultdao"/>
    <property name="aggregatorutil" ref="aggregatorutil"/>
  </bean>

  <!-- service-activator to process messages on the high priority, default or advance file-search-channel -->
  <si:service-activator input-channel="aggregatefilesearchhighchannel" ref="aggregatefilesearchresultservice"
                        method="processaggregatorresponse" output-channel="ackhighchannel">
    <si:poller task-executor="aggregateresulthightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <!-- task executor to process messages on the high priority file-search-channel -->
  <task:executor id="aggregateresulthightaskexecutor" pool-size="2-15" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- bean to process messages on the common-channel -->
  <bean id="aggregatecommonresultservice"
        class="com.foo.model.process.aggregate.impl.aggregatecommonresultserviceimpl">
    <property name="missionresultdao" ref="missionresultdao"/>
    <property name="aggregatorutil" ref="aggregatorutil"/>
  </bean>
  <!-- service-activator to process messages on the common-channel -->
  <si:service-activator input-channel="aggregatecommonhighchannel" ref="aggregatecommonresultservice"
                        method="processaggregatorresponse" output-channel="ackhighchannel">
    <si:poller task-executor="aggregatecommonhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <!-- task executor to process messages on the common-channel -->
  <task:executor id="aggregatecommonhightaskexecutor" pool-size="2-30" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <bean id="evaluateresultservice" class="com.foo.model.process.aggregate.impl.evaluateresultserviceimpl">
    <property name="missionresultdao" ref="missionresultdao"/>
  </bean>
  <bean id="discardresultdao" class="com.foo.dao.mission.result.impl.discardresultdaoimpl">
    <property name="sessionfactory" ref="mysessionfactory"/>
    <property name="missionresultdao" ref="missionresultdao"/>
  </bean>

  <!-- bean to process messages on the discard-channel -->
  <bean id="discardresultservice" class="com.foo.model.process.aggregate.impl.discardresultserviceimpl">
    <property name="discardresultdao" ref="discardresultdao"/>
    <property name="aggregatorutil" ref="aggregatorutil"/>
  </bean>
  <!-- service-activator to process messages on the discard-channel -->
  <si:service-activator input-channel="discardhighchannel" ref="discardresultservice"
                        method="applyfailurestatustomessagesondiscardchannel" output-channel="ackhighchannel">
    <si:poller task-executor="discardhightaskexecutor" fixed-delay="1000"/>
  </si:service-activator>
  <!-- task executor to process messages on the discard-channel -->
  <task:executor id="discardhightaskexecutor" pool-size="2-30" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>

  <!-- ##### 16. dispatch result ##### -->
  <!-- final element: updates the staging table message status to success or failure -->
  <si:outbound-channel-adapter ref="acknowledgementservice" method="receivemessage"
                               channel="ackhighchannel" order="1"/>
</beans>
Why so many queue channels? You only need one handoff point to go multi-threaded; try invoking the splitter on the JDBC adapter poller's thread.
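One way this could look, as a minimal sketch rather than a tested configuration: firstqueuechannel is declared without a queue (so it becomes a DirectChannel) and the splitter loses its poller, which means the split runs on the JDBC adapter poller's thread. splitqueuechannel stays a queue channel and is the single handoff to the router's thread pool. The beans mydatasource, rowmapper, txadvice, inboundadapterconfiguration and messagetyperouter are assumed to be defined as in the question.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:si-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
           http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.2.xsd">

  <!-- No <si:queue/> child: a DirectChannel, so the subscriber runs on the sender's thread. -->
  <si:channel id="firstqueuechannel"/>

  <si-jdbc:inbound-channel-adapter id="jdbcinboundadapter" channel="firstqueuechannel"
      data-source="mydatasource" auto-startup="true"
      query="select * from stage_table where status='waiting' for update skip locked"
      update="update stage_table set status='in_progress' where id in (:id)"
      max-rows-per-poll="100" row-mapper="rowmapper" update-per-row="true">
    <si:poller fixed-rate="5000">
      <si:advice-chain>
        <ref bean="txadvice"/>
        <ref bean="inboundadapterconfiguration"/>
      </si:advice-chain>
    </si:poller>
  </si-jdbc:inbound-channel-adapter>

  <!-- No poller and no task executor: the split happens on the adapter poller's thread,
       and therefore inside the transaction started by txadvice. -->
  <si:splitter input-channel="firstqueuechannel" output-channel="splitqueuechannel"
               auto-startup="true" send-timeout="1"/>

  <!-- The single handoff point: the router's poller drains this queue on its own pool. -->
  <si:channel id="splitqueuechannel">
    <si:queue capacity="200"/>
  </si:channel>

  <si:router input-channel="splitqueuechannel" ref="messagetyperouter" method="routemessage"
             auto-startup="true">
    <si:poller fixed-delay="50" time-unit="MILLISECONDS" receive-timeout="5000"
               max-messages-per-poll="200" task-executor="messagetyperoutertaskexecutor"/>
  </si:router>
  <task:executor id="messagetyperoutertaskexecutor" pool-size="2-20" queue-capacity="10"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>
</beans>
```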
Edit:
What are you trying to achieve by having pretty much every channel be a QueueChannel? It causes an incredible amount of thread context switching.
I suggest starting out with all channels being DirectChannels except splitqueuechannel, and then adding further concurrency downstream only if you find you need it. Having pretty much every channel be a queue channel is excessive unless there is something specific you are trying to achieve that I am not seeing.
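As an illustration of that starting point, here is a hedged sketch for part of the downstream flow. statushighchannel becomes a DirectChannel and its service activator loses its poller, so it runs on whichever router pool thread delivered the message. The executor channel on file_upload_highchannel is only one possible way to add concurrency later, and choosing that particular channel is an assumption made for the example. statusconsumeservice, filetransferhandler, ackhighchannel and prepareresultshighchannel are assumed to be defined as in the question.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd">

  <!-- DirectChannel: no queue, no poller, no extra thread hop. -->
  <si:channel id="statushighchannel"/>

  <si:service-activator input-channel="statushighchannel" ref="statusconsumeservice"
                        method="consumestatus" output-channel="ackhighchannel"/>

  <!-- Optional, only if measurements show this step needs its own threads:
       a channel with a dispatcher task-executor hands each message to the pool
       without any polling delay. -->
  <si:channel id="file_upload_highchannel">
    <si:dispatcher task-executor="fileuploadhandlerhightaskexecutor"/>
  </si:channel>

  <si:service-activator input-channel="file_upload_highchannel"
                        expression="@filetransferhandler.handlemissionresult(payload,false,headers)"
                        output-channel="prepareresultshighchannel"/>

  <task:executor id="fileuploadhandlerhightaskexecutor" pool-size="10-30" queue-capacity="2"
                 keep-alive="1" rejection-policy="CALLER_RUNS"/>
</beans>
```

With direct channels the per-stage fixed-delay="1000" pollers disappear, so a message no longer waits up to a second at each hop before the next endpoint picks it up.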