Netty Handler for Strings and Things

Michael McGrady mmcgrady at topiatechnology.com
Mon Aug 17 18:48:05 EDT 2009


Trustin,

Here is the central class for the functionality I mentioned earlier.

Mike

Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com


import static org.jboss.netty.channel.Channels.fireMessageReceived;

import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.MapBackedSet;

import com.topiatechnology.karma.api.mo.KarmaManager;
import com.topiatechnology.karma.spi.KarmaReceiver;
import com.topiatechnology.karma.spi.util.ByteObjectConverter;
import com.topiatechnology.karma.spi.util.KarmaChannelClose;
import com.topiatechnology.karma.spi.util.KarmaChunk;
import com.topiatechnology.karma.spi.util.KarmaMessage;

/**
  *
  */
@ChannelPipelineCoverage ( "one" )
public class KarmaDecoder
	implements ChannelUpstreamHandler, ChannelDownstreamHandler {

     // Concurrent set of channels, static and therefore shared by every
     // KarmaDecoder instance. ConnectionNotice adds a channel after a
     // successful SSL handshake; channelDisconnectedUpstream removes it.
     static final Set<Channel> channels =
         new MapBackedSet<Channel>(new ConcurrentHashMap<Channel,  
Boolean>());
	// Sink that decode() updates with each recognised payload.
	private final KarmaReceiver receiver ;
	// Charset name decode() uses to turn a buffer into text; assigned in the
	// constructor from the platform default charset.
	private String charsetName = null;
	
	// Logger shared by this class and its nested ConnectionNotice listener.
	private static final InternalLogger logger =
		InternalLoggerFactory.getInstance ( KarmaDecoder.class
			.getName ( ) );
	
	/**
	 * Builds a decoder that reports to the given receiver and converts
	 * buffers to text using the name of the platform default charset.
	 *
	 * @param receiver the receiver stored for use by {@code decode}
	 */
	public KarmaDecoder(KarmaReceiver receiver) {
		this.charsetName = Charset.defaultCharset().name();
		this.receiver = receiver;
	}
	
	/**
	 * {@inheritDoc} Inspects the concrete type of the downstream event and
	 * routes it to the matching {@code *RequestedDownstream} hook. Events
	 * that are neither a {@link MessageEvent} nor a {@link ChannelStateEvent},
	 * and state events with an unrecognised state, are sent downstream
	 * untouched.
	 */
	public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
		if (e instanceof MessageEvent) {
			writeRequestedDownstream(ctx, (MessageEvent) e);
			return;
		}
		if (!(e instanceof ChannelStateEvent)) {
			ctx.sendDownstream(e);
			return;
		}

		final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
		switch (stateEvent.getState()) {
			case OPEN: {
				// Only a value other than TRUE (i.e. a close request) is dispatched.
				boolean openRequested = Boolean.TRUE.equals(stateEvent.getValue());
				if (!openRequested) {
					closeRequestedDownstream(ctx, stateEvent);
				}
				break;
			}
			case BOUND: {
				// A null value means "unbind"; anything else is a bind request.
				if (stateEvent.getValue() == null) {
					unbindRequestedDownstream(ctx, stateEvent);
				} else {
					bindRequestedDownstream(ctx, stateEvent);
				}
				break;
			}
			case CONNECTED: {
				// A null value means "disconnect"; anything else is a connect request.
				if (stateEvent.getValue() == null) {
					disconnectRequestedDownstream(ctx, stateEvent);
				} else {
					connectRequestedDownstream(ctx, stateEvent);
				}
				break;
			}
			case INTEREST_OPS:
				setInterestOpsRequestedDownstream(ctx, stateEvent);
				break;
			default:
				ctx.sendDownstream(e);
		}
	}
	
	/**
	 * Hook that {@code handleDownstream} calls for every {@link MessageEvent},
	 * i.e. when {@link Channel#write(Object)} was requested. Passes the
	 * event on downstream unchanged.
	 */
	public void writeRequestedDownstream(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		ctx.sendDownstream(e);
	}
	
	/**
	 * Hook for a BOUND state event carrying a non-null value, i.e. when
	 * {@link Channel#bind(SocketAddress)} was requested. Passes the event on
	 * downstream unchanged.
	 */
	public void bindRequestedDownstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendDownstream(e);
	}
	
	/**
	 * Hook for a CONNECTED state event carrying a non-null value, i.e. when
	 * {@link Channel#connect(SocketAddress)} was requested. Passes the event
	 * on downstream unchanged.
	 */
	public void connectRequestedDownstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendDownstream(e);
	}
	
	/**
	 * Hook for an INTEREST_OPS state event, i.e. when
	 * {@link Channel#setInterestOps(int)} was requested. Passes the event on
	 * downstream unchanged.
	 */
	public void setInterestOpsRequestedDownstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendDownstream(e);
	}
	
	/**
	 * Hook for a CONNECTED state event carrying a null value, i.e. when
	 * {@link Channel#disconnect()} was requested. Passes the event on
	 * downstream unchanged.
	 */
	public void disconnectRequestedDownstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendDownstream(e);
	}
	
	/**
	 * Hook for a BOUND state event carrying a null value, i.e. when
	 * {@link Channel#unbind()} was requested. Passes the event on downstream
	 * unchanged.
	 */
	public void unbindRequestedDownstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendDownstream(e);
	}
	
	/**
	 * Hook for an OPEN state event whose value is not {@code TRUE}, i.e. when
	 * {@link Channel#close()} was requested. Passes the event on downstream
	 * unchanged.
	 */
	public void closeRequestedDownstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendDownstream(e);
	}
	
	/**
	 * {@inheritDoc} Inspects the concrete type of the upstream event and
	 * routes it to the matching {@code *Upstream} hook.
	 *
	 * <p>Dispatch order matters: {@link WriteCompletionEvent} and
	 * {@link ChildChannelStateEvent} are tested before the more general
	 * {@link ChannelStateEvent}. Any event that no hook claims, including a
	 * state event with an unrecognised state, continues upstream.
	 *
	 * @param ctx the context used to forward events
	 * @param e   the upstream event to dispatch
	 * @throws Exception whatever the invoked hook throws
	 */
	public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
		if (e instanceof MessageEvent) {
			messageReceivedUpstream(ctx, (MessageEvent) e);
		} else if (e instanceof WriteCompletionEvent) {
			writeCompleteUpstream(ctx, (WriteCompletionEvent) e);
		} else if (e instanceof ChildChannelStateEvent) {
			ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
			if (evt.getChildChannel().isOpen()) {
				childChannelOpenUpstream(ctx, evt);
			} else {
				childChannelClosedUpstream(ctx, evt);
			}
		} else if (e instanceof ChannelStateEvent) {
			ChannelStateEvent evt = (ChannelStateEvent) e;
			switch (evt.getState()) {
				case OPEN:
					if (Boolean.TRUE.equals(evt.getValue())) {
						channelOpenUpstream(ctx, evt);
					} else {
						channelClosedUpstream(ctx, evt);
					}
					break;
				case BOUND:
					// Non-null value: bound to that address; null: unbound.
					if (evt.getValue() != null) {
						channelBoundUpstream(ctx, evt);
					} else {
						channelUnboundUpstream(ctx, evt);
					}
					break;
				case CONNECTED:
					// Non-null value: connected to that address; null: disconnected.
					if (evt.getValue() != null) {
						channelConnectedUpstream(ctx, evt);
					} else {
						channelDisconnectedUpstream(ctx, evt);
					}
					break;
				case INTEREST_OPS:
					channelInterestChangedUpstream(ctx, evt);
					break;
				default:
					// This is an upstream event, so it must keep travelling
					// upstream; it was previously bounced back downstream.
					ctx.sendUpstream(e);
			}
		} else if (e instanceof ExceptionEvent) {
			exceptionCaughtUpstream(ctx, (ExceptionEvent) e);
		} else {
			ctx.sendUpstream(e);
		}
	}
	
	/**
	 * Hook for an inbound {@link MessageEvent}. Runs the message through
	 * {@code decode} and then:
	 * <ul>
	 * <li>closes the channel if the decoded value is a {@link KarmaChannelClose};</li>
	 * <li>forwards the original event upstream if decoding returned the very
	 *     same object;</li>
	 * <li>otherwise fires a new message-received event with the decoded value,
	 *     but only when the event's message is not a {@link ChannelBuffer}.</li>
	 * </ul>
	 */
	public void messageReceivedUpstream(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
		final Object received = evt.getMessage();
		final Object decoded = decode(ctx, evt.getChannel(), received);

		if (decoded instanceof KarmaChannelClose) {
			evt.getChannel().close();
			return;
		}
		if (decoded == received) {
			ctx.sendUpstream(evt);
			return;
		}
		// NOTE(review): decode() only yields a different object for
		// ChannelBuffer input, so this condition looks like it can never be
		// true and decoded values are never fired upstream - confirm intent.
		boolean messageIsBuffer = evt.getMessage() instanceof ChannelBuffer;
		if (!messageIsBuffer) {
			fireMessageReceived(ctx, decoded, evt.getRemoteAddress());
		}
	}
	
	/**
	 * Hook for an {@link ExceptionEvent}. When this handler is the last one
	 * in the pipeline, logs a warning with the event's cause; in every case
	 * the event is then forwarded upstream.
	 */
	public void exceptionCaughtUpstream(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		final boolean lastInPipeline = ctx.getPipeline().getLast() == this;
		if (lastInPipeline) {
			final String hint = "EXCEPTION, please implement "
				+ getClass().getName()
				+ ".exceptionCaught() for proper handling.";
			logger.warn(hint, e.getCause());
		}
		ctx.sendUpstream(e);
	}
	
	/**
	 * Hook for an OPEN state event whose value is {@code TRUE}: the
	 * {@link Channel} is open. Passes the event on upstream unchanged.
	 */
	public void channelOpenUpstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendUpstream(e);
	}
	
	/**
	 * Hook for a BOUND state event carrying a non-null value: the
	 * {@link Channel} is bound to a local address. Passes the event on
	 * upstream unchanged.
	 */
	public void channelBoundUpstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendUpstream(e);
	}
	
	/**
	 * Hook for a CONNECTED state event carrying a non-null value: the
	 * {@link Channel} is connected to a remote address. Starts the SSL
	 * handshake on the pipeline's {@link SslHandler} and registers a
	 * {@code ConnectionNotice} to react once the handshake finishes.
	 *
	 * <p>NOTE(review): the event is not forwarded upstream, so handlers after
	 * this one never see the connect notification - confirm this is intended.
	 *
	 * @param ctx the context whose pipeline must contain an {@link SslHandler}
	 * @param e   the connected-state event
	 * @throws IllegalStateException if the pipeline holds no {@link SslHandler}
	 */
	public void channelConnectedUpstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		// The pipeline lookup yields null when no such handler is installed;
		// fail with a clear message instead of a bare NullPointerException.
		final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
		if (sslHandler == null) {
			throw new IllegalStateException(
				"No SslHandler in the pipeline; cannot start the SSL handshake for channel "
					+ e.getChannel());
		}

		// Get notified when the SSL handshake is done.
		ChannelFuture handshakeFuture = sslHandler.handshake(e.getChannel());
		handshakeFuture.addListener(new ConnectionNotice(sslHandler));
	}
	
	/**
	 * Hook for an INTEREST_OPS state event: the {@link Channel}'s
	 * {@link Channel#getInterestOps() interestOps} changed. Passes the event
	 * on upstream unchanged.
	 */
	public void channelInterestChangedUpstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendUpstream(e);
	}
	
	/**
	 * Hook for a CONNECTED state event carrying a null value: the
	 * {@link Channel} was disconnected from its remote peer. Drops the
	 * channel from the shared {@code channels} set.
	 *
	 * <p>NOTE(review): the event is not forwarded upstream, so handlers after
	 * this one never see the disconnect - confirm this is intended.
	 */
	public void channelDisconnectedUpstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		// Take the channel out of the global registry.
		final Channel disconnected = e.getChannel();
		channels.remove(disconnected);
	}
	
	/**
	 * Hook for a BOUND state event carrying a null value: the
	 * {@link Channel} was unbound from its local address. Passes the event
	 * on upstream unchanged.
	 */
	public void channelUnboundUpstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendUpstream(e);
	}
	
	/**
	 * Hook for an OPEN state event whose value is not {@code TRUE}: the
	 * {@link Channel} was closed. Passes the event on upstream unchanged.
	 */
	public void channelClosedUpstream(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendUpstream(e);
	}
	
	/**
	 * Hook for a {@link WriteCompletionEvent}: something was written into a
	 * {@link Channel}. Passes the event on upstream unchanged.
	 */
	public void writeCompleteUpstream(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
		ctx.sendUpstream(e);
	}
	
	/**
	 * Hook for a {@link ChildChannelStateEvent} whose child {@link Channel}
	 * reports itself open (e.g. a server channel accepted a connection).
	 * Passes the event on upstream unchanged.
	 */
	public void childChannelOpenUpstream(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {
		ctx.sendUpstream(e);
	}
	
	/**
	 * Hook for a {@link ChildChannelStateEvent} whose child {@link Channel}
	 * no longer reports itself open (e.g. an accepted connection was closed).
	 * Passes the event on upstream unchanged.
	 */
	public void childChannelClosedUpstream(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {
		ctx.sendUpstream(e);
	}

	/**
	 * Decodes one inbound message and notifies the receiver.
	 *
	 * <p>Anything that is not a {@link ChannelBuffer} is returned untouched.
	 * For a buffer, a copy of its bytes is converted to an object with
	 * {@code ByteObjectConverter.toObject}; recognised payloads are handed to
	 * the receiver, and a {@link KarmaChannelClose} payload is returned to
	 * the caller instead.
	 *
	 * @param ctx     the handler context (not used here)
	 * @param channel the channel the message arrived on (not used here)
	 * @param msg     the raw message
	 * @return {@code msg} itself when it is not a buffer; the
	 *         {@link KarmaChannelClose} payload when one was received;
	 *         otherwise the buffer content rendered as text in this decoder's
	 *         charset
	 * @throws RuntimeException if the converted payload is of an unlisted type
	 */
	private Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
			throws Exception {
		// The type check must precede any cast: casting first threw
		// ClassCastException for every non-buffer message.
		if (!(msg instanceof ChannelBuffer)) {
			return msg;
		}
		final ChannelBuffer source = (ChannelBuffer) msg;
		Object result = source.toString(charsetName);

		// Read from a copy so the bytes are extracted without touching the
		// original message object.
		ChannelBuffer buffer = source.copy();
		byte[] bytes = new byte[buffer.readableBytes()];
		buffer.readBytes(bytes);
		Object object = ByteObjectConverter.toObject(bytes);
		logger.log(InternalLogLevel.INFO, "-------\nRECEIVING: " + object);

		if (object instanceof String) {
			this.receiver.update((String) object);
		} else if (object instanceof KarmaMessage) {
			this.receiver.update(((KarmaMessage) object).message);
		} else if (object instanceof KarmaChunk) {
			this.receiver.update((KarmaChunk) object);
		} else if (object instanceof KarmaManager) {
			this.receiver.update((KarmaManager) object);
		} else if (object instanceof KarmaChannelClose) {
			// Signal the caller to close the channel.
			result = object;
		} else {
			throw new RuntimeException("Unlisted message payload type.");
		}

		return result;
	}
	
     private static final class ConnectionNotice implements  
ChannelFutureListener {

         private final SslHandler sslHandler;

         ConnectionNotice(SslHandler sslHandler) {
             this.sslHandler = sslHandler;
         }

         public void operationComplete(ChannelFuture future) throws  
Exception {
             if (future.isSuccess()) {
             	logger.log(InternalLogLevel.INFO, "--------------- 
Seesion is secured") ;
                 // Once session is secured, send a greeting.
         		byte [] bytes1 = ByteObjectConverter.toBytes("Welcome to "  
+ InetAddress.getLocalHost().getHostName() +
                         " secure chat service!\n") ;
         		ChannelBuffer buffer1 =  
ChannelBuffers.wrappedBuffer(bytes1) ;
                 future.getChannel().write(buffer1);
         		byte [] bytes2 = ByteObjectConverter.toBytes("Your session  
is protected by " +
                          
sslHandler.getEngine().getSession().getCipherSuite() +
                         " cipher suite.\n") ;
         		ChannelBuffer buffer2 =  
ChannelBuffers.wrappedBuffer(bytes2) ;
                 future.getChannel().write(buffer2);

                 // Register the channel to the global channel list
                 // so the channel received the messages from others.
                 channels.add(future.getChannel());
             } else {
                 future.getChannel().close();
             }
         }
     }
}




-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20090817/5e40386a/attachment-0001.html 


More information about the netty-users mailing list