Netty Handler for Strings and Things
Michael McGrady
mmcgrady at topiatechnology.com
Mon Aug 17 18:48:05 EDT 2009
Trustin,
Here is the central class for the functionality mentioned earlier.
Mike
Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com
import static org.jboss.netty.channel.Channels.fireMessageReceived;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.MapBackedSet;
import com.topiatechnology.karma.api.mo.KarmaManager;
import com.topiatechnology.karma.spi.KarmaReceiver;
import com.topiatechnology.karma.spi.util.ByteObjectConverter;
import com.topiatechnology.karma.spi.util.KarmaChannelClose;
import com.topiatechnology.karma.spi.util.KarmaChunk;
import com.topiatechnology.karma.spi.util.KarmaMessage;
/**
*
*/
@ChannelPipelineCoverage ( "one" )
public class KarmaDecoder
implements ChannelUpstreamHandler, ChannelDownstreamHandler {
static final Set<Channel> channels =
new MapBackedSet<Channel>(new ConcurrentHashMap<Channel,
Boolean>());
private final KarmaReceiver receiver ;
private String charsetName = null;
private static final InternalLogger logger =
InternalLoggerFactory.getInstance ( KarmaDecoder.class
.getName ( ) );
/**
* Creates a new instance with the current system character set.
*/
public KarmaDecoder ( KarmaReceiver receiver ) {
this.receiver = receiver;
this.charsetName = Charset.defaultCharset ( ).name ( );
}
/**
* {@inheritDoc} Down-casts the received downstream event into more
meaningful sub-type event
* and calls an appropriate handler method with the down-casted event.
*/
public void handleDownstream ( ChannelHandlerContext ctx,
ChannelEvent e )
throws Exception {
if ( e instanceof MessageEvent ) {
writeRequestedDownstream ( ctx, ( MessageEvent ) e );
} else if ( e instanceof ChannelStateEvent ) {
ChannelStateEvent evt = ( ChannelStateEvent ) e;
switch ( evt.getState ( ) ) {
case OPEN :
if ( !Boolean.TRUE.equals ( evt.getValue ( ) ) ) {
closeRequestedDownstream ( ctx, evt );
}
break;
case BOUND :
if ( evt.getValue ( ) != null ) {
bindRequestedDownstream ( ctx, evt );
} else {
unbindRequestedDownstream ( ctx, evt );
}
break;
case CONNECTED :
if ( evt.getValue ( ) != null ) {
connectRequestedDownstream ( ctx, evt );
} else {
disconnectRequestedDownstream ( ctx, evt );
}
break;
case INTEREST_OPS :
setInterestOpsRequestedDownstream ( ctx, evt );
break;
default :
ctx.sendDownstream ( e );
}
} else {
ctx.sendDownstream ( e );
}
}
/**
* Invoked when {@link Channel#write(Object)} is called.
*/
public void writeRequestedDownstream (
ChannelHandlerContext ctx, MessageEvent e ) throws Exception {
ctx.sendDownstream ( e );
}
/**
* Invoked when {@link Channel#bind(SocketAddress)} was called.
*/
public void bindRequestedDownstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendDownstream ( e );
}
/**
* Invoked when {@link Channel#connect(SocketAddress)} was called.
*/
public void connectRequestedDownstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendDownstream ( e );
}
/**
* Invoked when {@link Channel#setInterestOps(int)} was called.
*/
public void setInterestOpsRequestedDownstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendDownstream ( e );
}
/**
* Invoked when {@link Channel#disconnect()} was called.
*/
public void disconnectRequestedDownstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendDownstream ( e );
}
/**
* Invoked when {@link Channel#unbind()} was called.
*/
public void unbindRequestedDownstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendDownstream ( e );
}
/**
* Invoked when {@link Channel#close()} was called.
*/
public void closeRequestedDownstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendDownstream ( e );
}
/**
* {@inheritDoc} Down-casts the received upstream event into more
meaningful sub-type event and
* calls an appropriate handler method with the down-casted event.
*/
public void handleUpstream ( ChannelHandlerContext ctx, ChannelEvent
e )
throws Exception {
if ( e instanceof MessageEvent ) {
messageReceivedUpstream ( ctx, ( MessageEvent ) e );
} else if ( e instanceof WriteCompletionEvent ) {
WriteCompletionEvent evt = ( WriteCompletionEvent ) e;
writeCompleteUpstream ( ctx, evt );
} else if ( e instanceof ChildChannelStateEvent ) {
ChildChannelStateEvent evt = ( ChildChannelStateEvent ) e;
if ( evt.getChildChannel ( ).isOpen ( ) ) {
childChannelOpenUpstream ( ctx, evt );
} else {
childChannelClosedUpstream ( ctx, evt );
}
} else if ( e instanceof ChannelStateEvent ) {
ChannelStateEvent evt = ( ChannelStateEvent ) e;
switch ( evt.getState ( ) ) {
case OPEN :
if ( Boolean.TRUE.equals ( evt.getValue ( ) ) ) {
channelOpenUpstream ( ctx, evt );
} else {
channelClosedUpstream ( ctx, evt );
}
break;
case BOUND :
if ( evt.getValue ( ) != null ) {
channelBoundUpstream ( ctx, evt );
} else {
channelUnboundUpstream ( ctx, evt );
}
break;
case CONNECTED :
if ( evt.getValue ( ) != null ) {
channelConnectedUpstream ( ctx, evt );
} else {
channelDisconnectedUpstream ( ctx, evt );
}
break;
case INTEREST_OPS :
channelInterestChangedUpstream ( ctx, evt );
break;
default :
ctx.sendDownstream ( e );
}
} else if ( e instanceof ExceptionEvent ) {
exceptionCaughtUpstream ( ctx, ( ExceptionEvent ) e );
} else {
ctx.sendUpstream ( e );
}
}
/**
* Invoked when a message object (e.g: {@link ChannelBuffer}) was
received from a remote peer.
*/
public void messageReceivedUpstream (
ChannelHandlerContext ctx, MessageEvent evt ) throws Exception {
Object originalMessage = evt.getMessage ( );
Object decodedMessage =
decode ( ctx, evt.getChannel ( ), originalMessage );
if (decodedMessage instanceof KarmaChannelClose) {
evt.getChannel().close();
} else {
if ( originalMessage == decodedMessage ) {
ctx.sendUpstream ( evt );
} else {
if ( ! ( ( evt ).getMessage ( ) instanceof ChannelBuffer ) ) {
fireMessageReceived ( ctx, decodedMessage, evt.getRemoteAddress
( ) );
}
}
}
}
/**
* Invoked when an exception was raised by an I/O thread or a {@link
ChannelHandler}.
*/
public void exceptionCaughtUpstream (
ChannelHandlerContext ctx, ExceptionEvent e ) throws Exception {
if ( this == ctx.getPipeline ( ).getLast ( ) ) {
logger.warn ( "EXCEPTION, please implement "
+ getClass ( ).getName ( )
+ ".exceptionCaught() for proper handling.", e.getCause ( ) );
}
ctx.sendUpstream ( e );
}
/**
* Invoked when a {@link Channel} is open, but not bound nor connected.
*/
public void channelOpenUpstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendUpstream ( e );
}
/**
* Invoked when a {@link Channel} is open and bound to a local
address, but not connected.
*/
public void channelBoundUpstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendUpstream ( e );
}
/**
* Invoked when a {@link Channel} is open, bound to a local address,
and connected to a remote
* address.
*/
public void channelConnectedUpstream (
ChannelHandlerContext ctx, ChannelStateEvent e) throws
Exception {
// Get the SslHandler in the current pipeline.
// We added it in SecureChatPipelineFactory.
final SslHandler sslHandler =
ctx.getPipeline().get(SslHandler.class);
// Get notified when SSL handshake is done.
ChannelFuture handshakeFuture =
sslHandler.handshake(e.getChannel());
handshakeFuture.addListener(new ConnectionNotice(sslHandler));
//ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
//ctx.sendUpstream ( e );
}
/**
* Invoked when a {@link Channel}'s {@link Channel#getInterestOps()
interestOps} was changed.
*/
public void channelInterestChangedUpstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendUpstream ( e );
}
/**
* Invoked when a {@link Channel} was disconnected from its remote
peer.
*/
public void channelDisconnectedUpstream (
ChannelHandlerContext ctx, ChannelStateEvent e) throws
Exception {
// Unregister the channel from the global channel list
// so the channel does not receive messages anymore.
channels.remove(e.getChannel());
//ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
//ctx.sendUpstream ( e );
}
/**
* Invoked when a {@link Channel} was unbound from the current local
address.
*/
public void channelUnboundUpstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendUpstream ( e );
}
/**
* Invoked when a {@link Channel} was closed and all its related
resources were released.
*/
public void channelClosedUpstream (
ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
ctx.sendUpstream ( e );
}
/**
* Invoked when something was written into a {@link Channel}.
*/
public void writeCompleteUpstream (
ChannelHandlerContext ctx, WriteCompletionEvent e ) throws Exception {
ctx.sendUpstream ( e );
}
/**
* Invoked when a child {@link Channel} was open. (e.g. a server
channel accepted a connection)
*/
public void childChannelOpenUpstream (
ChannelHandlerContext ctx, ChildChannelStateEvent e ) throws
Exception {
ctx.sendUpstream ( e );
}
/**
* Invoked when a child {@link Channel} was closed. (e.g. the
accepted connection was closed)
*/
public void childChannelClosedUpstream (
ChannelHandlerContext ctx, ChildChannelStateEvent e ) throws
Exception {
ctx.sendUpstream ( e );
}
private Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg)
throws Exception {
Object result = ((ChannelBuffer) msg).toString(charsetName);
if (!(msg instanceof ChannelBuffer)) {
return msg;
}
ChannelBuffer buffer = ((ChannelBuffer) msg).copy();
int readableBytes = buffer.readableBytes();
byte[] bytes = new byte[readableBytes];
buffer.readBytes(bytes);
Object object = ByteObjectConverter.toObject(bytes);
logger.log(InternalLogLevel.INFO, "-------\nRECEIVING: " + object);
if (object instanceof String) {
this.receiver.update(((String) object));
} else if (object instanceof KarmaMessage) {
this.receiver.update(((KarmaMessage) object).message);
} else if (object instanceof KarmaChunk) {
this.receiver.update(((KarmaChunk) object));
} else if (object instanceof KarmaManager) {
this.receiver.update(((KarmaManager) object));
} else if (object instanceof KarmaChannelClose) {
result = object;
} else {
throw new RuntimeException("Unlisted message payload type.");
}
return result;
}
private static final class ConnectionNotice implements
ChannelFutureListener {
private final SslHandler sslHandler;
ConnectionNotice(SslHandler sslHandler) {
this.sslHandler = sslHandler;
}
public void operationComplete(ChannelFuture future) throws
Exception {
if (future.isSuccess()) {
logger.log(InternalLogLevel.INFO, "---------------
Seesion is secured") ;
// Once session is secured, send a greeting.
byte [] bytes1 = ByteObjectConverter.toBytes("Welcome to "
+ InetAddress.getLocalHost().getHostName() +
" secure chat service!\n") ;
ChannelBuffer buffer1 =
ChannelBuffers.wrappedBuffer(bytes1) ;
future.getChannel().write(buffer1);
byte [] bytes2 = ByteObjectConverter.toBytes("Your session
is protected by " +
sslHandler.getEngine().getSession().getCipherSuite() +
" cipher suite.\n") ;
ChannelBuffer buffer2 =
ChannelBuffers.wrappedBuffer(bytes2) ;
future.getChannel().write(buffer2);
// Register the channel to the global channel list
// so the channel received the messages from others.
channels.add(future.getChannel());
} else {
future.getChannel().close();
}
}
}
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20090817/5e40386a/attachment-0001.html
More information about the netty-users
mailing list