Netty Handler for Strings and Things

Michael McGrady mmcgrady at topiatechnology.com
Tue Aug 18 13:40:35 EDT 2009


Iain,

Yes.  Most of this code was just copied and pasted.  Like Joshua Bloch,
I prefer composition over inheritance, because I don't like the little
surprises that exposing implementation sometimes brings, so I have followed
your suggestion, as I had planned to do, with a twist as shown below.
This is all just temporary; please excuse the lack of attribution,
etc., which is due solely to the unfinished state of the code.

Mike

On Aug 18, 2009, at 2:22 AM, Iain McGinniss wrote:

> Hi Mike,
>
> No doubt you've stripped a lot of code out of what was attached  
> below to make it easier to see what was going on, but just in case  
> you hadn't: your code can be much more concisely expressed by  
> extending SimpleChannelUpstreamHandler, as you weren't manipulating  
> any of the downstream events and you only handle connected,  
> disconnected and message reception on upstream.
>
> <KarmaDecoder.java>
>
>
> I've stripped it down to what I see as the minimum without changing  
> any of the real code. Apologies if this was obvious to you and you  
> were splitting out all of the events manually for reasons that were  
> stripped out of your original code.
>
> Iain


package com.topiatechnology.karma.spi.adapter.netty.handler;

import static org.jboss.netty.channel.Channels.fireMessageReceived;

import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.MapBackedSet;

import com.topiatechnology.karma.api.mo.KarmaManager;
import com.topiatechnology.karma.spi.KarmaReceiver;
import com.topiatechnology.karma.spi.util.ByteObjectConverter;
import com.topiatechnology.karma.spi.util.KarmaChannelClose;
import com.topiatechnology.karma.spi.util.KarmaChunk;
import com.topiatechnology.karma.spi.util.KarmaMessage;

/**
 * Pipeline handler that decodes inbound {@link ChannelBuffer} payloads into
 * objects via {@link ByteObjectConverter} and hands them to a
 * {@link KarmaReceiver}.
 * <p>
 * The class implements both handler interfaces directly and uses composition
 * rather than inheritance: every event it has no special handling for is
 * delegated to a private {@link SimpleChannelUpstreamHandler} or
 * {@link SimpleChannelDownstreamHandler} instance. Only three upstream events
 * are handled locally: message reception, channel connected, and channel
 * disconnected.
 */
@ChannelPipelineCoverage("one")
public class KarmaDecoder implements ChannelUpstreamHandler,
		ChannelDownstreamHandler {

	/** Channels whose SSL handshake completed and that are still connected. */
	static final Set<Channel> channels = new MapBackedSet<Channel>(
			new ConcurrentHashMap<Channel, Boolean>());

	private static final InternalLogger logger = InternalLoggerFactory
			.getInstance(KarmaDecoder.class.getName());

	/** Sink that receives every decoded payload. */
	private final KarmaReceiver receiver;

	/** Name of the charset used to render a raw buffer as a string. */
	private final String charsetName;

	/** Delegate for all upstream events not handled by this class. */
	private final SimpleChannelUpstreamHandler scuh =
			new SimpleChannelUpstreamHandler();

	/** Delegate for all downstream events. */
	private final SimpleChannelDownstreamHandler scdh =
			new SimpleChannelDownstreamHandler();

	/**
	 * Creates a new instance with the current system character set.
	 *
	 * @param receiver the object that is updated with each decoded payload
	 */
	public KarmaDecoder(KarmaReceiver receiver) {
		this.receiver = receiver;
		this.charsetName = Charset.defaultCharset().name();
	}

	/**
	 * {@inheritDoc} Down-casts the received downstream event into more
	 * meaningful sub-type event and calls the matching method of the
	 * downstream delegate with the down-casted event. Events that match no
	 * case are passed on downstream unchanged.
	 */
	public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
			throws Exception {
		if (e instanceof MessageEvent) {
			this.scdh.writeRequested(ctx, (MessageEvent) e);
		} else if (e instanceof ChannelStateEvent) {
			ChannelStateEvent evt = (ChannelStateEvent) e;
			switch (evt.getState()) {
			case OPEN:
				// Downstream OPEN is only acted on when it is a close request.
				if (!Boolean.TRUE.equals(evt.getValue())) {
					this.scdh.closeRequested(ctx, evt);
				}
				break;
			case BOUND:
				if (evt.getValue() != null) {
					this.scdh.bindRequested(ctx, evt);
				} else {
					this.scdh.unbindRequested(ctx, evt);
				}
				break;
			case CONNECTED:
				if (evt.getValue() != null) {
					this.scdh.connectRequested(ctx, evt);
				} else {
					this.scdh.disconnectRequested(ctx, evt);
				}
				break;
			case INTEREST_OPS:
				this.scdh.setInterestOpsRequested(ctx, evt);
				break;
			default:
				ctx.sendDownstream(e);
			}
		} else {
			ctx.sendDownstream(e);
		}
	}

	/**
	 * {@inheritDoc} Down-casts the received upstream event into more
	 * meaningful sub-type event and calls an appropriate handler method with
	 * the down-casted event. Message, connected and disconnected events are
	 * handled by this class; everything else goes to the upstream delegate.
	 */
	public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
			throws Exception {
		if (e instanceof MessageEvent) {
			this.messageReceivedUpstream(ctx, (MessageEvent) e);
		} else if (e instanceof WriteCompletionEvent) {
			WriteCompletionEvent evt = (WriteCompletionEvent) e;
			this.scuh.writeComplete(ctx, evt);
		} else if (e instanceof ChildChannelStateEvent) {
			ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
			if (evt.getChildChannel().isOpen()) {
				this.scuh.childChannelOpen(ctx, evt);
			} else {
				this.scuh.childChannelClosed(ctx, evt);
			}
		} else if (e instanceof ChannelStateEvent) {
			ChannelStateEvent evt = (ChannelStateEvent) e;
			switch (evt.getState()) {
			case OPEN:
				if (Boolean.TRUE.equals(evt.getValue())) {
					this.scuh.channelOpen(ctx, evt);
				} else {
					this.scuh.channelClosed(ctx, evt);
				}
				break;
			case BOUND:
				if (evt.getValue() != null) {
					this.scuh.channelBound(ctx, evt);
				} else {
					this.scuh.channelUnbound(ctx, evt);
				}
				break;
			case CONNECTED:
				if (evt.getValue() != null) {
					this.channelConnectedUpstream(ctx, evt);
				} else {
					this.channelDisconnectedUpstream(ctx, evt);
				}
				break;
			case INTEREST_OPS:
				this.scuh.channelInterestChanged(ctx, evt);
				break;
			default:
				// An unrecognised upstream event must keep travelling
				// upstream; sending it downstream would reverse its direction.
				ctx.sendUpstream(e);
			}
		} else if (e instanceof ExceptionEvent) {
			this.scuh.exceptionCaught(ctx, (ExceptionEvent) e);
		} else {
			ctx.sendUpstream(e);
		}
	}

	/**
	 * Invoked when a message object (e.g: {@link ChannelBuffer}) was received
	 * from a remote peer. Decodes the message; closes the channel when the
	 * payload is a {@link KarmaChannelClose}, and forwards the event
	 * unchanged when the message was not a buffer.
	 */
	public void messageReceivedUpstream(ChannelHandlerContext ctx,
			MessageEvent evt) throws Exception {
		Object originalMessage = evt.getMessage();
		Object decodedMessage = decode(ctx, evt.getChannel(), originalMessage);

		if (decodedMessage instanceof KarmaChannelClose) {
			evt.getChannel().close();
		} else if (originalMessage == decodedMessage) {
			// Nothing was decoded: let the next handler deal with it.
			ctx.sendUpstream(evt);
		} else if (!(originalMessage instanceof ChannelBuffer)) {
			// NOTE(review): decode() only returns a different object when the
			// original message is a ChannelBuffer, so this branch is never
			// taken and decoded payloads are not forwarded upstream; they
			// reach the application through the receiver inside decode().
			// Confirm whether forwarding was intended.
			fireMessageReceived(ctx, decodedMessage, evt.getRemoteAddress());
		}
	}

	/**
	 * Invoked when a {@link Channel} is open, bound to a local address, and
	 * connected to a remote address. Starts the SSL handshake and registers a
	 * listener that greets the peer once the handshake has finished.
	 *
	 * @throws IllegalStateException if the pipeline contains no
	 *         {@link SslHandler}
	 */
	public void channelConnectedUpstream(ChannelHandlerContext ctx,
			ChannelStateEvent e) throws Exception {

		// Get the SslHandler in the current pipeline; it is expected to have
		// been added by whatever pipeline factory installed this handler.
		final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
		if (sslHandler == null) {
			throw new IllegalStateException(
					"KarmaDecoder requires an SslHandler in the pipeline.");
		}

		// Get notified when SSL handshake is done.
		ChannelFuture handshakeFuture = sslHandler.handshake(e.getChannel());
		handshakeFuture.addListener(new ConnectionNotice(sslHandler));
	}

	/**
	 * Invoked when a {@link Channel} was disconnected from its remote peer.
	 * The event is consumed here and not forwarded upstream.
	 */
	public void channelDisconnectedUpstream(ChannelHandlerContext ctx,
			ChannelStateEvent e) throws Exception {
		// Unregister the channel from the global channel list
		// so the channel does not receive messages anymore.
		channels.remove(e.getChannel());
	}

	/**
	 * Converts a received buffer into an object and passes it to the
	 * receiver.
	 *
	 * @param ctx     the handler context (currently unused)
	 * @param channel the channel the message arrived on (currently unused)
	 * @param msg     the received message
	 * @return {@code msg} itself when it is not a {@link ChannelBuffer}; the
	 *         {@link KarmaChannelClose} payload when one was received;
	 *         otherwise the buffer content rendered as a string in this
	 *         decoder's charset
	 * @throws RuntimeException if the payload is of an unsupported type
	 */
	private Object decode(ChannelHandlerContext ctx, Channel channel,
			Object msg) throws Exception {
		// Check the type before casting, otherwise a non-buffer message fails
		// with a ClassCastException instead of being passed through.
		if (!(msg instanceof ChannelBuffer)) {
			return msg;
		}

		ChannelBuffer received = (ChannelBuffer) msg;
		Object result = received.toString(charsetName);

		// Read from a copy so the original buffer's reader index is untouched.
		ChannelBuffer buffer = received.copy();
		byte[] bytes = new byte[buffer.readableBytes()];
		buffer.readBytes(bytes);
		Object object = ByteObjectConverter.toObject(bytes);
		if (logger.isEnabled(InternalLogLevel.DEBUG)) {
			logger.log(InternalLogLevel.DEBUG, "-------\nRECEIVING: " + object);
		}

		if (object instanceof String) {
			this.receiver.update(((String) object));
		} else if (object instanceof KarmaMessage) {
			this.receiver.update(((KarmaMessage) object).message);
		} else if (object instanceof KarmaChunk) {
			this.receiver.update(((KarmaChunk) object));
		} else if (object instanceof KarmaManager) {
			this.receiver.update(((KarmaManager) object));
		} else if (object instanceof KarmaChannelClose) {
			// Returned to the caller, which closes the channel.
			result = object;
		} else {
			String typeName = (object == null) ? "null" : object.getClass()
					.getName();
			throw new RuntimeException("Unlisted message payload type. ("
					+ typeName + ")");
		}

		return result;
	}

	/**
	 * Listener for the SSL handshake future: on success it sends two greeting
	 * messages and adds the channel to the global channel set; on failure it
	 * closes the channel.
	 */
	private static final class ConnectionNotice implements
			ChannelFutureListener {

		private final SslHandler sslHandler;

		ConnectionNotice(SslHandler sslHandler) {
			this.sslHandler = sslHandler;
		}

		public void operationComplete(ChannelFuture future) throws Exception {
			if (future.isSuccess()) {
				logger.log(InternalLogLevel.INFO,
						"---------------Seesion is secured");
				// Once session is secured, send a greeting.
				byte[] bytes1 = ByteObjectConverter.toBytes("Welcome to "
						+ InetAddress.getLocalHost().getHostName()
						+ " secure Karma service!\n");
				ChannelBuffer buffer1 = ChannelBuffers.wrappedBuffer(bytes1);
				future.getChannel().write(buffer1);
				byte[] bytes2 = ByteObjectConverter
						.toBytes("Your session is protected by "
								+ sslHandler.getEngine().getSession()
										.getCipherSuite() + " cipher suite.\n");
				ChannelBuffer buffer2 = ChannelBuffers.wrappedBuffer(bytes2);
				future.getChannel().write(buffer2);

				// Register the channel to the global channel list
				// so the channel receives the messages from others.
				channels.add(future.getChannel());
			} else {
				future.getChannel().close();
			}
		}
	}
}

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20090818/9f5d27e7/attachment-0001.html 


More information about the netty-users mailing list