Netty Handler for Strings and Things

Michael McGrady mmcgrady at topiatechnology.com
Tue Aug 18 14:11:57 EDT 2009


Also, Iain, here is the other half of the combo bombo.


/**
 * Pipeline factory for the sending side.
 *
 * Each pipeline it produces contains, in order, an {@code SslHandler}
 * registered as "ssl" whose engine runs in client mode, and a
 * {@code KarmaEncoder} registered as "universal" that is constructed with
 * the adapter given to this factory.
 */
public class KarmaSenderPipelineFactory implements ChannelPipelineFactory {

	/** Adapter handed to every {@code KarmaEncoder} this factory creates. */
	private final NettyKarmaSenderAdapter adapter;

	/**
	 * @param adapter the sender adapter passed on to each new encoder
	 */
	public KarmaSenderPipelineFactory(NettyKarmaSenderAdapter adapter) {
		this.adapter = adapter;
	}

	/**
	 * Assembles a fresh pipeline: SSL (client mode) first, then the encoder.
	 *
	 * @return the newly built pipeline
	 * @throws Exception if the SSL engine or a handler cannot be created
	 */
	public ChannelPipeline getPipeline() throws Exception {
		// pipeline() is not defined in this class; presumably a static
		// import of Channels.pipeline() -- confirm against the file's imports.
		ChannelPipeline channelPipeline = pipeline();
		channelPipeline.addLast("ssl", new SslHandler(newClientEngine()));
		channelPipeline.addLast("universal", new KarmaEncoder(adapter));
		return channelPipeline;
	}

	/** Creates an SSL engine from the client context and puts it in client mode. */
	private static SSLEngine newClientEngine() throws Exception {
		SSLEngine sslEngine =
			KarmaSslContextFactory.getClientContext().createSSLEngine();
		sslEngine.setUseClientMode(true);
		return sslEngine;
	}
}


/**
 * Pipeline factory for the receiving side.
 *
 * Each pipeline it produces contains, in order, an {@code SslHandler}
 * registered as "ssl" whose engine runs in server mode, and a
 * {@code KarmaDecoder} registered as "universal" that is constructed with
 * the adapter given to this factory.
 */
public class KarmaReceiverPipelineFactory implements ChannelPipelineFactory {

	/** Adapter handed to every {@code KarmaDecoder} this factory creates. */
	private final NettyKarmaReceiverAdapter adapter;

	/**
	 * @param adapter the receiver adapter passed on to each new decoder
	 */
	public KarmaReceiverPipelineFactory(NettyKarmaReceiverAdapter adapter) {
		this.adapter = adapter;
	}

	/**
	 * Assembles a fresh pipeline: SSL (server mode) first, then the decoder.
	 *
	 * @return the newly built pipeline
	 * @throws Exception if the SSL engine or a handler cannot be created
	 */
	public ChannelPipeline getPipeline() throws Exception {
		// pipeline() is not defined in this class; presumably a static
		// import of Channels.pipeline() -- confirm against the file's imports.
		ChannelPipeline channelPipeline = pipeline();
		channelPipeline.addLast("ssl", new SslHandler(newServerEngine()));
		channelPipeline.addLast("universal", new KarmaDecoder(adapter));
		return channelPipeline;
	}

	/** Creates an SSL engine from the server context and puts it in server mode. */
	private static SSLEngine newServerEngine() throws Exception {
		SSLEngine sslEngine =
			KarmaSslContextFactory.getServerContext().createSSLEngine();
		sslEngine.setUseClientMode(false);
		return sslEngine;
	}
}



On Aug 18, 2009, at 2:22 AM, Iain McGinniss wrote:
Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com


package com.topiatechnology.karma.spi.adapter.netty.handler;

import static org.jboss.netty.channel.Channels.fireMessageReceived;

import java.nio.charset.Charset;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;

import com.topiatechnology.karma.spi.KarmaSender;
import com.topiatechnology.karma.spi.util.ByteObjectConverter;

/**
 * Channel handler that sits on both the upstream and downstream paths of a
 * pipeline.
 *
 * Downstream events are dispatched by type to a
 * {@link SimpleChannelDownstreamHandler} delegate. Upstream events are
 * dispatched to a {@link SimpleChannelUpstreamHandler} delegate, except for
 * {@link MessageEvent}s, which are handled by
 * {@link #messageReceivedUpstream(ChannelHandlerContext, MessageEvent)}:
 * a {@link ChannelBuffer} payload is converted to a {@code String} before
 * being passed further upstream.
 */
@ChannelPipelineCoverage ( "one" )
public class KarmaEncoder
	implements ChannelUpstreamHandler, ChannelDownstreamHandler {

	/** Name of the charset used to turn received buffers into strings. */
	private final String charsetName;

	private static final InternalLogger logger =
		InternalLoggerFactory.getInstance ( KarmaEncoder.class.getName ( ) );

	/** Delegate that receives the per-type upstream callbacks. */
	private final SimpleChannelUpstreamHandler scuh =
		new SimpleChannelUpstreamHandler ( );

	/** Delegate that receives the per-type downstream callbacks. */
	private final SimpleChannelDownstreamHandler scdh =
		new SimpleChannelDownstreamHandler ( );

	/**
	 * Creates a new instance with the current system character set.
	 *
	 * @param sender accepted for API compatibility; it is not used by this
	 *        class
	 */
	public KarmaEncoder ( KarmaSender sender ) {
		charsetName = Charset.defaultCharset ( ).name ( );
	}

	/**
	 * {@inheritDoc} Down-casts the received downstream event into more
	 * meaningful sub-type event and calls an appropriate handler method on
	 * the downstream delegate with the down-casted event.
	 *
	 * @param ctx the handler context
	 * @param e the downstream event to dispatch
	 * @throws Exception if the delegate handler fails
	 */
	public void handleDownstream ( ChannelHandlerContext ctx, ChannelEvent e )
		throws Exception {
		if ( e instanceof MessageEvent ) {
			this.scdh.writeRequested ( ctx, ( MessageEvent ) e );
		} else if ( e instanceof ChannelStateEvent ) {
			ChannelStateEvent evt = ( ChannelStateEvent ) e;
			switch ( evt.getState ( ) ) {
				case OPEN :
					// Only a non-TRUE value (a close request) is dispatched;
					// an OPEN/TRUE event is dropped here.
					if ( !Boolean.TRUE.equals ( evt.getValue ( ) ) ) {
						this.scdh.closeRequested ( ctx, evt );
					}
					break;
				case BOUND :
					if ( evt.getValue ( ) != null ) {
						this.scdh.bindRequested ( ctx, evt );
					} else {
						this.scdh.unbindRequested ( ctx, evt );
					}
					break;
				case CONNECTED :
					if ( evt.getValue ( ) != null ) {
						this.scdh.connectRequested ( ctx, evt );
					} else {
						this.scdh.disconnectRequested ( ctx, evt );
					}
					break;
				case INTEREST_OPS :
					this.scdh.setInterestOpsRequested ( ctx, evt );
					break;
				default :
					ctx.sendDownstream ( e );
			}
		} else {
			ctx.sendDownstream ( e );
		}
	}

	/**
	 * {@inheritDoc} Down-casts the received upstream event into more
	 * meaningful sub-type event and calls an appropriate handler method
	 * with the down-casted event.
	 *
	 * @param ctx the handler context
	 * @param e the upstream event to dispatch
	 * @throws Exception if decoding or the delegate handler fails
	 */
	public void handleUpstream ( ChannelHandlerContext ctx, ChannelEvent e )
		throws Exception {
		if ( e instanceof MessageEvent ) {
			this.messageReceivedUpstream ( ctx, ( MessageEvent ) e );
		} else if ( e instanceof WriteCompletionEvent ) {
			WriteCompletionEvent evt = ( WriteCompletionEvent ) e;
			this.scuh.writeComplete ( ctx, evt );
		} else if ( e instanceof ChildChannelStateEvent ) {
			ChildChannelStateEvent evt = ( ChildChannelStateEvent ) e;
			if ( evt.getChildChannel ( ).isOpen ( ) ) {
				this.scuh.childChannelOpen ( ctx, evt );
			} else {
				this.scuh.childChannelClosed ( ctx, evt );
			}
		} else if ( e instanceof ChannelStateEvent ) {
			ChannelStateEvent evt = ( ChannelStateEvent ) e;
			switch ( evt.getState ( ) ) {
				case OPEN :
					if ( Boolean.TRUE.equals ( evt.getValue ( ) ) ) {
						this.scuh.channelOpen ( ctx, evt );
					} else {
						this.scuh.channelClosed ( ctx, evt );
					}
					break;
				case BOUND :
					if ( evt.getValue ( ) != null ) {
						this.scuh.channelBound ( ctx, evt );
					} else {
						this.scuh.channelUnbound ( ctx, evt );
					}
					break;
				case CONNECTED :
					if ( evt.getValue ( ) != null ) {
						this.scuh.channelConnected ( ctx, evt );
					} else {
						this.scuh.channelDisconnected ( ctx, evt );
					}
					break;
				case INTEREST_OPS :
					this.scuh.channelInterestChanged ( ctx, evt );
					break;
				default :
					// This is the upstream path: an unrecognised state event
					// must keep travelling upstream, not be turned around.
					ctx.sendUpstream ( e );
			}
		} else if ( e instanceof ExceptionEvent ) {
			ExceptionEvent evt = ( ExceptionEvent ) e;
			logger.log ( InternalLogLevel.INFO, "CAUSE: " + evt.getCause ( )
				+ " MESSAGE: " + evt.getCause ( ).getMessage ( ) );
			this.scuh.exceptionCaught ( ctx, evt );
		} else {
			ctx.sendUpstream ( e );
		}
	}

	/**
	 * Invoked when a message object (e.g: {@link ChannelBuffer}) was
	 * received from a remote peer. If decoding leaves the message
	 * unchanged the original event is forwarded upstream as-is; otherwise
	 * a new message-received event carrying the decoded message is fired.
	 *
	 * @param ctx the handler context
	 * @param evt the received message event
	 * @throws Exception if decoding fails
	 */
	public void messageReceivedUpstream (
		ChannelHandlerContext ctx, MessageEvent evt ) throws Exception {
		Object originalMessage = evt.getMessage ( );
		Object decodedMessage =
			decode ( ctx, evt.getChannel ( ), originalMessage );

		if ( originalMessage == decodedMessage ) {
			ctx.sendUpstream ( evt );
		} else {
			fireMessageReceived ( ctx, decodedMessage,
				evt.getRemoteAddress ( ) );
		}
	}

	/**
	 * Converts a received {@link ChannelBuffer} into a {@code String} using
	 * this handler's charset. Messages of any other type are returned
	 * unchanged (same reference).
	 *
	 * @param ctx the handler context (unused)
	 * @param channel the channel the message arrived on (unused)
	 * @param msg the received message
	 * @return {@code msg} itself when it is not a {@link ChannelBuffer},
	 *         otherwise the buffer's content as a string
	 * @throws Exception if the buffer content cannot be converted
	 */
	private Object decode (
		ChannelHandlerContext ctx, Channel channel, Object msg )
		throws Exception {
		if ( ! ( msg instanceof ChannelBuffer ) ) {
			return msg;
		}
		// Original author's note: returned message on send completion and
		// completion of SSL handshake.
		// Read from a copy so the reader index of the original buffer is
		// left untouched for the toString() call below.
		ChannelBuffer buffer = ( ( ChannelBuffer ) msg ).copy ( );
		int readableBytes = buffer.readableBytes ( );
		byte [ ] bytes = new byte [ readableBytes ];
		buffer.readBytes ( bytes );
		Object object = ByteObjectConverter.toObject ( bytes );
		// The converted object is only logged. String.valueOf avoids a
		// ClassCastException when the converter yields a non-String.
		logger.log ( InternalLogLevel.INFO, String.valueOf ( object ) );
		return ( ( ChannelBuffer ) msg ).toString ( charsetName );
	}
}





-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20090818/9af111f8/attachment-0001.html 


More information about the netty-users mailing list