java multithreading redis spring-integration aggregator

java - Deadlock usando Aggregator+Redis



multithreading spring-integration (1)

Esta publicación está relacionada con esta

¿Interbloqueo de integración de primavera utilizando Aggregator + MessageStoreReaper + Redis?

pero este mensaje es demasiado largo para publicar. Continúo con la publicación original

Actualicé a la última versión de Java 7 1.7.0_60-b19 pero el problema todavía está allí.

Hice otro volcado de hebras y encontré el mismo problema: todos los DefaultMessageListenerContainers (recuento 20) están bloqueados por taskScheduler (entityScheduler-3) en la invocación de bloqueo de AbstractCorrelatingMessageHandler.

Aquí está la configuración del programador y agregador:

<task:scheduled-tasks scheduler="entityScheduler"> <task:scheduled ref="entityReaperBean" method="run" fixed-rate="${entity.aggregator.timeout}" /> </task:scheduled-tasks> <bean id="entityReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper"> <property name="messageGroupStore" ref="entityAggregatorRedisMessageStore" /> <property name="timeout" value="${entity.aggregator.reaper.timeout}" /> </bean> <bean id="entityAggregatorRedisMessageStore" class="org.springframework.integration.redis.store.RedisMessageStore"> <constructor-arg ref="entityRedisConnectionFactory" /> </bean> <int:aggregator id="entityAggregator" send-partial-result-on-expiry="true" release-strategy-expression="size() >= ${transactions-receiver.batch_size}" correlation-strategy-expression="payload.entityCode" expire-groups-upon-completion="true" message-store="entityAggregatorRedisMessageStore" />

Aquí está la sección de volcado de hilo:

"entityScheduler-3" - Thread t@188 java.lang.Thread.State: WAITING at sun.misc.Unsafe.park(Native Method) - parking to wait for <545079d6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Locked ownable synchronizers: - locked <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) "DefaultMessageListenerContainer-5" - Thread t@48 java.lang.Thread.State: WAITING at sun.misc.Unsafe.park(Native Method) - waiting to lock <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "entityScheduler-3" t@188 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221) at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) at com.sun.proxy.$Proxy16.handleMessage(Unknown Source) at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148) at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330) at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) at com.sun.proxy.$Proxy17.handleMessage(Unknown Source) at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148) at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330) at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) at com.sun.proxy.$Proxy15.handleMessage(Unknown Source) at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148) at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330) at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) at com.sun.proxy.$Proxy15.handleMessage(Unknown Source) at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:131) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106) at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) at com.sun.proxy.$Proxy18.handleMessage(Unknown Source) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102) at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:148) at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562) at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500) at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468) at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325) at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263) at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103) at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095) at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992) at java.lang.Thread.run(Thread.java:745) Locked ownable synchronizers: - None

¿Es posible que, por alguna razón, el método handleMessageInternal de AbstractCorrelatingMessageHandler no se desbloquee? Uso un almacén de mensajes de Redis y veo que MsgStoreReaper también usa un bloqueo. ¿Podría ser eso un problema?

muchas gracias por su ayuda aquí! Saludos a Guzman

ACTUALIZAR:

Encontré este mensaje en el registro:

2014-07-23 22:47:26,967 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task. 2014-07-23 22:47:26,968 ERROR RedisMessageStore:122 - Exception in expiry callback 2014-07-23 22:47:26,971 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.

pero no hay excepción ni stacktrace. ¿El error TaskUtils te dice algo?


En realidad, veo un lugar ...

finally { if (removeGroup) { this.remove(group); } lock.unlock(); }

... si el almacén de mensajes arroja una excepción durante la eliminación, omitiríamos el desbloqueo, ¿ves algo en el registro?