
Friday, July 8, 2011

Synchronous Mule Service - exception handling - Mule 3

In a previous post I outlined a solution for sending an exception back to the caller when doing a synchronous service call over JMS in Mule. The intro of that post contains the exact problem description. This post covers the exact same topic; the only difference in setup is that we'll use Mule 3.

Upgrading the exception handler from Mule 2 to 3
All the facets of migrating to a new version of Mule deserve a separate post, so I'll stick to the exception handling problem described before.

Version 3 of the Mule DefaultServiceExceptionStrategy deprecates the defaultHandler method; the new one to use is doHandleException. This new method gives you direct access to the MuleEvent, which is an improvement: in my newer implementation I can get the endpoint straight from the MuleEvent instead of doing a Mule registry lookup.
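
Side by side, the two hooks look like this (fragments lifted from the two implementations in this post, not a complete class):

// Mule 2.x (deprecated in Mule 3): only the Throwable is passed in,
// the event has to be fetched from the RequestContext.
protected void defaultHandler(Throwable t) {
    final MuleEvent muleEvent = RequestContext.getEvent();
    // ...
}

// Mule 3.x: the MuleEvent is handed to you directly.
protected void doHandleException(Exception e, MuleEvent muleEvent) {
    final ImmutableEndpoint inboundEndpoint = muleEvent.getEndpoint();
    // ...
}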

Getting hold of the replyTo object has become a bit more difficult and forced me to narrow the usage of this class to JMS only: there is no longer a convenient method like muleEvent.getMessage().getReplyTo(), so you need to resolve it from a property that is specific to the JMS connector.

For those interested I included all the code at the bottom of this post.

Using flow
Even though I was pretty pleased with the code I wrote, I still wondered whether the new Flow concept in Mule 3 would allow me to get rid of the custom exception handler. After all, wanting to respond to a JMS request when something goes wrong is not such a strange requirement.

What I actually found is that using a Flow makes things worse, because by default a flow does not reply to a JMS request at all, neither on success nor on error. There is a bug open for this on JIRA: http://www.mulesoft.org/jira/browse/MULE-5307

After fiddling around a bit I did find that everything works as expected with a VM endpoint! So if you have an inbound VM endpoint that is marked as exchange-pattern="request-response", you will always get a response, in both the success and the error case.
What is more: it also works if you have an inbound JMS endpoint and forward the request to the VM endpoint of a flow. The configuration looks like the example below. Notice that you still need a response transformer that turns the Mule exception payload into whatever response you need. Without such a transformer you will get an empty message on your JMS response queue, because the JMS connector ignores the exception payload and takes the normal payload, which is null.

<mule>
    <jms:endpoint name="jms.queue.request" queue="${mq.queue.request.in}"
                  transformer-refs="JmsToObject"
                  responseTransformer-refs="exPayloadToResponse objectToJms"/>
    <vm:endpoint name="vm.request" address="vm://vm.request"/>

    <model>
        <service name="requestViaJMS">
            <inbound>
                <inbound-endpoint ref="jms.queue.request"/>
            </inbound>
            <outbound>
                <pass-through-router>
                    <outbound-endpoint ref="vm.request" exchange-pattern="request-response"/>
                </pass-through-router>
            </outbound>
        </service>
    </model>

    <flow name="requestViaVM">
        <inbound-endpoint ref="vm.request" exchange-pattern="request-response"/>
        <enricher ... />
        <transformer ... />
        <component ... />
    </flow>
</mule>
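
The exPayloadToResponse transformer referenced above is not shown here; a minimal sketch, assuming Mule 3.x and an error-response format of your own choosing, could look like this:

import org.mule.api.ExceptionPayload;
import org.mule.api.MuleMessage;
import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractMessageTransformer;

public class ExceptionPayloadToResponse extends AbstractMessageTransformer {
    @Override
    public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException {
        final ExceptionPayload exceptionPayload = message.getExceptionPayload();
        if (exceptionPayload != null) {
            // Give the JMS connector a real payload to send back instead of null.
            return "ERROR: " + exceptionPayload.getException().getMessage();
        }
        return message.getPayload();
    }
}

Such a class would then be registered under the exPayloadToResponse name used in responseTransformer-refs on the JMS endpoint, so that objectToJms can serialize the result onto the reply queue.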

You would be right to argue that the above is exactly what the Mule Bridge pattern (also new in version 3) provides. However, the bridge implementation suffers from the same defect as the flow, so it does not bring a solution.

Conclusion
The most elegant solution in Mule 3 for exception handling on synchronous JMS flows is to hide them behind a VM endpoint and simply pass the JMS message through.

Custom exceptionStrategy code
public final class ReqRepServiceExceptionStrategy extends DefaultServiceExceptionStrategy {
    public static final String REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT = "REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT";
    private final Logger logger = LoggerFactory.getLogger(ReqRepServiceExceptionStrategy.class);

    @Override
    protected void doHandleException(Exception e, MuleEvent muleEvent) {
        super.doHandleException(e, muleEvent);
        final ImmutableEndpoint inboundEp = muleEvent.getEndpoint();
        final boolean isReqRep = MessageExchangePattern.REQUEST_RESPONSE.equals(inboundEp.getExchangePattern());
        //only reply once, and only for request-response JMS endpoints
        if (!isEventAlreadyProcessed(muleEvent) && inboundEp.isProtocolSupported(JmsConnector.JMS) && isReqRep) {
            final MuleMessage replyMessage = new DefaultMuleMessage(null, muleContext);
            replyMessage.setExceptionPayload(new DefaultExceptionPayload(e));
            try {
                final Object replyTo = getReplyTo(muleEvent.getMessage());
                final ReplyToHandler replyToHandler = getReplyToHandler(inboundEp);
                processReplyTo(muleEvent, replyMessage, replyToHandler, replyTo);
            } catch (MuleException me) {
                logger.error("Cannot reply from Exception Strategy.", me);
            }
        } else {
            logger.info("MuleEvent already processed once by this handler, not replying again.");
        }
    }

    private boolean isEventAlreadyProcessed(final MuleEvent muleEvent) {
        boolean eventAlreadyProcessed = false;
        final Object replyAlreadySent = muleEvent.getSession().getProperty(REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT);
        if (replyAlreadySent != null && Boolean.class.isInstance(replyAlreadySent)) {
            eventAlreadyProcessed = Boolean.class.cast(replyAlreadySent);
        }
        return eventAlreadyProcessed;
    }

    private Object getReplyTo(final MuleMessage message) throws MuleException {
        final Object replyTo = message.getOutboundProperty(JmsConstants.JMS_REPLY_TO);
        if (replyTo == null) {
            throw new DefaultMuleException(MessageFactory.createStaticMessage(
                    "There is no jms-reply-to specified on this endpoint"));
        }
        return replyTo;
    }

    private ReplyToHandler getReplyToHandler(final ImmutableEndpoint endpoint) throws MuleException {
        final ReplyToHandler replyToHandler = ((AbstractConnector) endpoint.getConnector()).getReplyToHandler(endpoint);
        if (replyToHandler == null) {
            throw new DefaultMuleException(MessageFactory.createStaticMessage(
                    "There is no replyToHandler specified on this endpoint"));
        }
        final List responseTransformers = endpoint.getResponseTransformers();
        if (responseTransformers != null && responseTransformers.size() > 0) {
            replyToHandler.setTransformers(responseTransformers);
        }
        return replyToHandler;
    }

    private void processReplyTo(final MuleEvent event, final MuleMessage result, final ReplyToHandler replyToHandler,
                                final Object replyTo) throws MuleException {
        final String requestor = result.getOutboundProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY);
        if (requestor == null || !requestor.equals(event.getFlowConstruct().getName())) {
            replyToHandler.processReplyTo(event, result, replyTo);
            event.getSession().setProperty(REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT, Boolean.TRUE);
            logger.info("Reply sent for this MuleEvent to " + replyTo);
        }
    }
}

Wednesday, March 9, 2011

Synchronous Mule Service - exception handling

The problem

We have a Mule service that is defined as synchronous, meaning it handles incoming messages on the same thread as the endpoint that receives them. A high-level overview is in the picture below.
What we found is that the behaviour differs depending on how the service is called, especially when it comes to exception handling. During our initial tests we called this service from the Mule client in a synchronous way; in that case the default exception handler made sure we got an answer even if an exception was thrown.
In a production environment, however, we call the service asynchronously via JMS, and in that case the client timed out.

Below is a schematic representation of what happens in Mule when everything goes well and when an exception is handled. Note that the client calls the service asynchronously, while Mule executes the service synchronously.



The behaviour we want is for Mule to always return a response, even in the case of an exception. This prevents service clients from timing out while waiting for a response. To achieve this we need to write a custom exception strategy for Mule.

The solution ingredients

1) A custom exception strategy; some of the code is below. This code makes sure that a reply is always sent. Since the exception strategy can be called multiple times, we store in the MuleSession whether a reply has already been sent. Some specific handling is also needed to distinguish global endpoints from normal endpoints.
public final class ReqRepServiceExceptionStrategy extends DefaultServiceExceptionStrategy {
    ...
    @Override
    protected void defaultHandler(Throwable t) {
        super.defaultHandler(t);
        final MuleEvent muleEvent = RequestContext.getEvent();
        if (!isEventAlreadyProcessed(muleEvent)) {
            final MuleMessage replyMessage = new DefaultMuleMessage(null);
            replyMessage.setExceptionPayload(new DefaultExceptionPayload(t));
            ReplyToHandler replyToHandler;
            try {
                replyToHandler = getReplyToHandler(muleEvent.getMessage(), retrieveInboundEndpoint(muleEvent));
                processReplyTo(muleEvent, replyMessage, replyToHandler, muleEvent.getMessage().getReplyTo());
            } catch (MuleException e) {
                logger.error("Cannot reply from Exception Strategy.", e);
            }
        } else {
            logger.info("MuleEvent already processed once by this handler, not replying again.");
        }
    }

    private ImmutableEndpoint retrieveInboundEndpoint(final MuleEvent muleEvent) throws MuleException {
        InboundEndpoint inboundEndpoint;
        String originatingAddress = (String) muleEvent.getMessage().getProperty(MuleProperties.MULE_ORIGINATING_ENDPOINT_PROPERTY);
        //if not a global endpoint, will start with prefix
        if (originatingAddress != null && originatingAddress.startsWith(ENDPOINT_PREFIX)) {
            originatingAddress = originatingAddress.substring(ENDPOINT_PREFIX.length());
            originatingAddress = originatingAddress.replaceFirst("\\.", "://");
        }
        inboundEndpoint = muleEvent.getMuleContext().getRegistry().lookupEndpointFactory().getInboundEndpoint(originatingAddress);
        if (inboundEndpoint == null) {
            throw new DefaultMuleException(MessageFactory.createStaticMessage("Cannot find original inbound endpoint for this message."));
        }
        return inboundEndpoint;
    }

    private boolean isEventAlreadyProcessed(final MuleEvent muleEvent) {
        boolean eventAlreadyProcessed = false;
        final Object replyAlreadySent = muleEvent.getSession().getProperty(REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT);
        if (replyAlreadySent != null && Boolean.class.isInstance(replyAlreadySent)) {
            eventAlreadyProcessed = Boolean.class.cast(replyAlreadySent);
        }
        return eventAlreadyProcessed;
    }
    ...
}

2) Add this exception handler to the Mule configuration. The configuration below is from an integration test: the component always throws a runtime exception, which allows us to test the new exception strategy.
<service name="customHandler">
    <inbound>
        <vm:inbound-endpoint address="vm://custom.request" synchronous="true"/>
    </inbound>
    <component class="be.i8c.mule.service.RuntimeExComponent">
        <custom-exception-strategy class="be.i8c.mule.service.ReqRepServiceExceptionStrategy"/>
    </component>
</service>
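
The RuntimeExComponent used above is not shown in this post; a minimal sketch of such a test component (the entry-point method name and the message text are assumptions) would be:

public class RuntimeExComponent {
    public static final String EX_MSG = "runtime exception thrown on purpose";

    // Mule invokes this as the component entry point; the call always fails,
    // which forces the custom exception strategy to kick in.
    public Object process(Object payload) {
        throw new RuntimeException(EX_MSG);
    }
}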

3) Test the new exception handler and see it work.
@Test
public void testAsyncCustomExceptionStrategy() throws Exception {
    final MuleMessage request = createRequest("UNIT TEST MESSAGE", "vm://custom.reply");
    final MuleMessage response = doAsyncReqRep(request, "vm://custom.request", "vm://custom.reply");

    assertNotNull(response);
    assertNotNull(response.getExceptionPayload());
    assertTrue(response.getExceptionPayload().getException().getCause().getMessage().indexOf(RuntimeExComponent.EX_MSG) >= 0);
}
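
The createRequest and doAsyncReqRep helpers are not shown either; with the Mule MuleClient, the asynchronous request/reply helper could be implemented roughly like this (assuming the test extends FunctionalTestCase, which provides muleContext; the timeout is arbitrary):

private MuleMessage doAsyncReqRep(final MuleMessage request, final String requestUrl, final String replyUrl) throws MuleException {
    final MuleClient client = new MuleClient(muleContext);
    // fire-and-forget, just like a production client posting on a JMS queue
    client.dispatch(requestUrl, request);
    // then block on the reply destination until the response (or exception payload) arrives
    return client.request(replyUrl, 5000);
}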



Wednesday, February 23, 2011

JMS speed test: ActiveMQ vs HornetQ

A while ago I was asked by a client to evaluate different open source JMS providers. The ultimate goal was to set up a highly available messaging system that can manage high throughput.

My colleagues and I dug deep into our memories and the internet to find all the open source JMS providers, and there are plenty. In the end we looked at the following:
  • ActiveMQ
  • OpenMQ
  • RabbitMQ
  • OpenJMS
  • HornetQ
Besides high availability and throughput there were some other requirements:
  • must be JMS 1.1 compliant
  • must be easy to set up and administer
  • vibrant community for support
For these reasons we quickly abandoned OpenJMS, which seems to have stopped evolving somewhere in 2006.
RabbitMQ is not JMS compliant, which is something we really need.
OpenMQ was dropped a little later, since the first performance tests showed it to be noticeably slower than ActiveMQ and HornetQ.

In the next phase we did quite extensive load tests on HornetQ and ActiveMQ, which I'll summarize below.

For our tests we used the following setup: three similar machines, each with two quad-core CPUs, 8GB of RAM, RHEL 5 and a 64-bit Java HotSpot VM (1.6.0_21-b06). Two machines hosted the JMS providers, one running the master and the other a backup instance. The third machine was used for generating load.

Load generation was done using the Sonic test harness, which allowed us to put load on the JMS providers with varying numbers of producers and consumers on the JMS queues.
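
The harness hides the plumbing, but conceptually each producer in such a test boils down to a loop like the one below (plain JMS 1.1; the class and method names are illustrative and not part of the harness API):

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

public class ThroughputProbe {

    // The ConnectionFactory and Queue come from the provider under test
    // (ActiveMQ or HornetQ), typically via JNDI or direct instantiation.
    public static void sendBatch(ConnectionFactory factory, Queue queue, int count,
                                 boolean persistent, boolean transacted) throws JMSException {
        Connection connection = factory.createConnection();
        try {
            Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
            long start = System.currentTimeMillis();
            for (int i = 0; i < count; i++) {
                producer.send(session.createTextMessage("payload " + i));
            }
            if (transacted) {
                session.commit();
            }
            System.out.println(count + " messages sent in " + (System.currentTimeMillis() - start) + " ms");
        } finally {
            connection.close();
        }
    }
}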

These are some of the results we got (varying the message size, whether messages are persisted, whether a transaction is used, and the number of concurrent producers and consumers):


As you can see, both JMS providers are about equal for non-persistent messaging. But when looking at persistent messaging, HornetQ is just amazing: the throughput you get there is mind-blowing. This is without a doubt due to HornetQ's asynchronous IO feature, which is only available on *NIX-based systems but is well worth it when you are looking for a performant open source JMS provider.

There are other sources of comparisons like these, but they were in our opinion either biased or did not give us enough insight for our situation: