Wednesday, March 9, 2011

Synchronous Mule Service - exception handling

The problem

We have a mule service that is defined as synchronous. This means that it will handle incoming messages in the same thread as the endpoint that receives it. A high level overview of this is in the picture below.
What we found is that there is different behaviour depending on how the service is called, especially when it comes to exception handling. During our initial tests we called this service from the mule client in a synchronous way. In this case the default exception handler made sure we got an answer even if an exception was thrown.
In a production environment however we call the service asynchronously via JMS. In that use case the client timed out.

Below is a schematic representation of what happens in Mule in case everything goes well and with exception handling. Note that the client calls the service in an async way and that Mule executes the service in a sync way.



The behaviour we want is that Mule always returns a response even in the case of exception. This avoids the service clients to timeout while waiting for a response. To achieve this we need to write a custom exception strategy for Mule.

The solution ingredients

1) A custom exception strategy, some of the code is below. This code will make sure that a reply is send always. Since the exception strategy can be called multiple times, we'll store in the MuleSession if a reply is send already. There is also some specific handling needed for global endpoints and normal endpoints.
public final class ReqRepServiceExceptionStrategy extends DefaultServiceExceptionStrategy {
...
@Override
protected void defaultHandler(Throwable t) {
super.defaultHandler(t);
final MuleEvent muleEvent = RequestContext.getEvent();
if (!isEventAlreadyProcessed(muleEvent)) {
final MuleMessage replyMessage = new DefaultMuleMessage(null);
replyMessage.setExceptionPayload(new DefaultExceptionPayload(t));
ReplyToHandler replyToHandler;
try {
replyToHandler = getReplyToHandler(muleEvent.getMessage(), retrieveInboundEndpoint(muleEvent));
processReplyTo(muleEvent, replyMessage, replyToHandler, muleEvent.getMessage().getReplyTo());
} catch (MuleException e) {
logger.error("Cannot reply from Exception Strategy.", e);
}
} else {
logger.info("MuleEvent already processed once by this handler, not replying again.");
}
}

private ImmutableEndpoint retrieveInboundEndpoint(final MuleEvent muleEvent) throws MuleException {
InboundEndpoint inboundEndpoint;
String originatingAddress = (String) muleEvent.getMessage().getProperty(MuleProperties.MULE_ORIGINATING_ENDPOINT_PROPERTY);
//if not a global endpoint, will start with prefix
if (originatingAddress != null && originatingAddress.startsWith(ENDPOINT_PREFIX)) {
originatingAddress = originatingAddress.substring(ENDPOINT_PREFIX.length());
originatingAddress = originatingAddress.replaceFirst("\\.", "://");
}
inboundEndpoint = muleEvent.getMuleContext().getRegistry().lookupEndpointFactory().getInboundEndpoint(originatingAddress);
if (inboundEndpoint == null) {
throw new DefaultMuleException(MessageFactory.createStaticMessage("Cannot find original inbound endpoint for this message."));
}
return inboundEndpoint;
}

private boolean isEventAlreadyProcessed(final MuleEvent muleEvent) {
boolean eventAlreadyProcessed = false;
final Object replyAlreadySent = muleEvent.getSession().getProperty(REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT);
if (replyAlreadySent != null && Boolean.class.isInstance(replyAlreadySent)) {
eventAlreadyProcessed = Boolean.class.cast(replyAlreadySent);
}
return eventAlreadyProcessed;
}
...
}

2) Add this exception handler to the Mule configuration. The configuration below is from an integration test: it will always throw a runtime exception in the component. This allows us to test the new exception strategy.
<service name="customHandler">
<inbound>
<vm:inbound-endpoint address="vm://custom.request" synchronous="true"/>
</inbound>
<component class="be.i8c.mule.service.RuntimeExComponent">
<custom-exception-strategy class="be.i8c.mule.service.ReqRepServiceExceptionStrategy"/>
</component>
</service>

3) Test the new exception handler and see it work.
@Test
public void testAsyncCustomExceptionStrategy() throws Exception {
final MuleMessage request = createRequest("UNIT TEST MESSAGE","vm://custom.reply");
final MuleMessage response = doAsyncReqRep(request, "vm://custom.request","vm://custom.reply");

assertNotNull(response);
assertNotNull(response.getExceptionPayload());
assertTrue(response.getExceptionPayload().getException().getCause().getMessage().indexOf(RuntimeExComponent.EX_MSG) >= 0);
}



7 comments:

  1. can you provide full source of ReqRepServiceExceptionStrategy
    i'm not able find out implementation of getReplyToHandler and processReplyTo

    ReplyDelete
  2. Can you give you mail address? I'm not allowed to copy that much text in a comment ...

    Note that this only works for Mule 2.x. In Mule 3 some of this has changed.

    ReplyDelete
  3. Hi,

    Thanks for this wonderful post.

    Could you please send me the code for the class ReqRepServiceExceptionStrategy.
    My email id is sanjayfromgomia@gmail.com

    Thanks,
    Sanjay

    ReplyDelete
  4. can I get a copy of your code also

    gerald.pierce@yahoo.com

    ReplyDelete
  5. Hi,
    Could I get a copy of the mule 3 version of the code. I seem to be having the same problem in my project.

    tbriers @ gmail.com

    thanks

    Tom.

    ReplyDelete
  6. Code for Mule 3 is in a new post: http://jeroen-v.blogspot.com/2011/07/synchronous-mule-service-exception.html

    ReplyDelete
  7. can i get a copy of your source code pls?

    for both 2.X and 3

    dramezh@gmail.com

    ReplyDelete