Sunday, October 10, 2010

Pushing Live Streams From Flash to iPhone

iPhone == anathema
So I've had to do some work with the iPhone regarding apps and that means looking at objective-C.  First let me suck yuck.  I started playing with OC code the other night when 1995 called and asked if it could please have its programming language back.  Anyway, as a Java programmer and a Flex programmer I have to say OC is nasty for gui dev and for basic app dev work.  Maybe it is really good for larger applications?  I don't know, but for little mobile apps it is cumbersome, ugly and overly fiddly.  That aside this post is actually about integrating an iPhone app into an existing Flex multi-media app, so I'll leave the OC coding to Mac Fanboys (as I'm a Java fanboy) and move on.


Flash Video Encoding
The existing app I have is a social networking site that allows you to stream live audio and video to an audience.  No, this is not a porn web cam, it is for musicians to broadcast their jam sessions or min-gigs to their fans.
So the performer logs on, clicks "yes allow flash to use my camera" and they start streaming to the server.  A fan logs on and clicks on the "watch my fave act" and they get the performer's stream playing for their pleasure.  The iPhone extension allows a person to watch the same performance on their iPhone rather than on their Flash enabled device (as in everything else in the entire universe, including TVs and refrigerators).
But, there is a problem:  iPhone only reads H264 encoded video with aac / mp3 audio.  Flash will also read that, but at present only creates Sorenson Spark / Speex (in my case) encoded video / audio.  That means the performer is producing a stream that is unintelligible to the iPhone.  There is, to my knowledge, no way to load additional codecs onto the iPhone and I don't think Apple will let you write an app that does.  So how the heck do you let an iPhone participate in these new fangled interwebs that Steve Jobs has heard so much about (and yet doesn't like)?  Oh and yes I am being contentious on purpose because I'm like that.

Transcoding
The magic word we are looking for is transcoder.  We take the flash video in one end and spit out h264.  This is easier said than done.  There is only one transcoder in the whole wide world, I know you don't believe me but it's true, and it is called FFMpeg.  Just try to find another application / library able to convert videos via a command line or with an API, go on I dare you.
Somewhere in my back end I need to use FFMpeg to create the iPhone stream otherwise I can just kiss my rich-media app goodbye.

Xuggler
Hooray for the Java open source community!  There is a lovely Java JNI wrapper for FFMpeg called Xuggler.  It even has some guts in it that let you read a stream from an rtmp source and publish back to an rtmp location.  As it happens my back end uses Wowza media server, also Java, so it is fairly easy to call Xuggler from a Wowza application.  What isn't so easy is finding any halfway decent documentation for Wowza. Also, Wowza isn't open source so it is a bit of a mystery as to what happens inside it.  I think there might be elves involved.  Anyway here is the basic glue I used (which I stole from a forum post on the Wowza forums, so thank whoever that was):

public class TheApp extends ModuleBase {
   ... stuff ...


   public void publish(IClient client, RequestFunction function, AMFDataList params){
      IMediaStream stream = getStream(client, function);
      stream.addClientListener(new StreamWatcher());
      ModuleCore.publish(client, function, params);
   }
   ... other stuff ...
}

Ok so the above code adds a listener to the stream so that when the stream is actually published I get notified and a method in the StreamWatcher class is called (that's my class BTW)

public class StreamWatcher implements IMediaStreamActionNotify2 {
   
   public void onPublish(IMediaStream stream, String streamName, boolean isRecord, boolean isAppend) {
      H264Transcoder tc = new H264Transcoder(stream.getName());
      tc.start();
   }
}

Above you can see that when the stream is publish I create a new transcoder (which is a thread) and start it.

public class H264Transcoder extends Thread {

   public void run() {
try {
Thread.sleep(5000L);
RTMPPublishH264();
} catch (ParseException e) {
System.out.println(e);
} catch (InterruptedException e) {
e.printStackTrace();
}
}


   public void testRTMPPublishH264() throws ParseException {
      Converter converter = new Converter();
      Options options = converter.defineOptions();
      String args = ""
         /* ffmpeg -i rtmp://www.site.com.au:1935/ppv/streamName
* -re 
* -acodec libfaac 
* -ar 22050 
* -vcodec libx264 
* -fpre default.ffpreset 
* -f flv rtmp://www.site.com.au:1935/live2/streamName-iphone
         */
      CommandLine cmdLine = converter.parseOptions(options, args);
      converter.run(cmdLine);
   }
}

Wow, so the above isn't complete, but I've included the really important bits.  The reason it isn't complete is because I haven't perfected it yet.  Maybe I never will.  Oh, who am I kidding, of course I will, but not just now.

Important Points
The important points for this:
Xuggler can't connect to specific application instances, only to the _definst_ instance.  That means the rtmp url above has an application called "ppv" in which a stream called "streamName" is published.  Normally I'd have a named app instance that I'd publish into so I could track shared objects and the like, but that didn't work with Xuggler.  Also the output stream I've called streamName-iphone, is in a different application, again the _definst_ instance.  The reason for that is my ppv application has a bunch of rich user interaction stuff like chat and it gets info from the server about the performer and the people involved need to have valid accounts and so on.  I've only included the actual ffmpeg command line code that I have personally used and it works.  The preset I used is the default one that comes with Xuggler for h264 so I'll probably have to tweak that to get the best performance.

I may add an update post once I get all the kinks worked out.  For example I'd love to package this up as a simple jar + config arrangement, but I do have to work for a living so I may not get the time.

Thursday, August 26, 2010

A Date with Flex

Date Objects
So in computing we always need to deal with dates.  All my tables for example have created date and modified date columns so each record is timestamped.  Then there are dates for reports, dates for events and schedules and calendar dates for people to deal with.  But why is it there is always so much trouble dealing with dates?
A Date is Unique
At its heart the Date object is an unique instant in time.  It is universal and absolute.
What does that mean?
It means that, say in Java when I say Date myDate = new Date();  I am creating an object that stores the moment of its own creation.  The date doesn't matter what timezone I'm in, it doesn't matter what language I speak or anything like that.  Deep inside the Date object, and this holds true regardless of programming language or database, there is a long integer that is the milliseconds since an epoch.  The usual epoch is 1/1/1970 which is often called Unix time, but it could also be UTC which was in 1972.  It isn't all that important when the epoch was.  The important point is there is no timezone offset.  Timezone and locale are human decorators added to a Date to make it relative to the human's location.  Regardless of where the computer is or the person programming it the number of milliseconds since epoch is the same.  If it is 10am for me in Australia and I call my friend in Greenwich (who won't be pleased) they would tell me it was 2am.  It is actually the same Date, the same time, the same instant in existence and it is that which the Date object captures.  We then apply formatting to that Date to produce something relative to where we are.
So What?
This is important because it means that if I need to create a date for my friend in Greenwich, for example I need to set an alarm for them to wake up at 8am things get twisted.  In Australia, if I just create a Date and set the hour to 8am and save it the alarm will go off at midnight!  So I need to see a Greenwich clock and select 8am on that.  In Java I can do something like this:
Calendar c = GregorianCalendar.getInstance(zone);
Where zone is Greenwich timezone.  Then I can set the hour on the calendar to 8am and get the Date from the calendar and bingo it's done.
Flex 
Flex has a Date object but no Calendar.  So when I create a Date object in flex it automagically get the client computer's timezone/locale settings.  Inside the Date it is still milliseconds since epoch and you can get that by using the various UTC related methods to get the universal time.  The date picker and date formatter all by default work of locale dates / times.  That means you need to write your own code to display dates in other timezones.  In the above example where I need to set an alarm for my friend in Greenwich I would need to make sure that I was viewing the Date in the correct timezone.
How?
There are a number of ways you can do it but I think it boils down to two ways which are both correct depending on your exact situation.

  1. Enter and send the date / time as a string.  It is pretty easy for me to simply type 8am and send that as a string to be stored and seen by my friend.  And if all I wanted to do was tell him when I'd set his alarm that would be fine.  However what if the server and/or client needed to operate on that time?  For example I actually needed to tell the computer what time to play the alarm and also how long the snooze lasts and so on?  I'd need to start parsing the string and I'd have to tell the computer what timezone the string was in.  If I was setting lots of alarms for people all over the world things would get very messy very quickly. So...
  2. On my client I have a timezone picker that selects what timezone to display dates to me in.  When I want to set the 8am alarm for my friend in Greenwich I tell my client I want to choose dates in Greenwich time, pick the date and send it.  Behind the scenes I will have selected the date that locally would look something like 4pm to me, but would equate to 8am to my friend.  Either way it is the same number of milliseconds since epoch.
OMG WTF?
Yeah it can get confusing.  Because all the date stuff in Flex hides the timezone you need to do some actual work if you want to operate in timezones other than what is set on your computer.  To do the modification might look like this:
var selectedDate : Date = new Date(2010,7,27,8); // 8am 27th Aug 2010 (month is 0 based index)
But that date is in my timezone which is no good.  But what if I do this:
selectedDate.minutes -= selectedDate.timezoneOffset;
selectedDate.minutes += myFriendsTimezoneOffset;
In the first line I subtract my timezone offset.  The timezone offset is the number of minutes difference between my timezone and UTC (or +0 hours).  So if I subtract the timezone offset to the selected date's minutes I will get a Date that is the time I selected (8am) as it would be in UTC, that is 8am UTC which, since I'm in Australia at +8hrs means selectedDate would be 4pm my time.
In the second line I add my friend's timezone offset.  This will make the Date 8am in his timezone (which just happens to be +0 since he is at Greenwich).  The important thing to note is that the selectedDate object now has a UTC time (milliseconds since epoch) that is equivalent to 8am Greenwich and 4pm Australia time.  It can be confusing but stick with it.
Over The Wire
When you serialize the date and send it to the server you are actually sending a UTC time.  There is no timezone sent with the Date.  If you remember a Date is an instance in time that is unique and absolute, a timezone / locale is something we decorate that Date with so it is relative to us humans.  Computers need a single representation of Date so they can compare them and sort them and perform calculations on them.  If you have chosen a Date and it is important that other people know what timezone you were in when you chose that Date you need to communicate this separately.
I hope this has helped someone.

Wednesday, June 9, 2010

Flex File Upload to End All Uploads

Back to the Future
I previously said I had implemented an FTP based solution for file upload.  Well I did and it works pretty well but it's a bit fiddly and tricky to get working so I kept looking for solutions.  I found one.  Hooray!  Flex 4 and FileReference in Flash 10 with Blaze provide the solution.
Big Irritation
A big irritation of mine with http uploads using multi part form stuff is that it is an all or nothing proposal.  Once you fire up the connection and start uploading you have to wait for the whole thing to go up.  For small files this isn't such a problem but if you upload a file that takes a minute or two you often don't know if it is working (from a client perspective) until you decide that 3 hours is too long for a file to take.  There are heaps of issues like socket disconnected by peer, socket read timeout and so forth.  To get things working you need to check special upload timeouts and browser timeouts and virus checkers / indexers on the server and other mysterious Mac OS X related muffery.  To use file upload over http in a true enterprise application you need some guarantees that the upload will happen and that data isn't lost.  You need a reliable system so you can tell your paying customers that their data will get to the server and if it doesn't you know what happened and you can fix it and keep going.  Anything less is amateur hour mickey mouse stuff.
Flash 10 FileReference
Flash player 10 allows you to call fileReference.load() to load a selected file.  This puts all the file's bytes into the data property.  You can then send that as a byte[] through AMF via Blaze to the server where you can save the file.  The problem with this simplistic approach is you don't get any progress events and if it is a big file you still end up with all the problems of http upload.  The solution is really simple; break the byte array into manageable chunks and send them up one at a time waiting for a response between each one.  This way you get a progress event and the system doesn't just freeze up until it's finished.  If you are clever you can also enable pause / resume.
I'm Clever
Yeah well so my mother says anyway.  But I wrote a nice little flex library and java library which go together to provide file upload services.  The flex part takes a byteArray and file name and queries the java service to see what progress the file is up to.  The java service replies with a progress event which either says 0 bytes uploaded, some bytes uploaded or complete.  You can use that to initialise your progress bar in flex.  Then the flex library moves to the correct position in the byteArray and sends a chunk up to java which is written to file and a response sent with progress info.  This continues until the file is complete at which point the flex library dispatches a complete event and you can do whatever you like with the file.  The java service has some methods that let you get an input stream to the uploaded file and also delete the file once your done with it.  If the user cancels midway through the upload (either on purpose or due to error) they can re-upload the file and it will resume from where you left off.  Errors are also communicated back to the flex client so you can auto retry or do whatever.  Hopefully I'll attach this stuff to my blog.  If you like it or find bugs (you probably will) then let me know.
LINKS:
applicationContext-upload.xml
FlexFileUpload.swc
reignite-upload.jar
fileupload.xml
HOW?
add the jar file to your lib folder
include the swc in your flex project
fileupload.xml goes in your web root or docroot whatever you call it
applicationContext-upload.xml is a spring context file so put it where you load them from
If you don't use spring you should, but if you don't, you can just create a facade that adapts the upload service and creates an instance and whatever.  Hey you're at least as clever as me so either use spring or take 5 minutes to figure out a way around it (I did for another project that used ejb)
in your remoting-config.xml

<destination id="fileUpload">
<properties>
<factory>spring</factory>
<source>uploadService</source>
</properties>
</destination>
You'll see I'm using the spring factory, if you don't have it do a google search for spring factory for blaze there's also an ejb factory if you are that way inclined.
The fileupload.xml has 2 things in it.  The first is the name of the destination you just configured in the remoting-config.xml and the other is the endpoint context path for your channel of choice.
Now to use it in your code:
  1. Create an instance of the au.com.reignite.upload.FileUploadManager and remember to declare it in a place that will have scope for the duration of your upload.  Same as FileReference below.  You need to pass in some start up arguments when creating FileUploadManager: url to fileupload.xml, securityToken which you wont use so hand in null and repositoryId which is a unique identifier for this user's uploads.  I use user id for this so the same user can resume their upload.  If you don't use a different id for each user then you'll get stuffed up files if more than one user uploads the same file at the same time.
  2. create a FileReference and get the  user to select a file.  Once selected call load().  Once complete you can do the rest.
  3. Once the file reference has done its load and you have the data do this:

uploadManager.addEventListener(FileUploadErrorEvent.UPLOAD_ERROR, handleUploadError);
uploadManager.addEventListener(ProgressEvent.PROGRESS, fileUploadProgressHandler);
uploadManager.addEventListener(FileUploadCompleteEvent.COMPLETE, completeUpload);
These are the events that get thrown so you had better handle them.  Pretty self explanatory really.
  4.  call uploadManager.upload(fileRef.data, fileRef.name); which starts it all off.  You'll get at least one progress event and a maximum of one complete event.
IF a user cancels or there is an error and you want to start over make sure you call uploadManager.cancel() otherwise you'll break it.  So that means if they close the popup your doing the upload in also make sure you call cancel otherwise the upload continues.
Maybe if I get bored I'll post an example app.

Tuesday, June 1, 2010

I'm back with egg on my face and all

So I figured I was posting off into oblivion happily talking to myself but it turns out some people read my blog and commented and I didn't think to check back.  Oops.
Anyway the good news is I have bonus solutions to tricky problems.
Clustered Blaze Messaging III
I previously talked about a couple of different ways to get around the problem of having Blaze behind a hardware firewall that did round robin load balancing.
Just a recap: the client subscription lives with the servlet context on the machine they first sent their subscription to.  This means when their next poll request hits the load balancer they tend to get fired off to the other server and have no subscription.  I tried to use JGroups and Tomcat session replication but both methods suck a fair bit and kept falling over.
The answer to my woes has been database guaranteed subscription and message queuing.
In Brief
Client A sends subscribe request to server A.  Server A creates an entry in the database (replicated using mysql replication) for that subscription.  Client A then polls and hits server B which looks in the database for a subscription and finds it.  Server B then checks the database message queue for any messages client A has not received and sends them or waits the prescribed time for a message to enter the queue.
So you see the messages get stored in a database table and the subscription keeps track of the last message id received.  Join the dots and you've got a messaging system that relies on MySQL replication (or whatever DB you are using) which is very reliable.  I say reliable because in the nearly 2 years this particular MySQL replication set up has been operating it only got out of sync once and that was due to an OS failure (at Christmas while I was on leave of course so I was called up and spent a lovely holiday reinitialising databases instead of drinking beer and stuffing my gob).
Now I will try to add the code:
In messaging-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<service id="message-service" class="flex.messaging.services.MessageService">
<adapters>
<adapter-definition id="actionscript" class="au.com.reignite.amf.DatabaseMessagingAdapter"
default="true" />
</adapters>
<default-channels>
<channel ref="my-polling-amf" />
</default-channels>
<destination id="projectLockService"/>
</service>
Everything is pretty normal except the DatabaseMessagingAdapter which is the thing that does the database magic.
There's a stack of stuff in the DatabaseMessagingAdapter that is a bit specific to my particular architecture so I'll just show the important bits:
DatabaseMessagingAdapter

public class DatabaseMessagingAdapter extends MessagingAdapter
Obviously I extend the MessagingAdapter.

@Override
public Object invoke(Message message) {

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out;
try {
out = new ObjectOutputStream(bos);
out.writeObject(message);
getMessageOrchestrator().storeMessage(bos.toByteArray(),
message.getBody().toString());
} catch (IOException e) {
LogWriter.error(getClass(), "Failed to serialize message: " + e, e);
}

return null;
}
Here I override the invoke() method because it is what Blaze calls when it wants to store the message in its internal store.  Instead you can see I serialize the message to a byte array and use my "MessageOrchestroator" to store it.  This is just my class that saves the bytes to the database in the following table:

DROP TABLE IF EXISTS `truenorth`.`blaze_message`;
CREATE TABLE  `truenorth`.`blaze_message` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `MESSAGE_BYTES` blob NOT NULL,
  `MESSAGE_BODY` text,
  `CREATED_DATE` datetime NOT NULL,
  `MODIFIED_DATE` datetime NOT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=54 DEFAULT CHARSET=latin1;
My messages are in XML so I store the message bytes and the message body as a string so I can inspect them with my greedy eyes.
Back to DatabaseMessagingAdapter:

@Override
public boolean handlesSubscriptions() {
return true;
}
Of course it handles subscriptions too!  so I return true (the default is false).

@Override
public Object manage(CommandMessage commandMessage) {

 if (commandMessage.getOperation() == CommandMessage.SUBSCRIBE_OPERATION) {

MessageDestination destination = (MessageDestination) getDestination();



SubscriptionManager subscriptionManager = destination
.getSubscriptionManager();
String selectorExpr = (String) commandMessage
.getHeader(CommandMessage.SELECTOR_HEADER);
subscriptionManager.removeSubscriber(commandMessage.getClientId(),
selectorExpr, (String) commandMessage
.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME),
(String) commandMessage.getHeader(Message.ENDPOINT_HEADER));
getMessageOrchestrator().storeSubscription(
FlexContext.getFlexClient().getId(),
commandMessage.getClientId().toString(), selectorExpr);
} else if (commandMessage.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION) {
if (FlexContext.getFlexClient() != null) {

getMessageOrchestrator().deleteSubscription(



FlexContext.getFlexClient().getId());
}
}
return null;
}
This is the bit that manages subscriptions, as darkly foreshadowed by me above.  If it is a subscribe operation I clear out Blazes storage of the subscription otherwise it annoys the heck out of me by denying people who shouldn't be denied.  By I grab my trusty MessageOrchestrator and store my own subscription in a table like:

DROP TABLE IF EXISTS `truenorth`.`subscription`;
CREATE TABLE  `truenorth`.`subscription` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `CLIENT_ID` varchar(100) NOT NULL,
  `MESSAGE_CLIENT_ID` varchar(100) NOT NULL,
  `SELECTOR` varchar(255) DEFAULT NULL,
  `EXPIRY_DATE` datetime NOT NULL,
  `LAST_MESSAGE_ID` int(10) unsigned NOT NULL,
  `CREATED_DATE` datetime NOT NULL,
  `MODIFIED_DATE` datetime NOT NULL,
  PRIMARY KEY (`ID`),
  UNIQUE KEY `Index_CLIENT_ID_UNIQUE` (`CLIENT_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
The important bit to note is I store the client_id which is the id of the instance of flash running on the client machine and the message_client_id which is the instance of the consumer object running in the flash client on the client machine.  You can also see I keep the last message id sent and some datetime things.
Ok so that's how I store subscriptions and messages.  The next piece of the puzzle is how I handle poll requests.
Poll requests
In my services-config.xml I have a special endpoint for polling: DatabaseConfigurableAMFEndpoint
public class DatabaseConfigurableAMFEndpoint extends ConfigurableAMFEndpoint {
I extend my own ConfigurableAMFEndpoint so I can use my super special filter chain to do my filthy security processing (see previous blogs)

@Override
protected FlushResult handleFlexClientPoll(FlexClient flexClient,
CommandMessage pollCommand) {


I'll skip including all the code because it's long and boring so I'll summarise. 
I load my subscription, which is the data stored in the above table not the Blaze subscription based on the flex client id (that's as opposed to the message client id).  If I don't get a record I throw a subscription error.  Assuming they have a subscription I start a "do" loop where I load up the messages stored in the above table that have an id greater than the subscription's last received message.  Each of those messages that match the subscription selector (I use the JMSSelector object that comes with Blaze just like Blaze does) I rehydrate and add to my results to send.  If there are any results I break the loop and immediately send them.  If there were no messages I wait a little bit (5 seconds) and try again.  I keep doing that until the configured poll timeout (50 seconds) at which point I release the thread and let the client poll again.  I then update their subscription with the last message they received and increment their expiry date.




To finish it all off I have a scheduled task (I use quartz with spring) that every few minutes culls any expired subscriptions and deletes messages that all subscriptions have received.


Victory


This method of messaging has been working now for about 10 months without failure.  The concerns I have about it that it is fairly heavy on hitting the DB so I made sure the DB is on the same machine as the app server and the two machines in the cluster are directly connected via Gigabit Ethernet on the same switch (actually 2 switches cross linked for redundancy).  I've also got a fair query cache set up so I'm not always pulling the same messages out of the DB and I'm only really catering to a max of about 10 concurrent users.  Having said all that I'd imagine I could have a bog load more concurrent users and still have no trouble.  A big benefit is the messages are held by the DB not in memory by the tomcat instance which means tomcat can fall over and people still get their messages.


Next Time


I've also now come up with what I consider the ultimate file upload solution even more betterer than ftp and I've set my blog to email me when someone comments so I can actually respond.