Tuesday, June 1, 2010

I'm back with egg on my face and all

So I figured I was posting off into oblivion, happily talking to myself, but it turns out some people read my blog and commented, and I didn't think to check back.  Oops.
Anyway the good news is I have bonus solutions to tricky problems.
Clustered Blaze Messaging III
I previously talked about a couple of different ways to get around the problem of having Blaze behind a hardware firewall that did round robin load balancing.
Just a recap: a client's subscription lives with the servlet context on the machine that received its subscribe request.  This means that when the client's next poll request hits the load balancer it tends to get fired off to the other server, which has no subscription for it.  I tried to use JGroups and Tomcat session replication but both methods suck a fair bit and kept falling over.
The answer to my woes has been database guaranteed subscription and message queuing.
In Brief
Client A sends a subscribe request to server A.  Server A creates an entry in the database (replicated using MySQL replication) for that subscription.  Client A then polls and hits server B, which looks in the database for a subscription and finds it.  Server B then checks the database message queue for any messages client A has not received and either sends them or waits the prescribed time for a message to enter the queue.
So you see the messages get stored in a database table and the subscription keeps track of the last message id received.  Join the dots and you've got a messaging system that relies on MySQL replication (or whatever DB you are using) which is very reliable.  I say reliable because in the nearly 2 years this particular MySQL replication set up has been operating it only got out of sync once and that was due to an OS failure (at Christmas while I was on leave of course so I was called up and spent a lovely holiday reinitialising databases instead of drinking beer and stuffing my gob).
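To make the flow above concrete, here is a rough in-memory sketch of the idea, not the real implementation. The class name, method names and the in-memory collections are all hypothetical stand-ins for the replicated database tables, and it assumes a new subscriber only receives messages stored after it subscribed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the replicated database tables.
// Every server in the cluster would talk to the same store.
class SharedStoreSketch {
    // Message queue: list position + 1 plays the role of the auto-increment id.
    private final List<String> messages = new ArrayList<>();
    // Subscription table: client id -> id of the last message delivered.
    private final Map<String, Integer> lastMessageIdByClient = new HashMap<>();

    // Called by whichever server receives the subscribe request.
    synchronized void subscribe(String clientId) {
        // Assumption: a new subscriber starts from the current end of the queue.
        lastMessageIdByClient.put(clientId, messages.size());
    }

    // Called by whichever server receives a published message; returns its id.
    synchronized int storeMessage(String body) {
        messages.add(body);
        return messages.size();
    }

    // Called by whichever server receives the poll request.
    synchronized List<String> poll(String clientId) {
        Integer last = lastMessageIdByClient.get(clientId);
        if (last == null) {
            throw new IllegalStateException("No subscription for " + clientId);
        }
        // Everything after the last delivered id is still pending for this client.
        List<String> pending = new ArrayList<>(messages.subList(last, messages.size()));
        lastMessageIdByClient.put(clientId, messages.size());
        return pending;
    }
}
```

Because the subscription and the queue both live in the shared store, it makes no difference which server calls subscribe() and which one calls poll().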
Now I will try to add the code:
In messaging-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<service id="message-service" class="flex.messaging.services.MessageService">
    <adapters>
        <adapter-definition id="actionscript" class="au.com.reignite.amf.DatabaseMessagingAdapter"
            default="true" />
    </adapters>
    <default-channels>
        <channel ref="my-polling-amf" />
    </default-channels>
    <destination id="projectLockService"/>
</service>
Everything is pretty normal except the DatabaseMessagingAdapter which is the thing that does the database magic.
There's a stack of stuff in the DatabaseMessagingAdapter that is a bit specific to my particular architecture so I'll just show the important bits:
DatabaseMessagingAdapter

public class DatabaseMessagingAdapter extends MessagingAdapter
Obviously I extend the MessagingAdapter.

@Override
public Object invoke(Message message) {
    // Serialize the whole message so any server in the cluster can rehydrate it later.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try {
        ObjectOutputStream out = new ObjectOutputStream(bos);
        out.writeObject(message);
        // Flush before reading the buffer, otherwise the tail of the object may still be buffered.
        out.flush();
        getMessageOrchestrator().storeMessage(bos.toByteArray(),
                message.getBody().toString());
    } catch (IOException e) {
        LogWriter.error(getClass(), "Failed to serialize message: " + e, e);
    }

    return null;
}
Here I override the invoke() method because it is what Blaze calls when it wants to store the message in its internal store.  Instead, as you can see, I serialize the message to a byte array and use my "MessageOrchestrator" to store it.  This is just my class that saves the bytes to the database in the following table:

DROP TABLE IF EXISTS `truenorth`.`blaze_message`;
CREATE TABLE  `truenorth`.`blaze_message` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `MESSAGE_BYTES` blob NOT NULL,
  `MESSAGE_BODY` text,
  `CREATED_DATE` datetime NOT NULL,
  `MODIFIED_DATE` datetime NOT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=54 DEFAULT CHARSET=latin1;
My messages are in XML so I store the message bytes and the message body as a string so I can inspect them with my greedy eyes.
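Since the table holds Java-serialized bytes, whichever server handles the poll has to turn them back into an object.  A minimal round-trip sketch using only the JDK might look like the following; the class and method names are hypothetical, and a plain Serializable stands in for the Blaze message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper: converts a message to the bytes stored in MESSAGE_BYTES and back.
class MessageBytesSketch {

    static byte[] toBytes(Serializable message) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes it, so the byte array is complete.
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            // The class of the stored object must be on the classpath of the reading server.
            throw new IllegalStateException("Cannot rehydrate message", e);
        }
    }
}
```

Both servers need the same version of the message classes on their classpath, otherwise the rehydrate step fails.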
Back to DatabaseMessagingAdapter:

@Override
public boolean handlesSubscriptions() {
    return true;
}
Of course it handles subscriptions too, so I return true (the default is false).

@Override
public Object manage(CommandMessage commandMessage) {

    if (commandMessage.getOperation() == CommandMessage.SUBSCRIBE_OPERATION) {
        MessageDestination destination = (MessageDestination) getDestination();
        SubscriptionManager subscriptionManager = destination
                .getSubscriptionManager();
        String selectorExpr = (String) commandMessage
                .getHeader(CommandMessage.SELECTOR_HEADER);
        subscriptionManager.removeSubscriber(commandMessage.getClientId(),
                selectorExpr, (String) commandMessage
                        .getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME),
                (String) commandMessage.getHeader(Message.ENDPOINT_HEADER));
        getMessageOrchestrator().storeSubscription(
                FlexContext.getFlexClient().getId(),
                commandMessage.getClientId().toString(), selectorExpr);
    } else if (commandMessage.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION) {
        if (FlexContext.getFlexClient() != null) {
            getMessageOrchestrator().deleteSubscription(
                    FlexContext.getFlexClient().getId());
        }
    }
    return null;
}
This is the bit that manages subscriptions, as darkly foreshadowed by me above.  If it is a subscribe operation I clear out Blaze's storage of the subscription, otherwise it annoys the heck out of me by denying people who shouldn't be denied.  Then I grab my trusty MessageOrchestrator and store my own subscription in a table like:

DROP TABLE IF EXISTS `truenorth`.`subscription`;
CREATE TABLE  `truenorth`.`subscription` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `CLIENT_ID` varchar(100) NOT NULL,
  `MESSAGE_CLIENT_ID` varchar(100) NOT NULL,
  `SELECTOR` varchar(255) DEFAULT NULL,
  `EXPIRY_DATE` datetime NOT NULL,
  `LAST_MESSAGE_ID` int(10) unsigned NOT NULL,
  `CREATED_DATE` datetime NOT NULL,
  `MODIFIED_DATE` datetime NOT NULL,
  PRIMARY KEY (`ID`),
  UNIQUE KEY `Index_CLIENT_ID_UNIQUE` (`CLIENT_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
The important bit to note is that I store the client_id, which is the id of the instance of Flash running on the client machine, and the message_client_id, which is the id of the consumer object running in that Flash client.  You can also see I keep the last message id sent and some datetime things.
Ok so that's how I store subscriptions and messages.  The next piece of the puzzle is how I handle poll requests.
Poll requests
In my services-config.xml I have a special endpoint for polling: DatabaseConfigurableAMFEndpoint
public class DatabaseConfigurableAMFEndpoint extends ConfigurableAMFEndpoint {
I extend my own ConfigurableAMFEndpoint so I can use my super special filter chain to do my filthy security processing (see previous blogs)

@Override
protected FlushResult handleFlexClientPoll(FlexClient flexClient,
CommandMessage pollCommand) {


I'll skip the code itself because it's long and boring, and summarise instead.
I load my subscription (the row stored in the subscription table above, not the Blaze subscription) using the flex client id, as opposed to the message client id.  If I don't get a record I throw a subscription error.  Assuming they have a subscription I start a "do" loop where I load the messages from the blaze_message table above that have an id greater than the subscription's last received message id.  Each of those messages that matches the subscription selector (I use the JMSSelector object that comes with Blaze, just like Blaze does) I rehydrate and add to my results to send.  If there are any results I break the loop and immediately send them.  If there were no messages I wait a little bit (5 seconds) and try again.  I keep doing that until the configured poll timeout (50 seconds), at which point I release the thread and let the client poll again.  I then update their subscription with the last message they received and push out their expiry date.
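The wait-and-retry part of that description can be sketched roughly as follows.  This is an illustration only, not the endpoint code: MessageSource is a hypothetical stand-in for the database query, a plain Predicate stands in for the JMSSelector check, and messages are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class LongPollSketch {
    // Hypothetical stand-in for "load messages with an id greater than lastMessageId".
    interface MessageSource {
        List<String> loadAfter(long lastMessageId);
    }

    // Returns matching messages as soon as there are any, or an empty list
    // once timeoutMillis has passed (50 seconds in the post, retrying every 5).
    static List<String> waitForMessages(MessageSource source, long lastMessageId,
            Predicate<String> selector, long retryMillis, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        List<String> results = new ArrayList<>();
        while (true) {
            for (String message : source.loadAfter(lastMessageId)) {
                if (selector.test(message)) {
                    results.add(message);
                }
            }
            long remaining = deadline - System.currentTimeMillis();
            if (!results.isEmpty() || remaining <= 0) {
                return results; // something to send, or the poll timed out
            }
            try {
                Thread.sleep(Math.min(retryMillis, remaining));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and release the thread early.
                Thread.currentThread().interrupt();
                return results;
            }
        }
    }
}
```

A call such as waitForMessages(source, lastId, selector, 5000, 50000) would match the timings mentioned above; the caller would then update the subscription's last message id and expiry date.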




To finish it all off I have a scheduled task (I use Quartz with Spring) that every few minutes culls any expired subscriptions and deletes messages that all subscriptions have received.
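The cleanup rule is easy to get subtly wrong, so here is a small in-memory sketch of it.  The Subscription class, the maps and the method name are hypothetical stand-ins for the two tables, and it assumes that when no subscriptions remain every stored message can go.

```java
import java.util.Map;
import java.util.NavigableMap;

class CleanupSketch {
    // Hypothetical stand-in for a row of the subscription table.
    static final class Subscription {
        final long expiryMillis;
        final long lastMessageId;

        Subscription(long expiryMillis, long lastMessageId) {
            this.expiryMillis = expiryMillis;
            this.lastMessageId = lastMessageId;
        }
    }

    // Culls expired subscriptions, then deletes every message that all remaining
    // subscriptions have already received. Returns the number of messages deleted.
    static int cleanUp(Map<String, Subscription> subscriptions,
            NavigableMap<Long, String> messagesById, long nowMillis) {
        subscriptions.values().removeIf(s -> s.expiryMillis <= nowMillis);

        // The slowest subscriber decides how far the queue can be trimmed.
        // Assumption: with no subscribers left, this stays at MAX_VALUE and all messages go.
        long lowestDelivered = Long.MAX_VALUE;
        for (Subscription s : subscriptions.values()) {
            lowestDelivered = Math.min(lowestDelivered, s.lastMessageId);
        }

        // headMap is a live view, so clearing it removes the entries from messagesById.
        NavigableMap<Long, String> received = messagesById.headMap(lowestDelivered, true);
        int deleted = received.size();
        received.clear();
        return deleted;
    }
}
```

Culling expired subscriptions first matters: a dead client that never polls again would otherwise pin its last message id forever and the message table would never shrink.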


Victory


This method of messaging has been working now for about 10 months without failure.  The one concern I have is that it is fairly heavy on the DB, so I made sure the DB is on the same machine as the app server and the two machines in the cluster are directly connected via Gigabit Ethernet on the same switch (actually 2 switches cross linked for redundancy).  I've also got a fair query cache set up so I'm not always pulling the same messages out of the DB, and I'm only really catering to a max of about 10 concurrent users.  Having said all that, I'd imagine I could have a bog load more concurrent users and still have no trouble.  A big benefit is that the messages are held by the DB, not in memory by the Tomcat instance, which means Tomcat can fall over and people still get their messages.


Next Time


I've also now come up with what I consider the ultimate file upload solution, even more betterer than FTP, and I've set my blog to email me when someone comments so I can actually respond.





