Sunday, July 19, 2009

Hardware Load Balanced BlazeDS Cluster on Tomcat

GREAT BIG NOTICE: My professional blog has a good working solution to this: http://www.reignite.com.au/blaze-push-messaging-in-a-cluster/

Blaze DS:
Blaze is an open source remoting and messaging system from Adobe. I use the Java version because that's the crazy devil-may-care fool I am. Blaze is really cool (assuming first you find web technologies cool) and has made my life considerably easier. In the past I used web services (Axis anyone?) but when developing rich Internet applications I found web services too cumbersome for transferring stacks of complex data as objects. Blaze also does "push", which means I can have multi-user real-time apps where the users can interact with shared data. Of course Blaze, being from Adobe, is used in particular to communicate between Flex and my services. Flex is fantastic too! It does of course have drawbacks, but all in all it's pretty good.

Clustering:
There comes a time in every young server's life when thoughts begin to turn toward other servers and they get together to share certain aspects of their life. I had to take two Mac Minis with OS X Server, each with Tomcat, MySQL and of course Blaze, and turn them into a mighty cluster. The Blaze manual says that HttpFlexSession does not cluster, so you need to make sure that once a client reaches a server it "sticks" to that server so it has access to the appropriate HttpFlexSession. That would be wonderful except this is the real world, where clusters sit behind a hardware load balancer and we need to not only balance load but also provide failover. This is tricky because it means that a client doesn't know which machine it will hit, and it also doesn't have direct access to the server IP addresses, so no, you can't use the built-in Flex failover.

JGroups:
BlazeDS uses JGroups to ensure that messages generated on or sent to one server reach all the others in the cluster. It is really easy to set up and the Blaze manual shows you how... Except it is wrong. Oops. Caused me trouble, but I got over it. The problem is in the jgroups-tcp.xml explanation. The num_initial_members="2" is explained as having a value equal to the number of members in the element. An important point in creating a JGroups cluster that can survive restarts and such is setting up the ports well.
<TCP ... start_port="7800" end_port="7804" ...>

Note that I've created a range of ports for the TCP element. Because when old Bessy fires up, port 7800 may be in use! It may be in use by JGroups because it hasn't been released yet, or because some other mysterious creature wants it. Anyway, it is more rugged to give a port range. Then in the TCPPING element:
<TCPPING ... initial_hosts="10.1.1.1[7800]" port_range="5" num_initial_members="2" />

So it will look for host 10.1.1.1 at port 7800, but if that fails it will check ports 7801, 7802, 7803 and 7804. This makes it all a little more able to deal with the ups and downs of modern server life.
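To make the port arithmetic concrete, here is a minimal sketch that lists the host:port pairs a probe would try, following the description above (start at the port given in initial_hosts, then walk up through port_range ports). The class and method names are made up for illustration and are not part of JGroups, and the exact port_range semantics may differ between JGroups versions, so treat this as a model of the description rather than of the library.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: models the probing order described in the post, not JGroups itself. */
final class PortRangeSketch {

    /** Parses one "host[port]" entry and returns host:port for port .. port + portRange - 1. */
    static List<String> candidates(String hostEntry, int portRange) {
        int open = hostEntry.indexOf('[');
        int close = hostEntry.indexOf(']');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("expected host[port], got: " + hostEntry);
        }
        String host = hostEntry.substring(0, open);
        int basePort = Integer.parseInt(hostEntry.substring(open + 1, close));

        List<String> result = new ArrayList<>();
        for (int offset = 0; offset < portRange; offset++) {
            result.add(host + ":" + (basePort + offset));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [10.1.1.1:7800, 10.1.1.1:7801, 10.1.1.1:7802, 10.1.1.1:7803, 10.1.1.1:7804]
        System.out.println(candidates("10.1.1.1[7800]", 5));
    }
}
```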

Tomcat Clusters:
My first hurdle was to ensure that the Tomcat sessions were being replicated across servers. The HttpFlexSession is stored in the Tomcat session so by replicating the Tomcat session I get the HttpFlexSession! Hooray! Of course that didn't really work in my case. So anyway to set up a Tomcat cluster so it replicates sessions is really easy (unless you are reading this out of frustration hoping to find a secret answer).
In your web.xml put the <distributable/> element inside <web-app>. Then in the server.xml in the Tomcat conf directory uncomment the <Cluster> tag. I also put the channelSendOptions="4" attribute in because I wanted to make sure that the session replication occurred before the request returned (4 = synchronous while the default is 8 which is async). That's all there is to it. Though if you are doing things a bit trickier you'd better check the Tomcat doco.

The Cluster Problem:
I use Blaze messaging to push updates to the users of my app. That means the client creates a Consumer and subscribes to a channel / destination at an endpoint. The endpoint is the load balancer IP address and the load balancer decides which server the Consumer subscribes to. But then when the Consumer polls (I use "long polling") it hits the load balancer again and might end up polling the server it is not subscribed to. You get an error back and you probably start crying (that's what I did). This is simply because the subscription is held in the SubscriptionManager on the Destination, which is created in the MessageBrokerServlet and so is specific to the server. It is not held in the HttpFlexSession, let alone the Tomcat session. Ack! To make things a little interesting, the subscription is created by the FlexClient, which is held in the HttpFlexSession, but because the FlexClient and MessagingClient are not added using the setAttribute() method of the Tomcat session, the Tomcat session replication doesn't copy them across.
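The failure can be shown with a toy model. This is only a sketch: Node is a hypothetical stand-in for one Tomcat/Blaze server, and its in-memory set plays the part of the per-server SubscriptionManager. None of these names come from BlazeDS.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the problem: subscriptions live in one node's memory only. */
final class StickySubscriptionProblem {

    /** Hypothetical stand-in for one server behind the load balancer. */
    static final class Node {
        private final Set<String> subscribers = new HashSet<>();

        void subscribe(String clientId) {
            subscribers.add(clientId);
        }

        /** Returns true if this node knows the subscription, false otherwise. */
        boolean poll(String clientId) {
            return subscribers.contains(clientId);
        }
    }

    public static void main(String[] args) {
        Node a = new Node();
        Node b = new Node();

        // The load balancer sends the subscribe to A...
        a.subscribe("client-1");

        // ...but the next poll may land on either node.
        System.out.println("poll on A: " + a.poll("client-1")); // poll on A: true
        System.out.println("poll on B: " + b.poll("client-1")); // poll on B: false
    }
}
```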

Solution:
Warning, the following doesn't really work :(
The following doesn't work as advertised, I'm sorry (not that anyone reads this blog anyway). The configurable endpoint is good and I still use the cluster filter bizzo, but the subscribing on another server thing fails somewhat. It works a bit, but also doesn't a bit. A shame really. Anyway, check out my other post on how to make it good and worky.
Here is what I did: I rolled my sleeves up, spat on my hands and brewed a strong cup of harden up. Also I hacked up Blaze and made it excellent.
1. Create ConfigurableAMFEndpoint.java

package flex.messaging.endpoints;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import flex.messaging.config.ConfigMap;
import flex.messaging.endpoints.amf.AMFFilter;
import flex.messaging.endpoints.amf.EndPointAwareFilter;

/**
* @author surrey
*
*/
public class ConfigurableAMFEndpoint extends AMFEndpoint {

    // Extra filters named in this endpoint's filter-chain configuration.
    private List<AMFFilter> filters = new ArrayList<AMFFilter>();

    protected AMFFilter createFilterChain() {
        AMFFilter firstFilter = super.createFilterChain();
        AMFFilter lastFilter = getLastFilter(firstFilter);
        for (AMFFilter filter : filters) {
            // add this endpoint on those filters that take an endpoint.
            if (filter instanceof EndPointAwareFilter) {
                ((EndPointAwareFilter) filter).setEndpoint(this);
            }
            lastFilter.setNext(filter);
            // advance the tail so the next filter is appended rather than
            // replacing this one
            lastFilter = filter;
        }
        return firstFilter;
    }

    // Walks the chain to find the filter that currently has no successor.
    private AMFFilter getLastFilter(AMFFilter filterChain) {
        AMFFilter last = filterChain;
        while (last.getNext() != null) {
            last = last.getNext();
        }
        return last;
    }

    public void initialize(String id, ConfigMap properties) {
        super.initialize(id, properties);
        ConfigMap filterMap = properties.getPropertyAsMap("filter-chain", null);
        if (filterMap != null && !filterMap.isEmpty()) {
            List filterNames = filterMap.getPropertyAsList("filter-class",
                    null);
            if (filterNames != null) {
                for (Iterator iter = filterNames.iterator(); iter.hasNext();) {
                    String className = (String) iter.next();
                    try {
                        filters.add((AMFFilter) Class.forName(className)
                                .newInstance());
                    } catch (InstantiationException e) {
                        log.error("Could not instantiate filter: " + className,
                                e);
                    } catch (IllegalAccessException e) {
                        log.error("No public constructor for filter: "
                                + className, e);
                    } catch (ClassNotFoundException e) {
                        log.error("Could not find filter class: " + className,
                                e);
                    }
                }
            }
        }
    }

}
This endpoint extends the AMFEndpoint by allowing me to add extra filters to the Blaze filter chain.
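As a standalone illustration of appending extra filters to a singly linked chain, here is a minimal sketch. Filter is a hypothetical stand-in for AMFFilter (it only records its name when invoked), and the filter names are invented. The point it shows is that the tail has to advance after each append, otherwise only the last extra filter stays attached.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative filter chain; not the BlazeDS classes. */
final class FilterChainSketch {

    /** Hypothetical stand-in for AMFFilter: records its name, then calls the next filter. */
    static class Filter {
        final String name;
        Filter next;

        Filter(String name) {
            this.name = name;
        }

        void invoke(List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(trace);
            }
        }
    }

    /** Appends extras to the end of the chain, advancing the tail after each append. */
    static Filter append(Filter first, List<Filter> extras) {
        Filter last = first;
        while (last.next != null) {
            last = last.next;
        }
        for (Filter extra : extras) {
            last.next = extra;
            last = extra;
        }
        return first;
    }

    public static void main(String[] args) {
        Filter first = new Filter("serialization");
        first.next = new Filter("messageBroker");
        append(first, Arrays.asList(new Filter("cluster"), new Filter("security")));

        List<String> trace = new ArrayList<>();
        first.invoke(trace);
        // prints [serialization, messageBroker, cluster, security]
        System.out.println(trace);
    }
}
```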
2. I modified MessageBrokerFilter to have:
if (next != null) {
    next.invoke(context);
}

// Service the message.
outMessage = endpoint.serviceMessage(inMessage);
This is so that the chain continues on to any extra filters I've added.
3. I created an EndPointAwareFilter which I could extend by my own filters so that they automagically get their EndPoint added to them. I've created a couple other filters too such as a security filter which allows me to use custom security with AMF headers. That's another story though because I had to hack up some Flex code and add more functionality to Blaze.

package flex.messaging.endpoints.amf;

import flex.messaging.endpoints.AMFEndpoint;

/**
 * An abstract AMFFilter that other filters should extend to gain access to
 * their endpoint. This is only populated when using a ConfigurableAMFEndpoint.
 *
 * @author Surrey
 *
 */
public abstract class EndPointAwareFilter extends AMFFilter {

    private AMFEndpoint endpoint;

    public AMFEndpoint getEndpoint() {
        return endpoint;
    }

    public void setEndpoint(AMFEndpoint endpoint) {
        this.endpoint = endpoint;
    }
}


4. I created ClusterMessagingFilter.java
package au.com.truenorth.amf.filter;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.servlet.http.HttpSession;

import flex.messaging.Destination;
import flex.messaging.FlexContext;
import flex.messaging.MessageDestination;
import flex.messaging.endpoints.amf.EndPointAwareFilter;
import flex.messaging.io.amf.ActionContext;
import flex.messaging.io.amf.MessageBody;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.services.Service;

public class ClusterMessagingFilter extends EndPointAwareFilter {

    public ClusterMessagingFilter() {
        super();
    }

    @SuppressWarnings("unchecked")
    @Override
    public void invoke(ActionContext context) throws IOException {

        Object data = getData(context);

        if (data instanceof CommandMessage) {
            CommandMessage command = (CommandMessage) data;
            int operation = command.getOperation();
            Object clientId = FlexContext.getFlexClient().getId();
            String endpointId = getEndpoint().getId();
            // session attribute under which pending subscriptions are kept
            String sessionKey = clientId.toString() + endpointId;

            if (operation == CommandMessage.POLL_OPERATION) {
                // add a subscription if appropriate
                HttpSession sess = FlexContext.getHttpRequest().getSession();
                Object att = sess.getAttribute(sessionKey);
                if (att instanceof List) {
                    List<Map<String, Object>> destList = (List<Map<String, Object>>) att;
                    for (Map<String, Object> subscribeMap : destList) {
                        MessageDestination msgDest = getMessageDestination(subscribeMap);
                        Object cId = subscribeMap.get("clientId");
                        // only subscribe if this server doesn't already know
                        // the client
                        if (msgDest != null
                                && msgDest.getSubscriptionManager()
                                        .getSubscriber(cId) == null) {
                            Object selectorExpr = subscribeMap
                                    .get("selectorExpr");
                            String selectorString = getStringOrNull(selectorExpr);
                            Object subtopic = subscribeMap
                                    .get("subtopicString");
                            String subtopicString = getStringOrNull(subtopic);

                            msgDest.getSubscriptionManager().addSubscriber(cId,
                                    selectorString, subtopicString,
                                    subscribeMap.get("endpointId").toString());
                        }
                    }
                    sess.removeAttribute(sessionKey);
                }
            } else if (operation == CommandMessage.SUBSCRIBE_OPERATION) {
                // get httpSession and stick a flag in with everything needed to
                // make a subscription
                HttpSession sess = FlexContext.getHttpRequest().getSession();
                Object att = sess.getAttribute(sessionKey);
                List<Map<String, Object>> destList = (List<Map<String, Object>>) att;
                if (destList == null) {
                    destList = new ArrayList<Map<String, Object>>();
                }

                Map<String, Object> subscribeMap = createSubscribeMap(command);
                destList.add(subscribeMap);
                // setAttribute() is what makes Tomcat replicate the list
                sess.setAttribute(sessionKey, destList);
            }
        }
        if (next != null) {
            next.invoke(context);
        }
    }

    // Collects everything another server needs to recreate the subscription.
    private Map<String, Object> createSubscribeMap(CommandMessage command) {
        Map<String, Object> subscribeMap = new HashMap<String, Object>();

        subscribeMap.put("clientId", command.getClientId());
        subscribeMap.put("endpointId", getEndpoint().getId());
        subscribeMap.put("subtopicString", command
                .getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME));
        subscribeMap.put("selectorExpr", command
                .getHeader(CommandMessage.SELECTOR_HEADER));
        subscribeMap.put("destination", command.getDestination());
        return subscribeMap;
    }

    private String getStringOrNull(Object selectorExpr) {
        String selectorString = null;
        if (selectorExpr != null) {
            selectorString = selectorExpr.toString();
        }
        return selectorString;
    }

    // Looks through every service for a MessageDestination with the stored id.
    private MessageDestination getMessageDestination(
            Map<String, Object> subscribeMap) {
        String destId = subscribeMap.get("destination").toString();
        MessageDestination msgDest = null;
        for (Iterator iter = getEndpoint().getMessageBroker().getServices()
                .keySet().iterator(); iter.hasNext();) {
            Service service = (Service) getEndpoint().getMessageBroker()
                    .getServices().get((String) iter.next());
            Destination dest = service.getDestination(destId);
            if (dest instanceof MessageDestination) {
                msgDest = (MessageDestination) dest;
                break;
            }
        }
        return msgDest;
    }

    @SuppressWarnings("unchecked")
    protected Object getData(ActionContext context) {
        // get the in message to inspect the headers
        MessageBody request = context.getRequestMessageBody();

        Object data = request.getData();
        if (data instanceof List) {
            data = ((List) data).get(0);
        } else if (data != null && data.getClass().isArray()) {
            data = Array.get(data, 0);
        }
        return data;
    }

}
This filter is a little messier than I'd like, and the honest-to-goodness production version is a bit nicer, but you get the picture. Essentially, when a client subscribes on one server, that server puts all the info needed to subscribe into the Tomcat session. When a poll hits another server, I can get that info out and create a subscription on that server. This all happens before any servicing of the poll occurs, so I don't get any dumb "you are not subscribed" errors.
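Stripped of the Blaze plumbing, the idea can be sketched as a toy model. Everything here is an assumption made for illustration: Node stands in for one server, the shared map stands in for the replicated Tomcat session, and a subscription is reduced to a "clientId@destination" string. It is not the production filter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of replaying subscriptions from a replicated session. */
final class SubscriptionReplaySketch {

    /** Hypothetical stand-in for one server; all nodes share the same session map. */
    static final class Node {
        private final Map<String, List<String>> session;
        private final Set<String> localSubscriptions = new HashSet<>();

        Node(Map<String, List<String>> replicatedSession) {
            this.session = replicatedSession;
        }

        /** Subscribes locally and records the destination in the shared session. */
        void subscribe(String clientId, String destination) {
            localSubscriptions.add(clientId + "@" + destination);
            session.computeIfAbsent(clientId, k -> new ArrayList<>()).add(destination);
        }

        /** Replays any recorded subscriptions, removes the record, then checks the poll. */
        boolean poll(String clientId, String destination) {
            List<String> pending = session.remove(clientId);
            if (pending != null) {
                for (String d : pending) {
                    localSubscriptions.add(clientId + "@" + d);
                }
            }
            return localSubscriptions.contains(clientId + "@" + destination);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> session = new HashMap<>();
        Node a = new Node(session);
        Node b = new Node(session);
        Node c = new Node(session);

        a.subscribe("client-1", "updates");
        System.out.println("poll on B: " + b.poll("client-1", "updates")); // poll on B: true
        System.out.println("poll on C: " + c.poll("client-1", "updates")); // poll on C: false
    }
}
```

Note that the model, like the filter, removes the record after the first replay, so in this sketch a third node polled afterwards never learns about the subscription.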

The most amazing thing is this works! I was actually able to sit there with my colleague logged in to the system with both of us using it while I turned off first one server, brought it back up then turned off the other server. The whole time the system just kept polling and as users we didn't even know the server had died and come back up. Brilliant.

Oh the services-config.xml
<clusters>
        <cluster id="default-cluster" properties="jgroups-tcp.xml" default="false" balancing="false" />
    </clusters>
<!-- the above is added so I can use clusters.  Note that I set default="false" because I'm a control freak -->
<channel-definition id="my-polling-amf" class="mx.messaging.channels.AMFChannel">
         <endpoint url="@AMF_POLLING_END_POINT_BASE@" class="flex.messaging.endpoints.ConfigurableAMFEndpoint">
         <properties>
             <filter-chain>
                 <filter-class>au.com.truenorth.amf.filter.ClusterMessagingFilter</filter-class>
             </filter-chain>
             <polling-enabled>true</polling-enabled>
             <wait-interval-millis>50000</wait-interval-millis>
             <polling-interval-seconds>0</polling-interval-seconds>
             <max-waiting-poll-requests>50</max-waiting-poll-requests>
             <serialization>
                 <type-marshaller>au.com.truenorth.service.bean.NumberHandlingTypeMarshaller</type-marshaller>
             </serialization>
         </properties>
     </endpoint>
<!-- This is my channel definition where I set up the long polling and configure the ClusterMessagingFilter -->
<!-- You'll also note, if you are clever, the NumberHandlingTypeMarshaller which allows me to send Null numbers to Flex.  I also modified Blaze to turn NaN numbers into null Numbers (like Integer) coming into Java so my Hibernate wouldn't go spaz when id fields were NaN when they should be null. --></channel-definition>


Well this has been a long one but I hope at least one person finds it helpful.