Back to the Future
I previously said I had implemented an FTP-based solution for file upload. Well, I did, and it works pretty well, but it's a bit fiddly and tricky to get working, so I kept looking for solutions. I found one. Hooray! Flex 4 and FileReference in Flash Player 10, together with Blaze, provide the solution.
Big Irritation
A big irritation of mine with HTTP uploads using multipart form stuff is that it is an all-or-nothing proposition. Once you fire up the connection and start uploading you have to wait for the whole thing to go up. For small files this isn't such a problem, but if you upload a file that takes a minute or two you often don't know whether it is working (from a client perspective) until you decide that 3 hours is too long for a file to take. There are heaps of issues like socket disconnected by peer, socket read timeout and so forth. To get things working you need to check special upload timeouts and browser timeouts and virus checkers / indexers on the server and other mysterious Mac OS X related muffery. To use file upload over HTTP in a true enterprise application you need some guarantees that the upload will happen and that data isn't lost. You need a reliable system so you can tell your paying customers that their data will get to the server, and that if it doesn't, you will know what happened and can fix it and keep going. Anything less is amateur hour mickey mouse stuff.
Flash 10 FileReference
Flash Player 10 allows you to call fileReference.load() to load a selected file. This puts all the file's bytes into the data property as a ByteArray. You can then send that through AMF via Blaze to the server, where it arrives as a byte[] and you can save the file. The problem with this simplistic approach is you don't get any progress events, and if it is a big file you still end up with all the problems of HTTP upload. The solution is really simple: break the byte array into manageable chunks and send them up one at a time, waiting for a response between each one. This way you get a progress event after every chunk and the system doesn't just freeze up until it's finished. If you are clever you can also enable pause / resume.
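To make the chunking idea concrete, here is a minimal Java sketch of the slicing step. It is only an illustration: ChunkSplitter, split and the chunk size are made-up names and values, not part of the library, and the real client does this in ActionScript on a ByteArray rather than in Java.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: cuts a file's bytes into fixed-size chunks. */
final class ChunkSplitter {

    private ChunkSplitter() {
    }

    /**
     * Returns the chunks of data from startOffset to the end, each at most
     * chunkSize bytes long. The final chunk may be shorter. A non-zero
     * startOffset is what a resumed upload would use.
     */
    static List<byte[]> split(byte[] data, int startOffset, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        if (startOffset < 0 || startOffset > data.length) {
            throw new IllegalArgumentException("startOffset out of range");
        }
        List<byte[]> chunks = new ArrayList<byte[]>();
        int pos = startOffset;
        while (pos < data.length) {
            // Never read past the end of the file.
            int len = Math.min(chunkSize, data.length - pos);
            chunks.add(Arrays.copyOfRange(data, pos, pos + len));
            pos += len;
        }
        return chunks;
    }
}
```

Each chunk would be sent on its own and the next one only after the server has answered, which is where the per-chunk progress event comes from.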
I'm Clever
Yeah, well, so my mother says anyway. But I wrote a nice little Flex library and Java library which go together to provide file upload services. The Flex part takes a ByteArray and a file name and queries the Java service to see how far the file has got. The Java service replies with a progress event which says either 0 bytes uploaded, some bytes uploaded, or complete. You can use that to initialise your progress bar in Flex. Then the Flex library moves to the correct position in the ByteArray and sends a chunk up to Java, which writes it to the file and sends back a response with progress info. This continues until the file is complete, at which point the Flex library dispatches a complete event and you can do whatever you like with the file. The Java service has some methods that let you get an input stream to the uploaded file and also delete the file once you're done with it. If the upload stops midway (either because the user cancelled or due to an error) they can re-upload the file and it will resume from where it left off. Errors are also communicated back to the Flex client so you can auto retry or do whatever. Hopefully I'll attach this stuff to my blog. If you like it or find bugs (you probably will) then let me know.
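The server half of that conversation could look roughly like the sketch below. This is not the code in reignite-upload.jar: ResumableUploadStore and its methods are hypothetical names, and the offset check and the file naming scheme are assumptions about how such a service might keep resumed uploads honest.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

/** Hypothetical server-side store for chunked, resumable uploads. */
final class ResumableUploadStore {

    private final File directory;

    ResumableUploadStore(File directory) {
        this.directory = directory;
    }

    /** Bytes already stored for this user and file; 0 if nothing yet. */
    long bytesReceived(String repositoryId, String fileName) {
        // File.length() returns 0 for a file that does not exist.
        return target(repositoryId, fileName).length();
    }

    /**
     * Writes one chunk at the given offset and returns the new total size.
     * A chunk that does not start where the stored file ends is rejected,
     * so a confused client gets an error instead of a corrupted file.
     */
    long appendChunk(String repositoryId, String fileName, long offset,
            byte[] chunk) throws IOException {
        File target = target(repositoryId, fileName);
        if (offset != target.length()) {
            throw new IOException("Expected offset " + target.length()
                    + " but got " + offset);
        }
        RandomAccessFile raf = new RandomAccessFile(target, "rw");
        try {
            raf.seek(offset);
            raf.write(chunk);
            return raf.length();
        } finally {
            raf.close();
        }
    }

    /** Prefixing with the repository id keeps different users' files apart. */
    private File target(String repositoryId, String fileName) {
        // getName() drops any directory part the client may have sent.
        String safeName = new File(fileName).getName();
        return new File(directory, repositoryId + "_" + safeName);
    }
}
```

A client would call bytesReceived first to find out where to resume, then call appendChunk once per chunk and turn each returned total into a progress update.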
LINKS:
applicationContext-upload.xml
FlexFileUpload.swc
reignite-upload.jar
fileupload.xml
HOW?
Add the jar file to your lib folder.
Include the swc in your Flex project.
fileupload.xml goes in your web root (or docroot, whatever you call it).
applicationContext-upload.xml is a Spring context file, so put it wherever you load those from.
If you don't use Spring you should, but if you don't, you can just create a facade that adapts the upload service and creates an instance of it. Hey, you're at least as clever as me, so either use Spring or take 5 minutes to figure out a way around it (I did for another project that used EJB).
In your remoting-config.xml:
<destination id="fileUpload">
    <properties>
        <factory>spring</factory>
        <source>uploadService</source>
    </properties>
</destination>
You'll see I'm using the Spring factory. If you don't have it, do a Google search for a Spring factory for Blaze. There's also an EJB factory if you are that way inclined.
The fileupload.xml has 2 things in it. The first is the name of the destination you just configured in the remoting-config.xml and the other is the endpoint context path for your channel of choice.
Now to use it in your code:
1. Create an instance of au.com.reignite.upload.FileUploadManager and remember to declare it in a place that will stay in scope for the duration of your upload. The same goes for the FileReference below. You need to pass in some startup arguments when creating FileUploadManager: the URL to fileupload.xml, a securityToken (which you won't use, so hand in null) and a repositoryId, which is a unique identifier for this user's uploads. I use the user id for this so the same user can resume their upload. If you don't use a different id for each user then you'll get stuffed up files if more than one user uploads the same file at the same time.
2. Create a FileReference and get the user to select a file. Once selected, call load(). Once the load completes you can do the rest.
3. Once the file reference has done its load and you have the data do this:
uploadManager.addEventListener(FileUploadErrorEvent.UPLOAD_ERROR, handleUploadError);
uploadManager.addEventListener(ProgressEvent.PROGRESS, fileUploadProgressHandler);
uploadManager.addEventListener(FileUploadCompleteEvent.COMPLETE, completeUpload);
These are the events that get dispatched, so you had better handle them. Pretty self-explanatory really.
4. Call uploadManager.upload(fileRef.data, fileRef.name); which starts it all off. You'll get at least one progress event and a maximum of one complete event.
If a user cancels or there is an error and you want to start over, make sure you call uploadManager.cancel(), otherwise you'll break it. That also means that if they close the popup you're doing the upload in, you must call cancel, otherwise the upload continues.
Maybe if I get bored I'll post an example app.
Wednesday, June 9, 2010
Tuesday, June 1, 2010
I'm back with egg on my face and all
So I figured I was posting off into oblivion happily talking to myself but it turns out some people read my blog and commented and I didn't think to check back. Oops.
Anyway the good news is I have bonus solutions to tricky problems.
Clustered Blaze Messaging III
I previously talked about a couple of different ways to get around the problem of having Blaze behind a hardware firewall that did round robin load balancing.
Just a recap: the client subscription lives with the servlet context on the machine they first sent their subscription to. This means when their next poll request hits the load balancer they tend to get fired off to the other server, which has no subscription for them. I tried to use JGroups and Tomcat session replication but both methods suck a fair bit and kept falling over. The answer to my woes has been database-guaranteed subscription and message queuing.
In Brief
Client A sends a subscribe request to server A. Server A creates an entry in the database (replicated using MySQL replication) for that subscription. Client A then polls and hits server B, which looks in the database for a subscription and finds it. Server B then checks the database message queue for any messages client A has not received and sends them, or waits the prescribed time for a message to enter the queue.
So you see the messages get stored in a database table and the subscription keeps track of the last message id received. Join the dots and you've got a messaging system that relies on MySQL replication (or whatever DB you are using) which is very reliable. I say reliable because in the nearly 2 years this particular MySQL replication set up has been operating it only got out of sync once and that was due to an OS failure (at Christmas while I was on leave of course so I was called up and spent a lovely holiday reinitialising databases instead of drinking beer and stuffing my gob).
Now I will try to add the code:
In messaging-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<service id="message-service" class="flex.messaging.services.MessageService">
    <adapters>
        <adapter-definition id="actionscript" class="au.com.reignite.amf.DatabaseMessagingAdapter"
            default="true" />
    </adapters>
    <default-channels>
        <channel ref="my-polling-amf" />
    </default-channels>
    <destination id="projectLockService"/>
</service>
Everything is pretty normal except the DatabaseMessagingAdapter which is the thing that does the database magic.
There's a stack of stuff in the DatabaseMessagingAdapter that is a bit specific to my particular architecture so I'll just show the important bits:
DatabaseMessagingAdapter
public class DatabaseMessagingAdapter extends MessagingAdapter
Obviously I extend the MessagingAdapter.
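@Override
public Object invoke(Message message) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream out;
    try {
        // Serialize the whole Blaze message so it can be rebuilt later.
        out = new ObjectOutputStream(bos);
        out.writeObject(message);
        // Flush so every serialized byte has reached bos before it is read.
        out.flush();
        // Store the raw bytes plus the readable body text.
        getMessageOrchestrator().storeMessage(bos.toByteArray(),
                message.getBody().toString());
    } catch (IOException e) {
        LogWriter.error(getClass(), "Failed to serialize message: " + e, e);
    }
    return null;
}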
Here I override the invoke() method because it is what Blaze calls when it wants to store the message in its internal store. Instead, you can see I serialize the message to a byte array and use my "MessageOrchestrator" to store it. This is just my class that saves the bytes to the database in the following table:
DROP TABLE IF EXISTS `truenorth`.`blaze_message`;
CREATE TABLE `truenorth`.`blaze_message` (
    `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `MESSAGE_BYTES` blob NOT NULL,
    `MESSAGE_BODY` text,
    `CREATED_DATE` datetime NOT NULL,
    `MODIFIED_DATE` datetime NOT NULL,
    PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=54 DEFAULT CHARSET=latin1;
My messages are in XML so I store the message bytes and the message body as a string so I can inspect them with my greedy eyes.
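For anyone who hasn't done Java serialization by hand, the round trip behind the MESSAGE_BYTES column looks something like this sketch. MessageBytes and its two methods are hypothetical names, and it assumes the message object is Serializable, which the writeObject call in invoke() implies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper: turns a message into bytes and back again. */
final class MessageBytes {

    private MessageBytes() {
    }

    /** Serializes the message into a byte array fit for a blob column. */
    static byte[] toBytes(Serializable message) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bos);
        try {
            out.writeObject(message);
        } finally {
            // close() flushes anything still buffered into bos.
            out.close();
        }
        return bos.toByteArray();
    }

    /** Rehydrates an object from bytes previously produced by toBytes. */
    static Object fromBytes(byte[] bytes)
            throws IOException, ClassNotFoundException {
        ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes));
        try {
            return in.readObject();
        } finally {
            in.close();
        }
    }
}
```

The class that was serialized has to be on the classpath of whichever server reads the bytes back, which is no problem when both servers in the cluster run the same application.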
Back to DatabaseMessagingAdapter:
@Override
public boolean handlesSubscriptions() {
    return true;
}
Of course it handles subscriptions too, so I return true (the default is false).
@Override
public Object manage(CommandMessage commandMessage) {
    if (commandMessage.getOperation() == CommandMessage.SUBSCRIBE_OPERATION) {
        MessageDestination destination = (MessageDestination) getDestination();
        SubscriptionManager subscriptionManager = destination
                .getSubscriptionManager();
        String selectorExpr = (String) commandMessage
                .getHeader(CommandMessage.SELECTOR_HEADER);
        // Drop Blaze's own in-memory record of this subscription.
        subscriptionManager.removeSubscriber(commandMessage.getClientId(),
                selectorExpr, (String) commandMessage
                        .getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME),
                (String) commandMessage.getHeader(Message.ENDPOINT_HEADER));
        // Record the subscription in the database instead.
        getMessageOrchestrator().storeSubscription(
                FlexContext.getFlexClient().getId(),
                commandMessage.getClientId().toString(), selectorExpr);
    } else if (commandMessage.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION) {
        if (FlexContext.getFlexClient() != null) {
            getMessageOrchestrator().deleteSubscription(
                    FlexContext.getFlexClient().getId());
        }
    }
    return null;
}
This is the bit that manages subscriptions, as darkly foreshadowed by me above. If it is a subscribe operation I clear out Blaze's storage of the subscription, otherwise it annoys the heck out of me by denying people who shouldn't be denied. Then I grab my trusty MessageOrchestrator and store my own subscription in a table like:
DROP TABLE IF EXISTS `truenorth`.`subscription`;
CREATE TABLE `truenorth`.`subscription` (
    `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `CLIENT_ID` varchar(100) NOT NULL,
    `MESSAGE_CLIENT_ID` varchar(100) NOT NULL,
    `SELECTOR` varchar(255) DEFAULT NULL,
    `EXPIRY_DATE` datetime NOT NULL,
    `LAST_MESSAGE_ID` int(10) unsigned NOT NULL,
    `CREATED_DATE` datetime NOT NULL,
    `MODIFIED_DATE` datetime NOT NULL,
    PRIMARY KEY (`ID`),
    UNIQUE KEY `Index_CLIENT_ID_UNIQUE` (`CLIENT_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
The important bit to note is that I store the client_id, which is the id of the instance of Flash running on the client machine, and the message_client_id, which is the id of the consumer object running in that Flash client. You can also see I keep the last message id sent and some datetime things.
Ok so that's how I store subscriptions and messages. The next piece of the puzzle is how I handle poll requests.
Poll requests
In my services-config.xml I have a special endpoint for polling: DatabaseConfigurableAMFEndpoint
public class DatabaseConfigurableAMFEndpoint extends ConfigurableAMFEndpoint {
I extend my own ConfigurableAMFEndpoint so I can use my super special filter chain to do my filthy security processing (see previous blogs)
@Override
protected FlushResult handleFlexClientPoll(FlexClient flexClient,
        CommandMessage pollCommand) {
I'll skip the full code because it's long and boring, and summarise instead.
I load my subscription (the data stored in the subscription table above, not the Blaze subscription) based on the flex client id, as opposed to the message client id. If I don't get a record I throw a subscription error. Assuming they have a subscription, I start a "do" loop where I load the messages stored in the blaze_message table that have an id greater than the subscription's last received message id. Each message that matches the subscription selector (I use the JMSSelector object that comes with Blaze, just like Blaze does) I rehydrate and add to my results to send. If there are any results I break the loop and immediately send them. If there were no messages I wait a little bit (5 seconds) and try again. I keep doing that until the configured poll timeout (50 seconds), at which point I release the thread and let the client poll again. Finally I update their subscription with the last message they received and push out their expiry date.
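Stripped of the Blaze plumbing, the loop described above boils down to something like the following sketch. Everything in it is a stand-in: PollLoopSketch, MessageStore and StoredMessage are hypothetical names, the store replaces the real database queries, and selector matching and rehydration are left out to keep it short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the wait-and-retry poll loop. */
final class PollLoopSketch {

    /** Stand-in for one row of the blaze_message table. */
    static final class StoredMessage {
        final long id;
        final String body;

        StoredMessage(long id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    /** Stand-in for the query "messages with an id greater than X". */
    interface MessageStore {
        List<StoredMessage> findAfter(long lastMessageId);
    }

    private PollLoopSketch() {
    }

    /**
     * Returns messages newer than lastMessageId as soon as there are any,
     * checking again every retryMillis. Gives up with an empty list once
     * timeoutMillis has passed so the client can poll again.
     */
    static List<StoredMessage> poll(MessageStore store, long lastMessageId,
            long retryMillis, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            List<StoredMessage> found = store.findAfter(lastMessageId);
            if (!found.isEmpty()) {
                return found;
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return new ArrayList<StoredMessage>();
            }
            // Sleep for the retry interval, but never past the deadline.
            Thread.sleep(Math.min(retryMillis, remaining));
        }
    }

    /** The value to write back to LAST_MESSAGE_ID after sending. */
    static long newLastMessageId(long previous, List<StoredMessage> sent) {
        long last = previous;
        for (StoredMessage message : sent) {
            last = Math.max(last, message.id);
        }
        return last;
    }
}
```

With the numbers from the text, the call would be poll(store, lastId, 5000, 50000). Note that each waiting client holds a server thread for up to the full timeout, which is fine for a handful of users but worth remembering.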
To finish it all off I have a scheduled task (I use quartz with spring) that every few minutes culls any expired subscriptions and deletes messages that all subscriptions have received.
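The housekeeping rule can be sketched in a few lines of Java. This is an in-memory illustration only: SubscriptionCleanup and its methods are hypothetical names, the real job runs against the two tables, and treating "no subscriptions left" as "every message can go" is my assumption rather than something stated above.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of the scheduled cleanup rules. */
final class SubscriptionCleanup {

    /** Stand-in for the relevant columns of the subscription table. */
    static final class Subscription {
        final String clientId;
        final long lastMessageId;
        final long expiryMillis;

        Subscription(String clientId, long lastMessageId, long expiryMillis) {
            this.clientId = clientId;
            this.lastMessageId = lastMessageId;
            this.expiryMillis = expiryMillis;
        }
    }

    private SubscriptionCleanup() {
    }

    /** Removes expired subscriptions in place; returns how many went. */
    static int cullExpired(List<Subscription> subscriptions, long nowMillis) {
        int removed = 0;
        Iterator<Subscription> it = subscriptions.iterator();
        while (it.hasNext()) {
            if (it.next().expiryMillis <= nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    /**
     * Messages with an id up to and including the returned value have been
     * received by every subscription and can be deleted. Assumption: with
     * no subscriptions at all, nothing is waiting, so everything can go.
     */
    static long highestDeletableId(List<Subscription> subscriptions) {
        long lowest = Long.MAX_VALUE;
        for (Subscription subscription : subscriptions) {
            lowest = Math.min(lowest, subscription.lastMessageId);
        }
        return lowest;
    }
}
```

Culling expired subscriptions first matters: a client that vanished without unsubscribing would otherwise pin its last message id forever and stop any newer messages from being deleted.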
Victory
This method of messaging has been working now for about 10 months without failure. The concern I have is that it hits the DB fairly heavily, so I made sure the DB is on the same machine as the app server and the two machines in the cluster are directly connected via Gigabit Ethernet on the same switch (actually 2 switches cross-linked for redundancy). I've also got a fair query cache set up so I'm not always pulling the same messages out of the DB, and I'm only really catering to a max of about 10 concurrent users. Having said all that, I'd imagine I could have a bog load more concurrent users and still have no trouble. A big benefit is that the messages are held by the DB, not in memory by the Tomcat instance, which means Tomcat can fall over and people still get their messages.
Next Time
I've also now come up with what I consider the ultimate file upload solution even more betterer than ftp and I've set my blog to email me when someone comments so I can actually respond.