One of the things we've come to realize while developing Errai is that when you work
with a framework like Errai, where push messaging is basically free, easy and awesome,
you end up wanting to do things like stream live data across the wire.
A lot of our demos have involved creating threads and pushing data across completely
asynchronously. And it's always messy code. You need to worry about managing those
threads, making sure they die when the session dies, or when the subject is unsubscribed,
etc. Having to worry about all this creates security problems, resource management
issues, and it makes your code messy.
Well, worry no longer! The latest commit into trunk introduces a new comprehensive (and
simple) way of creating asynchronously running tasks, as part of the standard
MessageBuilder API.
Take our new TimeDisplay demo, where we stream a bunch of updates from the server to the
client containing System.currentTimeMillis() results. Up until now, the demo consisted of
a thread that looped around and around and built new messages to send. Through the
addition of some new API extensions, this demo is greatly simplified.
The first addition is some helper classes that help you create managed contexts to store
stuff in the session. One is called SessionContext, and the other is called LocalContext.
SessionContext allows you to create session-scoped attributes, and LocalContext lets you
create locally scoped (that is, page-scoped) attributes. So if a user has multiple browser
windows open, or multiple tabs, each window or tab has its own LocalContext. This is a pretty
powerful little tool.
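To make the difference between the two scopes concrete, here is a minimal sketch in plain Java. It is not Errai's implementation: the class ScopeSketch, its methods, and the string session and window ids are all hypothetical stand-ins. It only illustrates that a session-scoped attribute is shared by every tab, while a locally scoped one belongs to a single tab.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, NOT Errai's SessionContext/LocalContext classes.
final class ScopeSketch {
    // One attribute map per session, shared by every window or tab of that session.
    private final Map<String, Map<String, Object>> sessionScoped = new HashMap<>();
    // One attribute map per (session, window) pair.
    private final Map<String, Map<String, Object>> localScoped = new HashMap<>();

    Map<String, Object> sessionContext(String sessionId) {
        return sessionScoped.computeIfAbsent(sessionId, k -> new HashMap<>());
    }

    Map<String, Object> localContext(String sessionId, String windowId) {
        return localScoped.computeIfAbsent(sessionId + "/" + windowId, k -> new HashMap<>());
    }

    public static void main(String[] args) {
        ScopeSketch scopes = new ScopeSketch();
        scopes.sessionContext("s1").put("user", "mike");
        scopes.localContext("s1", "tabA").put("task", "stream-1");

        // Every tab of session s1 sees the session attribute.
        System.out.println(scopes.sessionContext("s1").get("user"));       // mike
        // Only tabA sees the attribute stored in its own local context.
        System.out.println(scopes.localContext("s1", "tabA").get("task")); // stream-1
        System.out.println(scopes.localContext("s1", "tabB").get("task")); // null
    }
}
```

The sketch is single-threaded for brevity; a real server-side version would need thread-safe maps and cleanup when the session ends.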
The second addition is the implementation of provided message parts. Unlike regular
message parts, these parts are resolved via providers at the time of transmission. This
is a key aspect of what we're about to show below.
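The idea of a part that is resolved at transmission time can be sketched with a plain java.util.function.Supplier. This is an illustration of the concept only: ProvidedPartsSketch, with, withProvided and resolveForSend are hypothetical names, and Errai's own ResourceProvider and message classes are not used here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch, NOT Errai's message implementation.
final class ProvidedPartsSketch {
    private final Map<String, Object> fixedParts = new LinkedHashMap<>();
    private final Map<String, Supplier<?>> providedParts = new LinkedHashMap<>();

    // A regular part: the value is fixed when the message is built.
    ProvidedPartsSketch with(String name, Object value) {
        fixedParts.put(name, value);
        return this;
    }

    // A provided part: only the provider is stored; the value is computed later.
    ProvidedPartsSketch withProvided(String name, Supplier<?> provider) {
        providedParts.put(name, provider);
        return this;
    }

    // Called once per transmission, so every send sees fresh provided values.
    Map<String, Object> resolveForSend() {
        Map<String, Object> payload = new LinkedHashMap<>(fixedParts);
        providedParts.forEach((name, provider) -> payload.put(name, provider.get()));
        return payload;
    }

    public static void main(String[] args) {
        AtomicInteger tick = new AtomicInteger();
        ProvidedPartsSketch message = new ProvidedPartsSketch()
                .with("Subject", "TimeChannel")
                .withProvided("Tick", tick::incrementAndGet);

        System.out.println(message.resolveForSend()); // {Subject=TimeChannel, Tick=1}
        System.out.println(message.resolveForSend()); // {Subject=TimeChannel, Tick=2}
    }
}
```

Because the provider runs on each send, the same message definition can be transmitted repeatedly and still carry up-to-date values.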
The third addition is the implementation of repeating and delayed message transmission
calls as part of the standard messaging API.
Let's take a look at the example:
    AsyncTask task = MessageBuilder.createConversation(message)
            .toSubject("TimeChannel").signalling()
            .withProvided(TimeServerParts.TimeString, new ResourceProvider<String>() {
                public String get() {
                    return String.valueOf(System.currentTimeMillis());
                }
            }).noErrorHandling().replyRepeating(TimeUnit.MILLISECONDS, 100);
In this example, we create a conversational message which replies not just once, but
continuously: once every 100 milliseconds, in this case. The
replyRepeating and replyDelayed, sendRepeating and sendDelayed methods all return an
instance of AsyncTask, which is a handle on the task being performed. You can use this
object to cancel the task.
    task.cancel(true);
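For anyone curious what a cancellable repeating task looks like underneath, here is a rough sketch using only the JDK's ScheduledExecutorService. It assumes nothing about how Errai wires its AsyncTask internally; RepeatingTaskSketch and runThenCancel are made-up names, and the counter stands in for "send a message".

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a repeating task with a cancel handle; NOT Errai's AsyncTask.
final class RepeatingTaskSketch {
    // Runs a task every periodMillis, lets it run for runForMillis, then cancels it.
    // Returns how many times the task had run when the count was read after cancelling.
    static int runThenCancel(long periodMillis, long runForMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger sent = new AtomicInteger();
        try {
            // The returned future plays the role of the task handle.
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                    () -> sent.incrementAndGet(), 0, periodMillis, TimeUnit.MILLISECONDS);
            Thread.sleep(runForMillis);
            // true = interrupt the task if it is running right now.
            handle.cancel(true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Always release the scheduler thread, even on failure.
            scheduler.shutdownNow();
        }
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println("ticks before cancel: " + runThenCancel(100, 550));
    }
}
```

The exact tick count depends on timing, so the sketch prints it rather than promising a number. The part that matters is the finally block: forgetting to shut the scheduler down is exactly the kind of thread leak a managed API is meant to spare you.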
Now let's look at the complete example:
    @Service("TimeServer")
    @RequireAuthentication
    public class TimeDisplay implements MessageCallback {
        private MessageBus bus;

        @Inject
        public TimeDisplay(MessageBus bus) {
            this.bus = bus;
        }

        public void callback(final Message message) {
            if (message.getCommandType() == null) return;

            // Create a local context to store state that is unique to this
            // client instance (not session-wide).
            final LocalContext context = LocalContext.get(message);

            // Switch on the TimeServerCommands type provided.
            switch (TimeServerCommands.valueOf(message.getCommandType())) {
                case Start:
                    // We want to start streaming.
                    AsyncTask task = MessageBuilder.createConversation(message)
                            .toSubject("TimeChannel").signalling()
                            .withProvided(TimeServerParts.TimeString, new ResourceProvider<String>() {
                                public String get() {
                                    return String.valueOf(System.currentTimeMillis());
                                }
                            }).noErrorHandling().replyRepeating(TimeUnit.MILLISECONDS, 100);

                    // Store the task as an attribute uniquely identified by its class type.
                    context.setAttribute(AsyncTask.class, task);

                    // Create a listener that will kill the task gracefully if the
                    // subject is unsubscribed. This isn't 100% necessary, as the task
                    // will be auto-killed ungracefully. But this provides an
                    // opportunity to clean up after ourselves.
                    bus.addUnsubscribeListener(new UnsubscribeListener() {
                        public void onUnsubscribe(SubscriptionEvent event) {
                            if ("TimeChannel".equals(event.getSubject())) {
                                // Delete this listener after this execution.
                                event.setDisposeListener(true);

                                // Stop the task from running.
                                context.getAttribute(AsyncTask.class).cancel(true);

                                // Destroy the local context. Sort of unnecessary,
                                // but helps reduce memory usage.
                                context.destroy();
                            }
                        }
                    });
                    break;

                case Stop:
                    // Access our stored AsyncTask from this instance and cancel it.
                    context.getAttribute(AsyncTask.class).cancel(true);

                    // Destroy the local context. Sort of unnecessary, but helps
                    // reduce memory usage.
                    context.destroy();
                    break;
            }
        }
    }
That's all there is to it. It's pretty sweet. This API works on both the client
and the server side. All the thread scheduling is transparently managed by an
executor service on the server, and by a simple Timer-based implementation in the client.
I'm hoping people will find this a welcome addition to Errai-land.
All the best,
Mike.