Friday, February 17, 2012

Setting up Time Machine on a NAS

I wanted to use my little NAS as a Time Capsule, because Time Machine on the Mac looks very impressive. My NAS runs Debian 6 and my Mac runs Lion.

So I started out browsing to find some ideas on how to approach this.

The best howto document I found is this one: http://www.kremalicious.com/2008/06/ubuntu-as-mac-file-server-and-time-machine-volume/

I first tried to connect to the NAS over CIFS (Samba), but that didn't work: Time Machine reports that there is no support for AFP (Apple Filing Protocol). For this to work you need to install the Netatalk package. Debian 6, however, only ships a Netatalk 2.1.x package. It seems you need a 2.2 version, because that one supports more SSL features which are needed by OS X Lion.

After doing some more digging I found some usable configuration on the following site: http://routerjockey.com/2009/08/28/setting-up-apple-filing-protocol-and-bonjour-under-debian/
This site proposes to build Netatalk 2.2.x from source. I'm however not a fan of that since I need to install plenty of stuff for that (gcc + loads of dev packages). I consider my NAS box to be a server, not a development machine. I did however find a backport of Netatalk 2.2 here: http://www.mikepalmer.net/build-a-netatalk-time-machine-for-osx-lion-using-debian-6-0-squeeze/

Once you have installed Netatalk and have it running, OS X Lion will give you an error stating that the version is not supported. This site explains the fix: http://www.kremalicious.com/2008/06/ubuntu-as-mac-file-server-and-time-machine-volume/

For all of you who have OS X Lion installed and get an error message about a wrong server version:
Add uams_dhx2.so to your config line in the file:
/etc/netatalk/afpd.conf
And the correct line should be:
- -transall -uamlist uams_randnum.so,uams_dhx.so,uams_dhx2.so -nosavepassword -advertise_ssh

Before you connect to the AFP share make sure to run the following command on your mac:
$ defaults write com.apple.systempreferences TMShowUnsupportedNetworkVolumes 1

If you do not do this, you can only connect to an official Time Capsule.

It is also a good idea to install Avahi. This makes sure that your NAS shares are autodiscovered. How to do this is also described in the very first link (kremalicious.com).

HP Proliant Microserver case fan

For starters: I found a great review on silent PC review for the Microserver. It gets some pretty good reviews: http://www.silentpcreview.com/article1193-page1.html

There is however one small thing that annoys me: the fan in this case keeps running at a minimum of 1000 RPM. This is strange, since the CPU is passively cooled. The temperature also stays around 30°C, which is very low. I am wondering if I could get the fan speed down. I mean, if the disk is not spinning and the CPU is passively cooled, why is the fan running?

But it seems I won't be able to do it from software, since the sensor is not under software control. It is mentioned on this forum: http://www.silentpcreview.com/forums/viewtopic.php?f=20&t=60236
I'm still waiting for final confirmation of that from HP, though. Someone already asked the same question I wanted to ask, so let's see if it gets a response: http://h30434.www3.hp.com/t5/Other-HP-Consumer-Products-and/HP-ProLiant-Microserver-N40L-fancontrol-with-Debian-6/td-p/1228957

Another option is to go hardware :-)
The following article talks about installing another fan. But I guess I'm not that desperate yet.
http://www.silentpcreview.com/article1193-page7.html

Friday, December 23, 2011

Writing a Mule JMX Agent

A client of ours uses Mule ESB mainly as a mediator component that will throttle, translate and call external parties. Some of these parties are, however, not so reliable when it comes to response times. Several utilities were in use to check the response time of a specific third party, but all of these mechanisms could only provide a rough estimate. Since Mule was the system executing the call to the third party, we felt that this kind of statistics should come out of Mule instead. The next question was: how to maintain these and get them out of Mule? To expose statistics, JMX seems a logical choice.

Mule ESB comes with a number of JMX Agents: http://www.mulesoft.org/documentation/display/MULE3USER/JMX+Management. These give some great insight into the system. At our client this info is already being used to display statistics in Zabbix, not only from Mule but also from HornetQ JMS. Zabbix is used to monitor the average service execution times, JMS queue depth, memory usage, CPU usage and so on.
These agents however do not expose the information that we want, so we need to write our own.

But first we need to collect the statistics. For this we created a statistics object that keeps track of the number of calls, the minimum, maximum and average duration, and also the average of the last 500 calls. This last metric gives us an average that still shows peaks over time. To capture the call and add the call info to our statistics object we used AOP:
@Around("execThirdPartyCall")
public Object execute(ProceedingJoinPoint pjp) throws Throwable{
   long timeBefore = System.currentTimeMillis();
   Object result = pjp.proceed();
   try{
      getStatistics(pjp).addMuleCallInfo(
         System.currentTimeMillis() - timeBefore);
   }catch(Exception e){
      LOGGER.error("Error adding MuleCallInfo to Statistics Object.",e);
   }
   return result;
}
This statistics object then keeps all this info but does not calculate the metrics (min, max, avg, avg500) up front. Because adding the call info happens on the call stack of the third-party call, we want to keep these methods as short as possible. The actual calculation is done when the info is requested via an MBean.
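
The split between a cheap write path and a calculation on read can be sketched roughly as follows. This is not the statistics object from the project, just a minimal illustration: the class name CallStatistics, its method names and the configurable window size are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the statistics object described above. */
final class CallStatistics {
    private final int windowSize;                       // e.g. 500 in the post
    private final Deque<Long> window = new ArrayDeque<>();
    private int totalCalls;

    CallStatistics(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Write path: only store the duration and evict the oldest entry. */
    synchronized void addCallInfo(long durationMillis) {
        totalCalls++;
        window.addLast(durationMillis);
        if (window.size() > windowSize) {
            window.removeFirst();
        }
    }

    synchronized int getTotalCalls() {
        return totalCalls;
    }

    /** Read path: the average is only computed when somebody asks for it. */
    synchronized long getWindowAverage() {
        if (window.isEmpty()) {
            return 0;
        }
        long sum = 0;
        for (long duration : window) {
            sum += duration;
        }
        return sum / window.size();
    }

    synchronized long getWindowMinimum() {
        long min = Long.MAX_VALUE;
        for (long duration : window) {
            min = Math.min(min, duration);
        }
        return window.isEmpty() ? 0 : min;
    }

    synchronized long getWindowMaximum() {
        long max = 0;
        for (long duration : window) {
            max = Math.max(max, duration);
        }
        return max;
    }
}
```

With a window of 3 and durations 10, 20, 30 and 40, the oldest value drops out, so the window average is 30 while the total call count is 4.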

So now we come to the main part of this post: how to create an MBean and register it with the Mule MBeanServer. I've worked with Spring and MBeans before and I do like the annotation-driven mechanisms that Spring offers for auto-registering your MBeans.
Mule however does not use annotations but a method that is more reflection based. The main class in Mule that provides this logic is org.mule.module.management.agent.ClassloaderSwitchingMBeanWrapper. This class needs an interface for introspection and a concrete class in order to create an MBean instance.

The first thing you do is create an interface that defines the MBean attributes and methods. In the example there are four read-only attributes. The DEFAULT_JMX_PREFIX will be used later on to define the JMX objectName:
public interface StatisticsMBean {
   String DEFAULT_JMX_PREFIX = "type=Thirdparty.Statistics,name=";
   long getHttpCallLast500Average();
   long getHttpCallLast500Minimum();
   long getHttpCallLast500Maximum();
   int getTotalNumberOfCalls();
}
Second, you need to define an implementation of this interface:
public final class StatisticsService implements StatisticsMBean {
    private final Statistics statistics;

    public StatisticsService(Statistics statistics) {
        this.statistics = statistics;
    }
    @Override
    public long getHttpCallLast500Average() {
        return statistics.getAvgLast500HttpCall();
    }

    @Override
    public long getHttpCallLast500Minimum() {
        return statistics.getMinLast500HttpCall();
    }

    @Override
    public long getHttpCallLast500Maximum() {
        return statistics.getMaxLast500HttpCall();
    }

    @Override
    public int getTotalNumberOfCalls() {
        return statistics.getTotalCallsMuleCall();
    }
}
Third, you need to create the actual agent. A few things to note about the code below:
  • the fields are missing and also some methods that are left empty anyway
  • only the registerMbean part is in here, if you have some fancy hot deploy setup, you'll need to provide an unregisterMBean part
  • a MuleContextListener is used to make sure that Spring has finished initializing before doing any work
  • the Statistics object below is an Enum, because we keep statistics for multiple third parties

public final class StatisticsAgent extends AbstractAgent {

   @Override
   public void initialise() throws InitialisationException {
      if (initialized.get()) {
         return;
      }
      //get mbeanserver
      if (mBeanServer == null) {
         mBeanServer = ManagementFactory.getPlatformMBeanServer();
      }
      if (mBeanServer == null) {
         throw new InitialisationException(
             ManagementMessages.cannotLocateOrCreateServer(), this);
      }
      try {
         // We need to register all the services once the server has initialised
         muleContext.registerListener(new MuleContextStartedListener());
      } catch (NotificationException e) {
         throw new InitialisationException(e, this);
      }
      initialized.compareAndSet(false, true);
   }

   protected class MuleContextStartedListener implements
       MuleContextNotificationListener<MuleContextNotification> {
      public void onNotification(MuleContextNotification notification) {
         if (notification.getAction() == MuleContextNotification.CONTEXT_STARTED) {
            try {
               registerMBeans();
            } catch (Exception e) {
               throw new MuleRuntimeException(
                   CoreMessages.objectFailedToInitialise("MBeans"), e);
            }
         }
      }
   }

   private void registerMBeans() throws MalformedObjectNameException, 
      NotCompliantMBeanException, InstanceAlreadyExistsException, MBeanRegistrationException {
      Statistics[] statisticsAr = Statistics.values();
      for (Statistics statistics : statisticsAr) {
         ObjectName on = jmxSupport.getObjectName(
                           String.format("%s:%s",
                           jmxSupport.getDomainName(muleContext, false),
                           StatisticsMBean.DEFAULT_JMX_PREFIX + statistics.name())
                          );
         StatisticsService statisticsService = new StatisticsService(statistics);
         ClassloaderSwitchingMBeanWrapper mBean = new ClassloaderSwitchingMBeanWrapper(
                           statisticsService,
                           StatisticsMBean.class, 
                           muleContext.getExecutionClassLoader()
                           );
         logger.debug("Registering StatisticsAgent with name: " + on);
         mBeanServer.registerMBean(mBean, on);
      }
   }
}
As a last step you need to add your agent to the Mule configuration as follows:
<mule>
   <custom-agent class="management.StatisticsAgent" name="statistics-agent"/>
</mule>

And that's it, once you start up Mule, you will see your custom agent appearing and your MBeans are available via JMX.
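
To get a feel for what a monitoring tool sees, here is a small sketch outside of Mule that registers a bean on the platform MBeanServer and reads the attribute back over JMX. It uses the JDK's StandardMBean rather than Mule's ClassloaderSwitchingMBeanWrapper, but the idea is the same: an implementation plus an interface to introspect. The interface CallCountMBean, the domain "Mule.demo" and the name "PartnerA" are made up for the example.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

final class JmxReadBackDemo {

    /** Hypothetical management interface, analogous to StatisticsMBean. */
    public interface CallCountMBean {
        int getTotalNumberOfCalls();
    }

    /** Registers a bean, reads its attribute back via JMX and unregisters it again. */
    static int registerAndRead(String domain, String name, final int calls) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // Same "domain:type=...,name=..." layout as the agent builds with String.format
            ObjectName on = new ObjectName(
                    String.format("%s:type=Thirdparty.Statistics,name=%s", domain, name));
            CallCountMBean impl = new CallCountMBean() {
                @Override
                public int getTotalNumberOfCalls() {
                    return calls;
                }
            };
            server.registerMBean(new StandardMBean(impl, CallCountMBean.class), on);
            try {
                // The getter getTotalNumberOfCalls() shows up as attribute "TotalNumberOfCalls"
                return (Integer) server.getAttribute(on, "TotalNumberOfCalls");
            } finally {
                server.unregisterMBean(on);
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerAndRead("Mule.demo", "PartnerA", 42));
    }
}
```

A tool like Zabbix or JConsole does essentially the getAttribute call, only over a remote connector instead of in-process.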

That is how our operations guys can now provide a nice screen with the call durations over time:

Friday, July 8, 2011

Synchronous Mule Service - exception handling - Mule 3

In a previous post I outlined a solution to have an exception sent back when doing a synchronous service call over JMS in Mule. In the intro of that post you can find the exact problem description. This post is about the exact same topic. The only difference in setup is that we'll use Mule 3.

Upgrading the exception handler from Mule 2 to 3
All the facets of migrating to a new version of Mule deserve a separate post, so I'll stick to the exception handling problem described before.

Version 3 of the Mule DefaultServiceExceptionStrategy deprecates the defaultHandler method. The new one to use is doHandleException. This new method gives you direct access to the MuleEvent, so that's an improvement. In my newer implementation I can also get the endpoint more easily from the muleEvent, and it no longer requires a Mule registry lookup.

Getting a hold of the replyTo object has become a bit more difficult and it forced me to narrow the usage of this class to JMS only. This is because there is no longer a convenient method like muleEvent.getMessage().getReplyTo(). Now you need to resolve it from a property, which is specific to the JMS connector.

For those interested I included all the code at the bottom of this post.

Using flow
Even though I was pretty pleased with the code I wrote, I still wondered if the new Flow concept in Mule 3 wouldn't allow me to get rid of the custom exception handler. After all, it is not so strange to want to respond to a JMS request when something goes wrong, is it?

What I actually found was that using a Flow makes it worse, because by default a flow doesn't reply at all to a JMS request: neither in case of success nor in case of error. There is actually a bug open for this on JIRA: http://www.mulesoft.org/jira/browse/MULE-5307

After fiddling around a bit I did find that everything does work as expected with a VM endpoint! So in case you have an inbound VM endpoint that is marked as exchange-pattern="request-response", you will always get a response (success and error cases).
And what is more: it also works if you have an inbound JMS endpoint and forward the request to the inbound VM endpoint of a flow. It looks like the configuration below. Notice that you still need a response transformer that will transform the Mule exception payload to whatever you need. If you do not have a transformer you will get an empty message on your JMS response queue. This is because the JMS connector ignores the exception payload and takes the normal payload, which is null.

<mule>
    <jms:endpoint name="jms.queue.request" queue="${mq.queue.request.in}"
                  transformer-refs="JmsToObject"
                  responseTransformer-refs="exPayloadToResponse objectToJms"/>
    <vm:endpoint name="vm.request" address="vm://vm.request"/>

    <model>
        <service name="requestViaJMS">
            <inbound>
                <inbound-endpoint ref="jms.queue.request"/>
            </inbound>
            <outbound>
                <pass-through-router>
                    <outbound-endpoint ref="vm.request" exchange-pattern="request-response"/>
                </pass-through-router>
            </outbound>
        </service>
    </model>

    <flow name="requestViaVM">
        <inbound-endpoint ref="vm.request" exchange-pattern="request-response"/>
        <enricher ... />
        <transformer ... />
        <component ... />
    </flow>
</mule>

You would be right to argue that the above is exactly what is provided by the Mule Bridge pattern (also new in version 3). However, the bridge implementation suffers from the same defect as the flow, so it does not solve the problem.

Conclusion
The most elegant solution in Mule 3 for exception handling on synchronous JMS flows is to hide them behind a VM endpoint and simply pass through the JMS message.

Custom exceptionStrategy code
public final class ReqRepServiceExceptionStrategy extends DefaultServiceExceptionStrategy {
    public static final String REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT = "REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT";
    private final Logger logger = LoggerFactory.getLogger(ReqRepServiceExceptionStrategy.class);

    @Override
    protected void doHandleException(Exception e, MuleEvent muleEvent) {
        super.doHandleException(e, muleEvent);
        final ImmutableEndpoint inboundEp = muleEvent.getEndpoint();
        final boolean isReqRep = MessageExchangePattern.REQUEST_RESPONSE.equals(inboundEp.getExchangePattern());
        //only process replies for jms endpoints
        if (!isEventAlreadyProcessed(muleEvent) && inboundEp.isProtocolSupported(JmsConnector.JMS) && isReqRep) {
            final MuleMessage replyMessage = new DefaultMuleMessage(null, muleContext);
            replyMessage.setExceptionPayload(new DefaultExceptionPayload(e));
            try {
                final Object replyTo = getReplyTo(muleEvent.getMessage());
                final ReplyToHandler replyToHandler = getReplyToHandler(inboundEp);
                processReplyTo(muleEvent, replyMessage, replyToHandler, replyTo);
            } catch (MuleException me) {
                logger.error("Cannot reply from Exception Strategy.", me);
            }
        } else {
            logger.info("MuleEvent already processed once by this handler, not replying again.");
        }
    }

    private boolean isEventAlreadyProcessed(final MuleEvent muleEvent) {
        boolean eventAlreadyProcessed = false;
        final Object replyAlreadySent = muleEvent.getSession().getProperty(REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT);
        if (replyAlreadySent != null && Boolean.class.isInstance(replyAlreadySent)) {
            eventAlreadyProcessed = Boolean.class.cast(replyAlreadySent);
        }
        return eventAlreadyProcessed;
    }

    private Object getReplyTo(final MuleMessage message) throws MuleException {
        final Object replyTo = message.getOutboundProperty(JmsConstants.JMS_REPLY_TO);
        if (replyTo == null) {
            throw new DefaultMuleException(MessageFactory.createStaticMessage(
                    "There is no jms-reply-to specified on this endpoint"));
        }
        return replyTo;
    }

    private ReplyToHandler getReplyToHandler(final ImmutableEndpoint endpoint) throws MuleException {
        final ReplyToHandler replyToHandler = ((AbstractConnector) endpoint.getConnector()).getReplyToHandler(endpoint);
        if (replyToHandler == null) {
            throw new DefaultMuleException(MessageFactory.createStaticMessage(
                    "There is no replyToHandler specified on this endpoint"));
        }
        final List responseTransformers = endpoint.getResponseTransformers();
        if (responseTransformers != null && responseTransformers.size() > 0) {
            replyToHandler.setTransformers(responseTransformers);
        }
        return replyToHandler;
    }

    private void processReplyTo(final MuleEvent event, final MuleMessage result, final ReplyToHandler replyToHandler,
                                final Object replyTo) throws MuleException {
        final String requestor = result.getOutboundProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY);
        if (((requestor != null && !requestor.equals(event.getFlowConstruct().getName())) || requestor == null)) {
            replyToHandler.processReplyTo(event, result, replyTo);
            event.getSession().setProperty(REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT, Boolean.TRUE);
            logger.info("Reply sent for this MuleEvent to " + replyTo.toString());
        }
    }
}
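
The "reply at most once" bookkeeping in the strategy above (isEventAlreadyProcessed plus the session property set in processReplyTo) can be illustrated without any Mule classes. In this sketch a plain Map stands in for the MuleSession and a Runnable stands in for the ReplyToHandler; both substitutions, and the class name ReplyOnceGuard, are assumptions for the example only.

```java
import java.util.HashMap;
import java.util.Map;

final class ReplyOnceGuard {
    static final String REPLY_SENT = "REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT";

    /**
     * Sends the reply unless the session already carries the "sent" flag.
     * Returns true if this call sent the reply, false if it was skipped.
     */
    static boolean replyOnce(Map<String, Object> session, Runnable sendReply) {
        Object alreadySent = session.get(REPLY_SENT);
        if (alreadySent instanceof Boolean && (Boolean) alreadySent) {
            return false;
        }
        sendReply.run();
        // Only mark the session after the reply went out without an exception
        session.put(REPLY_SENT, Boolean.TRUE);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> session = new HashMap<>();
        Runnable reply = new Runnable() {
            @Override
            public void run() {
                System.out.println("reply sent");
            }
        };
        System.out.println(replyOnce(session, reply)); // prints "reply sent", then true
        System.out.println(replyOnce(session, reply)); // prints false
    }
}
```

Setting the flag only after a successful send mirrors the order used in processReplyTo: if sending fails, a later invocation of the exception strategy can still try again.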

Wednesday, March 9, 2011

Synchronous Mule Service - exception handling

The problem

We have a Mule service that is defined as synchronous. This means that it will handle incoming messages in the same thread as the endpoint that receives them. A high level overview of this is in the picture below.
What we found is that the behaviour differs depending on how the service is called, especially when it comes to exception handling. During our initial tests we called this service from the Mule client in a synchronous way. In this case the default exception handler made sure we got an answer even if an exception was thrown.
In a production environment however we call the service asynchronously via JMS. In that use case the client timed out.

Below is a schematic representation of what happens in Mule in case everything goes well and with exception handling. Note that the client calls the service in an async way and that Mule executes the service in a sync way.



The behaviour we want is that Mule always returns a response, even when an exception occurs. This prevents service clients from timing out while waiting for a response. To achieve this we need to write a custom exception strategy for Mule.

The solution ingredients

1) A custom exception strategy; some of the code is below. This code makes sure that a reply is always sent. Since the exception strategy can be called multiple times, we store in the MuleSession whether a reply has already been sent. There is also some specific handling needed for global endpoints and normal endpoints.
public final class ReqRepServiceExceptionStrategy extends DefaultServiceExceptionStrategy {
    ...
    @Override
    protected void defaultHandler(Throwable t) {
        super.defaultHandler(t);
        final MuleEvent muleEvent = RequestContext.getEvent();
        if (!isEventAlreadyProcessed(muleEvent)) {
            final MuleMessage replyMessage = new DefaultMuleMessage(null);
            replyMessage.setExceptionPayload(new DefaultExceptionPayload(t));
            ReplyToHandler replyToHandler;
            try {
                replyToHandler = getReplyToHandler(muleEvent.getMessage(), retrieveInboundEndpoint(muleEvent));
                processReplyTo(muleEvent, replyMessage, replyToHandler, muleEvent.getMessage().getReplyTo());
            } catch (MuleException e) {
                logger.error("Cannot reply from Exception Strategy.", e);
            }
        } else {
            logger.info("MuleEvent already processed once by this handler, not replying again.");
        }
    }

    private ImmutableEndpoint retrieveInboundEndpoint(final MuleEvent muleEvent) throws MuleException {
        InboundEndpoint inboundEndpoint;
        String originatingAddress = (String) muleEvent.getMessage().getProperty(MuleProperties.MULE_ORIGINATING_ENDPOINT_PROPERTY);
        //if not a global endpoint, will start with prefix
        if (originatingAddress != null && originatingAddress.startsWith(ENDPOINT_PREFIX)) {
            originatingAddress = originatingAddress.substring(ENDPOINT_PREFIX.length());
            originatingAddress = originatingAddress.replaceFirst("\\.", "://");
        }
        inboundEndpoint = muleEvent.getMuleContext().getRegistry().lookupEndpointFactory().getInboundEndpoint(originatingAddress);
        if (inboundEndpoint == null) {
            throw new DefaultMuleException(MessageFactory.createStaticMessage("Cannot find original inbound endpoint for this message."));
        }
        return inboundEndpoint;
    }

    private boolean isEventAlreadyProcessed(final MuleEvent muleEvent) {
        boolean eventAlreadyProcessed = false;
        final Object replyAlreadySent = muleEvent.getSession().getProperty(REQ_REP_SERVICE_EXCEPTION_STRATEGY_REPLY_SENT);
        if (replyAlreadySent != null && Boolean.class.isInstance(replyAlreadySent)) {
            eventAlreadyProcessed = Boolean.class.cast(replyAlreadySent);
        }
        return eventAlreadyProcessed;
    }
    ...
}

2) Add this exception handler to the Mule configuration. The configuration below is from an integration test: it will always throw a runtime exception in the component. This allows us to test the new exception strategy.
<service name="customHandler">
    <inbound>
        <vm:inbound-endpoint address="vm://custom.request" synchronous="true"/>
    </inbound>
    <component class="be.i8c.mule.service.RuntimeExComponent">
        <custom-exception-strategy class="be.i8c.mule.service.ReqRepServiceExceptionStrategy"/>
    </component>
</service>

3) Test the new exception handler and see it work.
@Test
public void testAsyncCustomExceptionStrategy() throws Exception {
    final MuleMessage request = createRequest("UNIT TEST MESSAGE","vm://custom.reply");
    final MuleMessage response = doAsyncReqRep(request, "vm://custom.request","vm://custom.reply");

    assertNotNull(response);
    assertNotNull(response.getExceptionPayload());
    assertTrue(response.getExceptionPayload().getException().getCause().getMessage().indexOf(RuntimeExComponent.EX_MSG) >= 0);
}
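
One detail from step 1 that is easy to miss is the string handling in retrieveInboundEndpoint: for a non-global endpoint the originating address carries a prefix, and the first dot after it separates the scheme from the rest. The sketch below isolates just that transformation. The actual value of ENDPOINT_PREFIX is not shown in the code above, so "endpoint." is a hypothetical value chosen purely to make the example runnable.

```java
final class EndpointAddressDemo {
    // Hypothetical value: the post does not show the real ENDPOINT_PREFIX constant.
    static final String ENDPOINT_PREFIX = "endpoint.";

    /** Same string handling as in retrieveInboundEndpoint, without the registry lookup. */
    static String toEndpointUri(String originatingAddress) {
        if (originatingAddress != null && originatingAddress.startsWith(ENDPOINT_PREFIX)) {
            String stripped = originatingAddress.substring(ENDPOINT_PREFIX.length());
            // Only the first dot is replaced, so dots in the path part survive
            return stripped.replaceFirst("\\.", "://");
        }
        // Global endpoint names (no prefix) are passed through unchanged
        return originatingAddress;
    }

    public static void main(String[] args) {
        System.out.println(toEndpointUri("endpoint.vm.custom.request")); // vm://custom.request
        System.out.println(toEndpointUri("jms.queue.request"));          // jms.queue.request
    }
}
```

Because only the first dot is rewritten, an address such as vm://custom.request keeps the dot in its path, which is what the test configuration in step 2 relies on.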