Netty Handler for Strings and Things
Michael McGrady
mmcgrady at topiatechnology.com
Tue Aug 18 14:11:57 EDT 2009
Also, Iain, here is the other half of the combo bombo.
public class KarmaSenderPipelineFactory implements
ChannelPipelineFactory {
private final NettyKarmaSenderAdapter adapter ;
public KarmaSenderPipelineFactory (NettyKarmaSenderAdapter adapter) {
this.adapter = adapter;
}
public ChannelPipeline getPipeline ( ) throws Exception {
ChannelPipeline pipeline = pipeline ( );
SSLEngine engine =
KarmaSslContextFactory.getClientContext ( ).createSSLEngine ( );
engine.setUseClientMode ( true );
pipeline.addLast ( "ssl", new SslHandler ( engine ) );
pipeline.addLast ( "universal", new KarmaEncoder (
adapter) );
return pipeline;
}
}
public class KarmaReceiverPipelineFactory implements
ChannelPipelineFactory {
private final NettyKarmaReceiverAdapter adapter ;
public KarmaReceiverPipelineFactory ( NettyKarmaReceiverAdapter
adapter ) {
this.adapter = adapter;
}
public ChannelPipeline getPipeline ( ) throws Exception {
ChannelPipeline pipeline = pipeline ( );
SSLEngine engine =
KarmaSslContextFactory.getServerContext ( ).createSSLEngine ( );
engine.setUseClientMode ( false );
pipeline.addLast ( "ssl", new SslHandler ( engine ) );
pipeline.addLast ( "universal", new KarmaDecoder (
adapter ) );
return pipeline;
}
}
On Aug 18, 2009, at 2:22 AM, Iain McGinniss wrote:
Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com
package com.topiatechnology.karma.spi.adapter.netty.handler;
import static org.jboss.netty.channel.Channels.fireMessageReceived;
import java.nio.charset.Charset;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import com.topiatechnology.karma.spi.KarmaSender;
import com.topiatechnology.karma.spi.util.ByteObjectConverter;
/**
*
*/
@ChannelPipelineCoverage ( "one" )
public class KarmaEncoder
implements ChannelUpstreamHandler, ChannelDownstreamHandler {
private final String charsetName;
private static final InternalLogger logger =
InternalLoggerFactory.getInstance ( KarmaEncoder.class.getName ( ) );
private SimpleChannelUpstreamHandler scuh = new
SimpleChannelUpstreamHandler() ;
private SimpleChannelDownstreamHandler scdh = new
SimpleChannelDownstreamHandler() ;
/**
* Creates a new instance with the current system character set.
*/
public KarmaEncoder ( KarmaSender sender ) {
charsetName = Charset.defaultCharset ( ).name ( );
}
/**
* {@inheritDoc} Down-casts the received downstream event into more
meaningful sub-type event
* and calls an appropriate handler method with the down-casted event.
*/
public void handleDownstream ( ChannelHandlerContext ctx,
ChannelEvent e )
throws Exception {
if ( e instanceof MessageEvent ) {
this.scdh.writeRequested ( ctx, ( MessageEvent ) e );
} else if ( e instanceof ChannelStateEvent ) {
ChannelStateEvent evt = ( ChannelStateEvent ) e;
switch ( evt.getState ( ) ) {
case OPEN :
if ( !Boolean.TRUE.equals ( evt.getValue ( ) ) ) {
this.scdh.closeRequested ( ctx, evt );
}
break;
case BOUND :
if ( evt.getValue ( ) != null ) {
this.scdh.bindRequested ( ctx, evt );
} else {
this.scdh.unbindRequested ( ctx, evt );
}
break;
case CONNECTED :
if ( evt.getValue ( ) != null ) {
this.scdh.connectRequested ( ctx, evt );
} else {
this.scdh.disconnectRequested ( ctx, evt );
}
break;
case INTEREST_OPS :
this.scdh.setInterestOpsRequested ( ctx, evt );
break;
default :
ctx.sendDownstream ( e );
}
} else {
ctx.sendDownstream ( e );
}
}
/**
* {@inheritDoc} Down-casts the received upstream event into more
meaningful sub-type event and
* calls an appropriate handler method with the down-casted event.
*/
public void handleUpstream ( ChannelHandlerContext ctx, ChannelEvent
e )
throws Exception {
if ( e instanceof MessageEvent ) {
this.messageReceivedUpstream ( ctx, ( MessageEvent ) e );
} else if ( e instanceof WriteCompletionEvent ) {
WriteCompletionEvent evt = ( WriteCompletionEvent ) e;
this.scuh.writeComplete ( ctx, evt );
} else if ( e instanceof ChildChannelStateEvent ) {
ChildChannelStateEvent evt = ( ChildChannelStateEvent ) e;
if ( evt.getChildChannel ( ).isOpen ( ) ) {
this.scuh.childChannelOpen ( ctx, evt );
} else {
this.scuh.childChannelClosed ( ctx, evt );
}
} else if ( e instanceof ChannelStateEvent ) {
ChannelStateEvent evt = ( ChannelStateEvent ) e;
switch ( evt.getState ( ) ) {
case OPEN :
if ( Boolean.TRUE.equals ( evt.getValue ( ) ) ) {
this.scuh.channelOpen ( ctx, evt );
} else {
this.scuh.channelClosed ( ctx, evt );
}
break;
case BOUND :
if ( evt.getValue ( ) != null ) {
this.scuh.channelBound ( ctx, evt );
} else {
this.scuh.channelUnbound ( ctx, evt );
}
break;
case CONNECTED :
if ( evt.getValue ( ) != null ) {
this.scuh.channelConnected ( ctx, evt );
} else {
this.scuh.channelDisconnected ( ctx, evt );
}
break;
case INTEREST_OPS :
this.scuh.channelInterestChanged ( ctx, evt );
break;
default :
ctx.sendDownstream ( e );
}
} else if ( e instanceof ExceptionEvent ) {
logger.log(InternalLogLevel.INFO, "CAUSE: " + ((ExceptionEvent)
e).getCause() + " MESSAGE: " + ((ExceptionEvent)
e).getCause().getMessage()) ;
this.scuh.exceptionCaught ( ctx, ( ExceptionEvent ) e );
} else {
ctx.sendUpstream ( e );
}
}
/**
* Invoked when a message object (e.g: {@link ChannelBuffer}) was
received from a remote peer.
*/
public void messageReceivedUpstream (
ChannelHandlerContext ctx, MessageEvent evt ) throws Exception {
Object originalMessage = evt.getMessage ( );
Object decodedMessage =
decode ( ctx, evt.getChannel ( ), originalMessage );
if ( originalMessage == decodedMessage ) {
ctx.sendUpstream ( evt );
} else {
fireMessageReceived ( ctx, decodedMessage, evt.getRemoteAddress
( ) );
}
}
private Object decode (
ChannelHandlerContext ctx, Channel channel, Object msg )
throws Exception {
if ( ! ( msg instanceof ChannelBuffer ) ) {
return msg;
}
// Returned message on send completion and completion of SSL
handshake.
ChannelBuffer buffer = ( ( ChannelBuffer ) msg ).copy ( );
int readableBytes = buffer.readableBytes ( );
byte [ ] bytes = new byte [ readableBytes ];
buffer.readBytes ( bytes );
Object object = ByteObjectConverter.toObject ( bytes );
logger.log ( InternalLogLevel.INFO, ( ( String ) object ) );
return ( ( ChannelBuffer ) msg ).toString ( charsetName );
}
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20090818/9af111f8/attachment-0001.html
More information about the netty-users
mailing list