We have a Mule service that is defined as synchronous. This means that it handles each incoming message in the same thread as the endpoint that receives it. The picture below gives a high-level overview.
We found that the service behaves differently depending on how it is called, especially when it comes to exception handling. During our initial tests we called the service synchronously from the Mule client. In that case the default exception handler made sure we got an answer even if an exception was thrown.
In a production environment, however, we call the service asynchronously via JMS. In that case no reply was sent when an exception was thrown, and the client timed out.
Below is a schematic representation of what happens in Mule, both when everything goes well and when an exception is handled. Note that the client calls the service asynchronously, while Mule executes the service synchronously.
The behaviour we want is that Mule always returns a response, even when an exception occurs. This prevents service clients from timing out while waiting for a response. To achieve this we need to write a custom exception strategy for Mule.
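To make the problem concrete, here is a minimal sketch in plain Java, without any Mule classes. It assumes a queue stands in for the JMS reply destination and a worker thread stands in for the service; the names `ReplyOnErrorSketch`, `Reply` and `call` are hypothetical and only serve the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Stand-in for an asynchronous request/reply exchange; none of this is Mule API. */
final class ReplyOnErrorSketch {

    /** Hypothetical reply envelope: either a payload or the error that occurred. */
    static final class Reply {
        final String payload;
        final Throwable error;

        Reply(String payload, Throwable error) {
            this.payload = payload;
            this.error = error;
        }
    }

    /**
     * Runs a service that always fails on another thread and waits for a reply.
     * Returns null when no reply arrives within timeoutMillis.
     */
    static Reply call(boolean replyOnError, long timeoutMillis) {
        BlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<>();
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            service.execute(() -> {
                try {
                    throw new IllegalStateException("component failed");
                } catch (RuntimeException e) {
                    if (replyOnError) {
                        // What we want: the error travels back as a reply.
                        replyQueue.offer(new Reply(null, e));
                    }
                    // Otherwise nothing reaches the reply queue at all.
                }
            });
            // The client side: wait for a reply, but not forever.
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("no reply on error -> timed out: " + (call(false, 200) == null));
        System.out.println("reply on error -> error received: " + (call(true, 2000).error != null));
    }
}
```

Without the reply on error, the client can only find out about the failure by waiting for its timeout. With it, the failure arrives as an ordinary reply that carries the exception.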
The solution ingredients
1) A custom exception strategy; part of the code is shown below. This code makes sure that a reply is always sent. Since the exception strategy can be called multiple times, we store in the MuleSession whether a reply has already been sent. Global endpoints and normal endpoints also need some specific handling.
public final class ReqRepServiceExceptionStrategy extends DefaultServiceExceptionStrategy {
    ...
    @Override
    protected void defaultHandler(Throwable t) {
        super.defaultHandler(t);
        final MuleEvent muleEvent = RequestContext.getEvent();
        if (!isEventAlreadyProcessed(muleEvent)) {
            final MuleMessage replyMessage = new DefaultMuleMessage(null);
            replyMessage.setExceptionPayload(new DefaultExceptionPayload(t));
            ReplyToHandler replyToHandler;
            try {
                replyToHandler = getReplyToHandler(muleEvent.getMessage(), retrieveInboundEndpoint(muleEvent));
                processReplyTo(muleEvent, replyMessage, replyToHandler, muleEvent.getMessage().getReplyTo());
            } catch (MuleException e) {
                logger.error("Cannot reply from Exception Strategy.", e);
            }
        } else {
            logger.info("MuleEvent already processed once by this handler, not replying again.");
        }
    }

    private ImmutableEndpoint retrieveInboundEndpoint(final MuleEvent muleEvent) throws MuleException {
        InboundEndpoint inboundEndpoint;
        String originatingAddress = (String) muleEvent.getMessage().getProperty(MuleProperties.MULE_ORIGINATING_ENDPOINT_PROPERTY);
        // if not a global endpoint, will start with prefix
        if (originatingAddress != null && originatingAddress.startsWith(ENDPOINT_PREFIX)) {
            originatingAddress = originatingAddress.substring(ENDPOINT_PREFIX.length());
            originatingAddress = originatingAddress.replaceFirst("\\.", "://");
        }
        inboundEndpoint = muleEvent.getMuleContext().getRegistry().lookupEndpointFactory().getInboundEndpoint(originatingAddress);
        if (inboundEndpoint == null) {
            throw new DefaultMuleException(MessageFactory.createStaticMessage("Cannot find original inbound endpoint for this message."));
        }
        return inboundEndpoint;
    }

    private boolean isEventAlreadyProcessed(final MuleEvent muleEvent) {
        boolean eventAlreadyProcessed = false;
        final Object replyAlreadySent = muleEvent.getSession().getProperty(REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT);
        if (replyAlreadySent != null && Boolean.class.isInstance(replyAlreadySent)) {
            eventAlreadyProcessed = Boolean.class.cast(replyAlreadySent);
        }
        return eventAlreadyProcessed;
    }
    ...
}
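Two details of the strategy above can be tried out in isolation: the rewriting of the originating address and the "reply only once" guard. The sketch below uses plain Java only. The value of `ENDPOINT_PREFIX`, the `REPLY_SENT` key, the use of a `Map` in place of the MuleSession, and the `markReplySent` helper are all assumptions for illustration; the elided parts of the class above are not shown in this post and may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration of two details of the exception strategy; not Mule API. */
final class ExceptionStrategyDetailsSketch {

    // Assumed value: the post does not show how ENDPOINT_PREFIX is defined.
    static final String ENDPOINT_PREFIX = "endpoint.";

    // Hypothetical session key standing in for REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT.
    static final String REPLY_SENT = "replySent";

    /**
     * Mirrors the address handling in retrieveInboundEndpoint: a prefixed name
     * is stripped and its first dot becomes "://"; anything else, such as the
     * name of a global endpoint, is returned unchanged.
     */
    static String toEndpointUri(String originatingAddress) {
        if (originatingAddress != null && originatingAddress.startsWith(ENDPOINT_PREFIX)) {
            String stripped = originatingAddress.substring(ENDPOINT_PREFIX.length());
            return stripped.replaceFirst("\\.", "://");
        }
        return originatingAddress;
    }

    /**
     * Returns true the first time it is called for a session and false afterwards,
     * so that the caller sends at most one reply per session.
     */
    static boolean markReplySent(Map<String, Object> session) {
        if (Boolean.TRUE.equals(session.get(REPLY_SENT))) {
            return false;
        }
        session.put(REPLY_SENT, Boolean.TRUE);
        return true;
    }

    public static void main(String[] args) {
        System.out.println(toEndpointUri("endpoint.vm.custom.request"));
        System.out.println(toEndpointUri("customRequest"));

        Map<String, Object> session = new HashMap<>();
        System.out.println(markReplySent(session)); // first call: reply may be sent
        System.out.println(markReplySent(session)); // second call: reply is skipped
    }
}
```

Only the first dot is replaced because the remaining dots belong to the endpoint path itself, as in vm://custom.request.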
2) Add this exception handler to the Mule configuration. The configuration below is from an integration test: its component always throws a runtime exception, which allows us to test the new exception strategy.
<service name="customHandler">
    <inbound>
        <vm:inbound-endpoint address="vm://custom.request" synchronous="true"/>
    </inbound>
    <component class="be.i8c.mule.service.RuntimeExComponent"/>
    <custom-exception-strategy class="be.i8c.mule.service.ReqRepServiceExceptionStrategy"/>
</service>
3) Test the new exception handler and see it work.
@Test
public void testAsyncCustomExceptionStrategy() throws Exception {
    final MuleMessage request = createRequest("UNIT TEST MESSAGE", "vm://custom.reply");
    final MuleMessage response = doAsyncReqRep(request, "vm://custom.request", "vm://custom.reply");
    assertNotNull(response);
    assertNotNull(response.getExceptionPayload());
    assertTrue(response.getExceptionPayload().getException().getCause().getMessage().indexOf(RuntimeExComponent.EX_MSG) >= 0);
}