ChannelPipeline question

Michael McGrady mmcgrady at topiatechnology.com
Tue Sep 1 15:17:50 EDT 2009


Frederic,

At the bottom of this email is a handler that is annotated "all" but is
stateful.  The key is to use instrumented data and monitor controls
around the sending of data.  In this case, the sender has to use
monitors on the ChannelFutures to make sure that (a) a channel is
returned before writing and (b) a "manager" is sent before any file
chunks.  Further, the file chunks and managers, as well as the
FileChannels, are stored in maps keyed by the particular data IDs,
e.g. Java UUIDs.
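
To make the ordering concrete, here is a minimal sketch of the sender-side
sequencing described above.  It uses only the JDK: CompletableFuture stands
in for Netty's ChannelFuture, and connect(), send() and the wire list are
hypothetical placeholders for illustration, not the actual Karma sender.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class OrderedSenderSketch {

    // Records what "went over the wire", in order. Stands in for a real channel.
    static final List<String> wire =
            Collections.synchronizedList(new ArrayList<String>());

    // Hypothetical connect: completes once a "channel" is available.
    static CompletableFuture<String> connect() {
        return CompletableFuture.supplyAsync(() -> "channel-1");
    }

    // Hypothetical write: completes once the message has been handed off.
    static CompletableFuture<Void> send(String channel, String message) {
        return CompletableFuture.runAsync(() -> wire.add(channel + ":" + message));
    }

    // (a) wait for the channel, (b) send the manager, then the chunks in order.
    static CompletableFuture<Void> transfer(List<String> chunks) {
        return connect().thenCompose(channel -> {
            CompletableFuture<Void> step = send(channel, "manager");
            for (String chunk : chunks) {
                // Each chunk is sent only after the previous write has completed.
                step = step.thenCompose(ignored -> send(channel, chunk));
            }
            return step;
        });
    }

    public static void main(String[] args) {
        transfer(Arrays.asList("chunk-1", "chunk-2")).join();
        System.out.println(wire);
    }
}
```

The point of chaining rather than blocking is that no write is issued until
the future for the previous step has completed, which is the same guarantee
the monitors on the ChannelFutures are meant to give.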

Any of your thoughts on this would be greatly appreciated.


Mike


On Sep 1, 2009, at 9:03 AM, Frederic Bregier wrote:

>
> Hi Mike,
>
> First, "all" is right now only an annotation, so no error will come if you
> write "one" or "all" in your handler. It is only a help for a reader who is
> not the writer ;-)
>
> Now, if your handler is stateful but you do take concurrency into account
> (for instance, which channel is currently using the handler, by the id of
> the channel), it should be OK.
> Take care: it is not only about concurrent access. It greatly depends on
> what the handler has to do and what kind of "persistent" information it has
> to keep in memory for each channel.
> For instance, let's say we try to write the StringDecoder using the "all"
> method. This would imply having as many current ChannelBuffers as channels
> in one handler. This could be somewhat complicated, although doable. It is
> obviously simpler (in terms of development) in this case to use a "one"
> version with a newly created handler for each channel.
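
As a rough illustration of the point above (one shared "all" handler needing
one buffer per channel), here is a JDK-only sketch.  SharedLineDecoder,
onBytes() and onClosed() are hypothetical names, and an int stands in for a
Netty channel id; this is not how StringDecoder is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class SharedLineDecoder {

    // One pending-text buffer per channel id; the handler instance is shared.
    private final ConcurrentMap<Integer, StringBuilder> pending =
            new ConcurrentHashMap<Integer, StringBuilder>();

    // Appends a fragment for the channel and returns any complete lines.
    List<String> onBytes(int channelId, String fragment) {
        StringBuilder buf =
                pending.computeIfAbsent(channelId, id -> new StringBuilder());
        List<String> lines = new ArrayList<String>();
        // Lock the per-channel buffer so two threads never edit it at once.
        synchronized (buf) {
            buf.append(fragment);
            int newline;
            while ((newline = buf.indexOf("\n")) >= 0) {
                lines.add(buf.substring(0, newline));
                buf.delete(0, newline + 1);
            }
        }
        return lines;
    }

    // Must be called when a channel closes, or its buffer is never released.
    void onClosed(int channelId) {
        pending.remove(channelId);
    }

    int trackedChannels() {
        return pending.size();
    }

    public static void main(String[] args) {
        SharedLineDecoder decoder = new SharedLineDecoder();
        System.out.println(decoder.onBytes(1, "hel"));
        System.out.println(decoder.onBytes(2, "other\n"));
        System.out.println(decoder.onBytes(1, "lo\nwor"));
        decoder.onClosed(1);
        decoder.onClosed(2);
    }
}
```

The extra map and the cleanup on close are the "somewhat complicated" part; a
"one" handler gets both for free because its state dies with the channel.
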
>
> If the handler has only a little info to store for each channel, then
> implementing such an "all" handler with stateful logic can make sense.
> For instance, if some information has to be shared among all channels in
> the handler (stateful, but also considering several channels), then you
> can try an "all" version, although you can also try a "one" version using
> some shared data object.
>
> My opinion (only mine ;-) is that a stateful handler should most of the
> time be a "one" version, but as usual, many exceptions can exist! I don't
> have a real example in mind, but I'm sure there are a lot.
> If you can share a description of your example, it could perhaps help some
> users (like me) to go further?
>
> HTH,
> Cheers,
> Frederic
>
>
> Mike McGrady wrote:
>>
>> Just a more detailed thought, Frederic, on this matter.  I think you
>> can use "all" with stateful handlers so long as the state is handled
>> in a way consistent with concurrent access.  I do this in my first Netty
>> application, if I am understanding you correctly.  What do you think?
>>
>> Mike
>>
>>
>> On Sep 1, 2009, at 3:32 AM, Frederic Bregier wrote:
>>
>>>
>>> Hi Mike,
>>>
>>> If I made some mistake, please correct me of course ;-)
>>>
>>> "all" means that several channels can share this handler (stateless).
>>> "one" means that this handler is for one and only one channel (stateful).
>>>
>>> bootstrap.getPipeline() => if you add handlers there, they should be "all"
>>> since the pipeline will be shared by all channels that connect or are
>>> connected through this bootstrap.
>>>
>>> In a ChannelPipelineFactory, you can mix "all" and "one" handlers. For
>>> instance, if you have a handler that can be shared among all channels,
>>> then even if it is added from the factory, it can be "all". Conversely,
>>> you usually have at least one handler that is "one" in such a construction
>>> (generally the codec, which is stateful).
>>> The pipeline will be new for each new channel, but the handlers you put in
>>> it can be shared (statically created in the factory and reused, so "all")
>>> or unique per channel (created in the getPipeline() method, so "one").
>>> If all handlers are "all", then I think it is better to use the bootstrap
>>> version, since there will be only one pipeline shared among all channels
>>> with the same handlers. But if you have at least one "one" handler
>>> (stateful), then it is preferable to use the ChannelPipelineFactory.
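
A small JDK-only model of the factory distinction described above may help.
None of these types are Netty classes: Handler, StatelessLogger and
StatefulCodec are hypothetical stand-ins, and a List plays the role of the
pipeline.

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineFactorySketch {

    // Stand-in for a handler; not a Netty type.
    interface Handler { }

    static final class StatelessLogger implements Handler { } // shareable: "all"
    static final class StatefulCodec implements Handler { }   // per channel: "one"

    // Created once and reused by every pipeline the factory builds.
    private static final Handler SHARED_LOGGER = new StatelessLogger();

    // Called once per new channel, in the role of a factory's getPipeline().
    static List<Handler> getPipeline() {
        List<Handler> pipeline = new ArrayList<Handler>();
        pipeline.add(new StatefulCodec()); // new instance on every call: "one"
        pipeline.add(SHARED_LOGGER);       // same instance on every call: "all"
        return pipeline;
    }

    public static void main(String[] args) {
        List<Handler> first = getPipeline();
        List<Handler> second = getPipeline();
        System.out.println("codec shared:  " + (first.get(0) == second.get(0)));
        System.out.println("logger shared: " + (first.get(1) == second.get(1)));
    }
}
```

Whether a handler ends up "one" or "all" is decided purely by where the
instance is created, not by the annotation.
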
>>>
>>> Or you can add it manually at runtime (in channelConnected, for instance).
>>> Again, the added handler can be "all" (static and reused) or "one" (a new
>>> one created).
>>> I feel that, even if it is correct, such code is somewhat less easy to
>>> read, since we don't know where such handlers are added. But on some
>>> occasions you don't have a choice (for instance, a new handler is needed
>>> once the channel is connected for some specific behaviour).
>>>
>>> HTH,
>>>
>>> Cheers,
>>> Frederic
>>>
>>>
>>> MikeQ wrote:
>>>>
>>>> Hi all,
>>>>
>>>> Quick q about @ChannelPipelineCoverage and setup of handlers.
>>>>
>>>> As far as I can tell when setting up a new pipeline you have a couple
>>>> of options.
>>>>
>>>> - Adding handlers using bootstrap.getPipeline()
>>>>   - I believe in this instance all the handlers are singletons (one
>>>>     shared by all channels) and must therefore be
>>>>     @ChannelPipelineCoverage("all")
>>>>
>>>> - Adding handlers using a ChannelPipelineFactory
>>>>   - I believe the ChannelPipelineFactory.getPipeline() is called for
>>>>     each new channel.  The handlers can therefore be one/all depending
>>>>     on whether the getPipeline() implementation creates a new instance
>>>>     each time (one) or uses a shared instance (all)
>>>>
>>>> - Adding handlers during execution of another handler
>>>>   - Similar to ChannelPipelineFactory in that a new instance or shared
>>>>     instance can be used
>>>>
>>>> If someone could confirm/correct the above that would be useful for my
>>>> own understanding.
>>>>
>>>> Cheers.
>>>>
>>>
>>>
>>> -----
>>> Hardware/Software Architect
>>> -- 
>>> View this message in context:
>>> http://n2.nabble.com/ChannelPipeline-question-tp3559391p3559650.html
>>> Sent from the Netty User Group mailing list archive at Nabble.com.
>>> _______________________________________________
>>> netty-users mailing list
>>> netty-users at lists.jboss.org
>>> https://lists.jboss.org/mailman/listinfo/netty-users
>>

Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.UUID;

import javax.net.ssl.SSLException;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;

import com.topiatechnology.karma.api.KarmaManager;
import com.topiatechnology.karma.spi.KarmaReceiver;
import com.topiatechnology.karma.spi.util.KarmaUtility;

/**
  * <p>
  * A composition rather than inheritance use of JBoss Netty simple channel
  * upstream and downstream handlers with a KARMA decoder.
  * </p>
  *
  * @author Mike McGrady
  * @version Karma Alpha v001
  * @since August 2009.
  */
@ChannelPipelineCoverage("all")
public final class KarmaDecoder
		implements
			ChannelUpstreamHandler,
			ChannelDownstreamHandler {

	// static final Set<Channel> channels = new MapBackedSet<Channel>(
	// new ConcurrentHashMap<Channel, Boolean>());
	private final KarmaReceiver receiver;
	private final SimpleChannelUpstreamHandler scuh =
			new SimpleChannelUpstreamHandler();
	private final SimpleChannelDownstreamHandler scdh =
			new SimpleChannelDownstreamHandler();
	// Per-transfer state, keyed by the data ID. This handler is shared by all
	// channels, and a plain HashMap is not safe for concurrent access.
	private final Map<UUID, FileChannel> fileWriters =
			new HashMap<UUID, FileChannel>();
	private final Map<UUID, KarmaManager> managers =
			new HashMap<UUID, KarmaManager>();
	private final Map<UUID, Integer> fileCount =
			new HashMap<UUID, Integer>();

	private static final InternalLogger logger = InternalLoggerFactory
			.getInstance(KarmaDecoder.class.getName());

	/**
	 * Creates a new instance that reports completed transfers to the given
	 * receiver.
	 */
	public KarmaDecoder(KarmaReceiver receiver) {
		this.receiver = receiver;
	}

	/**
	 * {@inheritDoc} Down-casts the received downstream event into more
	 * meaningful sub-type event and calls an appropriate handler method with
	 * the down-casted event.
	 */
	public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) {
		if (e instanceof MessageEvent) {
			try {
				this.scdh.writeRequested(ctx, (MessageEvent) e);
			} catch (Exception e1) {
				logger.log(InternalLogLevel.ERROR, e1.getMessage());
				e1.printStackTrace();
			}
		} else if (e instanceof ChannelStateEvent) {
			ChannelStateEvent evt = (ChannelStateEvent) e;
			switch (evt.getState()) {
				case OPEN :
					if (!Boolean.TRUE.equals(evt.getValue())) {
						try {
							this.scdh.closeRequested(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					}
					break;
				case BOUND :
					if (evt.getValue() != null) {
						try {
							this.scdh.bindRequested(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					} else {
						try {
							this.scdh.unbindRequested(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					}
					break;
				case CONNECTED :
					if (evt.getValue() != null) {
						try {
							this.scdh.connectRequested(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					} else {
						try {
							this.scdh.disconnectRequested(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					}
					break;
				case INTEREST_OPS :
					try {
						this.scdh.setInterestOpsRequested(ctx, evt);
					} catch (Exception e1) {
						logger.log(InternalLogLevel.ERROR, e1.getMessage());
						e1.printStackTrace();
					}
					break;
				default :
					ctx.sendDownstream(e);
			}
		} else {
			ctx.sendDownstream(e);
		}
	}

	/**
	 * {@inheritDoc} Down-casts the received upstream event into more
	 * meaningful sub-type event and calls an appropriate handler method with
	 * the down-casted event.
	 */
	public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
		if (e instanceof MessageEvent) {
			this.messageReceivedUpstream(ctx, (MessageEvent) e);
		} else if (e instanceof WriteCompletionEvent) {
			WriteCompletionEvent evt = (WriteCompletionEvent) e;
			try {
				this.scuh.writeComplete(ctx, evt);
			} catch (Exception e1) {
				logger.log(InternalLogLevel.ERROR, e1.getMessage());
				e1.printStackTrace();
			}
		} else if (e instanceof ChildChannelStateEvent) {
			ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
			if (evt.getChildChannel().isOpen()) {
				try {
					this.scuh.childChannelOpen(ctx, evt);
				} catch (Exception e1) {
					logger.log(InternalLogLevel.ERROR, e1.getMessage());
					e1.printStackTrace();
				}
			} else {
				try {
					this.scuh.childChannelClosed(ctx, evt);
				} catch (Exception e1) {
					logger.log(InternalLogLevel.ERROR, e1.getMessage());
					e1.printStackTrace();
				}
			}
		} else if (e instanceof ChannelStateEvent) {
			ChannelStateEvent evt = (ChannelStateEvent) e;
			switch (evt.getState()) {
				case OPEN :
					if (Boolean.TRUE.equals(evt.getValue())) {
						try {
							this.scuh.channelOpen(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					} else {
						try {
							this.scuh.channelClosed(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					}
					break;
				case BOUND :
					if (evt.getValue() != null) {
						try {
							this.scuh.channelBound(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					} else {
						try {
							this.scuh.channelUnbound(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					}
					break;
				case CONNECTED :
					if (evt.getValue() != null) {
						try {
							this.channelConnectedUpstream(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					} else {
						try {
							this.scuh.channelDisconnected(ctx, evt);
						} catch (Exception e1) {
							logger.log(InternalLogLevel.ERROR, e1.getMessage());
							e1.printStackTrace();
						}
					}
					break;
				case INTEREST_OPS :
					try {
						this.scuh.channelInterestChanged(ctx, evt);
					} catch (Exception e1) {
						logger.log(InternalLogLevel.ERROR, e1.getMessage());
						e1.printStackTrace();
					}
					break;
				default :
					// Unhandled upstream events must keep travelling upstream.
					ctx.sendUpstream(e);
			}
		} else if (e instanceof ExceptionEvent) {
			try {
				this.scuh.exceptionCaught(ctx, (ExceptionEvent) e);
			} catch (Exception e1) {
				logger.log(InternalLogLevel.ERROR, e1.getMessage());
				e1.printStackTrace();
			}
		} else {
			ctx.sendUpstream(e);
		}
	}

	/**
	 * Invoked when a message object (e.g. {@link ChannelBuffer}) was received
	 * from a remote peer.
	 */
	public void messageReceivedUpstream(ChannelHandlerContext ctx,
			MessageEvent evt) {
		Object originalMessage = evt.getMessage();
		decode(ctx, evt.getChannel(), originalMessage);
	}

	/**
	 * Invoked when a {@link Channel} is open, bound to a local address, and
	 * connected to a remote address.
	 */
	public void channelConnectedUpstream(ChannelHandlerContext ctx,
			ChannelStateEvent e) {

		// Get the SslHandler in the current pipeline.
		// We added it in SecureChatPipelineFactory.
		final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);

		// Get notified when SSL handshake is done. The listener is attached
		// inside the try block so a failed handshake() call cannot leave a
		// null future to dereference.
		try {
			ChannelFuture handshakeFuture = sslHandler.handshake(e.getChannel());
			handshakeFuture.addListener(new ConnectionNotice(sslHandler));
		} catch (SSLException e1) {
			logger.log(InternalLogLevel.ERROR, e1.getMessage());
			e1.printStackTrace();
		}
	}

	private void decode(ChannelHandlerContext ctx, Channel channel, Object msg) {

		ChannelBuffer msgBuf = (ChannelBuffer) msg;
		Object object = convertChannelBuffer(msgBuf);

		if (object instanceof KarmaFileChunk) {
			KarmaFileChunk chunk = (KarmaFileChunk) object;
			this.processKarmaFileChunk(channel, chunk);
		} else if (object instanceof KarmaManager) {
			KarmaManager manager = (KarmaManager) object;
			processKarmaManager(manager, channel);
		} else {
			logger.log(InternalLogLevel.ERROR, "Not a legitimate object type.");
		}
	}

	private Object convertChannelBuffer(ChannelBuffer msgBuf) {
		int readableBytes = msgBuf.readableBytes();
		byte[] readBytes = new byte[readableBytes];
		msgBuf.readBytes(readBytes);
		Object object = KarmaUtility.toObject(readBytes);
		return object;
	}

	private void processKarmaManager(KarmaManager manager, Channel channel) {
		String baseName = System.getProperty("user.home") + File.separatorChar
				+ ".KARMA" + File.separatorChar + ".AUM" + File.separatorChar
				+ manager.getId() + File.separatorChar;
		this.managers.put(manager.getId(), manager);
		LinkedHashSet<File> managerInputFiles = manager.getResources();
		Iterator<File> managerFileIter = managerInputFiles.iterator();
		LinkedHashSet<File> newManagerFiles = new LinkedHashSet<File>();
		while (managerFileIter.hasNext()) {
			File file = managerFileIter.next();
			String name = file.getName();
			File newFile = new File(baseName + name);
			newManagerFiles.add(newFile);
		}

		managerInputFiles.clear();
		managerInputFiles.addAll(newManagerFiles);

		UUID uuid = manager.getId();
		Integer numberOfFiles = this.fileCount.get(uuid);
		// Nothing is recorded yet if the manager arrives before any file ends.
		int files = (numberOfFiles == null) ? 0 : numberOfFiles.intValue();
		int managerFiles = manager.getResources().size();

		if (managerFiles == files) {
			this.managers.remove(uuid);
			this.fileCount.remove(uuid);
			// The writer may already have been closed and removed.
			FileChannel writer = this.fileWriters.remove(uuid);
			if (writer != null) {
				try {
					writer.close();
				} catch (IOException e) {
					logger.log(InternalLogLevel.ERROR, e.getMessage());
					e.printStackTrace();
				}
			}
			this.receiver.update(manager);
			channel.close();
		}

	}

	private void processKarmaFileChunk(Channel channel, KarmaFileChunk chunk) {
		byte[] bytes = chunk.bytes;
		String baseName = System.getProperty("user.home") + File.separatorChar
				+ ".KARMA" + File.separatorChar + ".AUM" + File.separatorChar
				+ chunk.uuid;

		File file1 = new File(baseName);

		// Create the per-transfer directory on first use.
		if (!file1.exists()) {
			file1.mkdirs();
		}

		String toFileName = baseName + File.separatorChar + chunk.fileName;

		if (chunk.id == 1) {
			try {
				FileChannel fileChannel = new FileOutputStream(toFileName)
						.getChannel();
				this.fileWriters.put(chunk.uuid, fileChannel);
			} catch (FileNotFoundException e) {
				// Without an output file this chunk cannot be written.
				logger.log(InternalLogLevel.ERROR, e.getMessage());
				return;
			}
		}

		ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
		buffer.put(bytes);
		buffer.flip();

		try {
			this.fileWriters.get(chunk.uuid).write(buffer);

			if (chunk.fileLength == chunk.offset) {
				FileChannel fileChannel = this.fileWriters.remove(chunk.uuid);
				fileChannel.close();
			}
		} catch (IOException e) {
			logger.log(InternalLogLevel.ERROR, e.getMessage());
		}

		if (chunk.isLast) {
			Integer fileNumber = this.fileCount.get(chunk.uuid);
			int files = (fileNumber == null) ? 1 : fileNumber.intValue();
			KarmaManager manager = this.managers.get(chunk.uuid);

			// The manager may not have arrived yet, so check it before use.
			if ((manager != null)
					&& (manager.getResources().size() == files)) {

				this.managers.remove(chunk.uuid);
				this.fileCount.remove(chunk.uuid);
				this.fileWriters.remove(chunk.uuid);
				this.receiver.update(manager);
				channel.close();
			} else {
				this.fileCount.put(chunk.uuid, Integer.valueOf(files + 1));
			}
		}

		buffer.clear();
	}

	private static final class ConnectionNotice
			implements
				ChannelFutureListener {

		private final SslHandler sslHandler;

		ConnectionNotice(SslHandler sslHandler) {
			this.sslHandler = sslHandler;
		}

		public void operationComplete(ChannelFuture future) {
			if (future.isSuccess()) {
				// Once session is secured, send a greeting.
				String hostName;
				try {
					hostName = InetAddress.getLocalHost().getHostName();
				} catch (UnknownHostException e) {
					logger.log(InternalLogLevel.ERROR, e.getMessage());
					e.printStackTrace();
					// Fall back to a generic name so the greeting is never null.
					hostName = "localhost";
				}
				byte[] welcome = KarmaUtility.toBytes("Welcome to " + hostName
						+ " secure Karma service!\n");
				ChannelBuffer welcomeBuf = ChannelBuffers
						.wrappedBuffer(welcome);
				future.getChannel().write(welcomeBuf);
				byte[] protectedSession = KarmaUtility
						.toBytes("Your session is protected by "
								+ sslHandler.getEngine().getSession()
										.getCipherSuite() + " cipher suite.\n");
				ChannelBuffer protectedSessionBuf = ChannelBuffers
						.wrappedBuffer(protectedSession);
				future.getChannel().write(protectedSessionBuf);
				logger.log(InternalLogLevel.INFO,
						"Notifying Karma sender that the session is secured");

				// Register the channel to the global channel list
				// so the channel received the messages from others.
				// channels.add(future.getChannel());
			} else {
				future.getChannel().close();
			}
		}
	}
}
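
Since this handler is shared, its three HashMaps can be reached from more
than one I/O thread.  One possible way to make the per-UUID bookkeeping safe,
whether the manager or the last chunk arrives first, is sketched below with
JDK classes only.  TransferTracker, expectFiles() and fileFinished() are
hypothetical names, not part of Karma or Netty.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class TransferTracker {

    // Per-transfer progress. It is only mutated inside compute(), which
    // ConcurrentHashMap performs atomically for a given key.
    private static final class Progress {
        int expected = -1; // unknown until the manager arrives
        int finished = 0;
        boolean complete() { return expected >= 0 && finished >= expected; }
    }

    private final ConcurrentMap<UUID, Progress> transfers =
            new ConcurrentHashMap<UUID, Progress>();

    // Manager arrived: record how many files the transfer contains.
    boolean expectFiles(UUID id, int count) {
        return update(id, count, 0);
    }

    // Last chunk of one file arrived.
    boolean fileFinished(UUID id) {
        return update(id, -1, 1);
    }

    // Returns true for the call that completes the transfer.
    private boolean update(UUID id, int expected, int finishedDelta) {
        boolean[] done = new boolean[1];
        transfers.compute(id, (key, p) -> {
            if (p == null) p = new Progress();
            if (expected >= 0) p.expected = expected;
            p.finished += finishedDelta;
            done[0] = p.complete();
            return done[0] ? null : p; // returning null removes the entry
        });
        return done[0];
    }

    int openTransfers() { return transfers.size(); }

    public static void main(String[] args) {
        TransferTracker tracker = new TransferTracker();
        UUID id = UUID.randomUUID();
        System.out.println(tracker.fileFinished(id));
        System.out.println(tracker.expectFiles(id, 2));
        System.out.println(tracker.fileFinished(id));
    }
}
```

The caller that gets true back would be the one to update the receiver and
close the channel, so that step runs once per transfer.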




