java - propagacion - Tener problemas para manejar excepciones en Spring Integration
propagacion de excepciones en java (1)
Cuando se lanza la excepción, se envuelve junto con requestMessage
a MessagingException
. Su propia excepción comercial está en la cause
y puede obtener acceso al requestMessage
de la propiedad MessagingException.failedMessage
.
Por lo tanto, parece que tiene todo lo que necesita para su caso de uso. Solo el problema es que antes de enviar al error-exchange
realmente debería tener algún <transformer>
en el flujo de error para convertir adecuadamente MessagingException
al mensaje adecuado para enviar al AMQP.
Soy nuevo en la integración de Spring y estoy confundido acerca de cómo enviar mensajes de error a una cola de error designada. Quiero que el mensaje de error sea un encabezado en el mensaje original y termine en una cola separada. Leí que esto se puede hacer con un enriquecedor de encabezado, que traté de implementar, pero no aparece nada en la cola de errores.
Además, ¿necesito una clase de manejo de excepciones para que los mensajes de error lleguen a la cola de errores o puedo lanzar excepciones en mis métodos de transformación?
Aquí está mi configuración xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/amqp
http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="first" auto-delete="false" durable="true" />
<rabbit:queue name="second" auto-delete="false" durable="true" />
<rabbit:queue name="errorQueue" auto-delete="false" durable="true" />
<int:poller default="true" fixed-rate="100"/>
<rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
<rabbit:bindings>
<rabbit:binding queue="second" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:fanout-exchange name="error-exchange" auto-delete="true" durable="true">
<rabbit:bindings>
<rabbit:binding queue="errorQueue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<int-amqp:outbound-channel-adapter channel="messageOutputChannel" exchange-name="second-exchange" amqp-template="amqpTemplate" />
<int-amqp:inbound-channel-adapter channel="messageInputChannel" error-channel="errorInputChannel" queue-names="first" connection-factory="connectionFactory" concurrent-consumers="20" />
<int-amqp:outbound-channel-adapter channel="errorOutputChannel" exchange-name="error-exchange" amqp-template="amqpTemplate" />
<int:channel id="messageInputChannel" />
<int:channel id="messageOutputChannel"/>
<int:channel id="errorInputChannel"/>
<int:service-activator input-channel="errorInputChannel" output-channel= "errorOutputChannel" method = "handleError" >
<bean class="firstAttempt.MessageErrorHandler"/>
<int:chain input-channel="messageInputChannel" output-channel="messageOutputChannel">
<int:header-enricher>
<int:error-channel ref="errorInputChannel" />
</int:header-enricher>
<int:transformer method = "convert" >
<bean class="firstAttempt.JsonObjectConverter" />
</int:transformer>
<int:service-activator method="transform">
<bean class="firstAttempt.Transformer" />
</int:service-activator>
<int:object-to-string-transformer />
</int:chain>
</beans>
Clase de error:
public class ErrorHandler {
public String errorHandle(MessageHandlingException exception) {
return exception.getMessage();
Clase QualityScorer (llamada por transformador):
public class QualityScorer {
private Hashtable<String, String> table;
private final static String csvFile = "C://Users//john//Test.csv";
public QualityScorer() throws Exception {
table = new Hashtable<String, String>();
initializeTable();
}
private void initializeTable() throws Exception {
BufferedReader br = null;
String line = "";
String cvsSplitBy = ",";
try {
br = new BufferedReader(new FileReader(csvFile));
while ((line = br.readLine()) != null) {
String[] data = line.split(cvsSplitBy);
if(data.length > 6 && data[1].equals("1") && data[4].equals("0") && data[5].equals("1"))
table.putIfAbsent(data[3], data[1]);
}
} catch (FileNotFoundException e) {
throw new Exception("No file found");
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public float getScore(JSONObject object) throws Exception {
float score;
if (object == null) {
throw new IllegalArgumentException("object");
}
if (!object.has("source")) {
throw new Exception("Object does not have a source");
}
if (!object.has("employer")) {
throw new Exception("Object does not have an employer");
}
String source = object.getString("Source");
String employer = object.getString("employer");
if (table.containsKey(employer) && !source.equals("packageOne")) {
score = 1;
} else {
score = -1;
}
return score;
}
}
En este momento, el mensaje que se está cargando no tiene origen, por lo que el programa debería lanzar la excepción MessagingException al MessageErrorHandler.
Código de transformador:
public class Transformer {
private QualityScorer qualityScorer;
public Transformer() throws Exception {
qualityScorer = new QualityScorer();
}
public JSONObject transform(JSONObject object) throws Exception {
float score = qualityScorer.getScore(object);
object.put("score", score);
return object;
}
}
En conjunto, el programa debería recibir un mensaje precargado de una cola, transformarlo y enviarlo a una segunda cola, lo que hace con éxito si la fuente se proporciona en el mensaje precargado. Estoy tratando de manejar los errores y hacerlos para que se envíen a una cola de errores como un encabezado de mensaje. Este problema me ha estado frustrando por un tiempo, ¡así que la ayuda es muy apreciada!
El error que se muestra actualmente en stacktrace es:
java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.lang.Thread.run(Thread.java:748)
Pero nada va a la cola de errores.