ChannelPipeline question

Michael McGrady mmcgrady at topiatechnology.com
Tue Sep 1 20:36:07 EDT 2009


Thanks, Frederic.

Please note that all the shared variables refer to final objects.  So,
while they are shared, they are not themselves modified in a way that
would change what the references point to.  Everything that changes, is
discarded, etc., is referenced only within a method, which makes this a
relatively easy case.  Also, note that the implementation relies on
assumptions that must be warranted by behavior on the client side, e.g.,
that a KarmaManager is received prior to any KarmaFileChunk.
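
For illustration, that ordering assumption could also be made explicit with
a small guard at the top of processKarmaFileChunk() in the decoder quoted
below; this is only a sketch, not part of the attached code:

    // Sketch only: fail loudly if a chunk arrives before its KarmaManager,
    // rather than relying solely on the sender honouring the ordering.
    if (!this.managers.containsKey(chunk.uuid)) {
        logger.log(InternalLogLevel.ERROR,
                "KarmaFileChunk received before its KarmaManager: " + chunk.uuid);
        return;
    }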

Cheers,

Mike


On Sep 1, 2009, at 2:39 PM, Frederic Bregier wrote:

>
> Well Mike,
>
> Great work indeed! It is exactly the kind of example I was trying to find
> to explain this.
> You're right to mark it "all" there, since you share (if I understand
> correctly) at least the receiver and the scuh and scdh handlers. I didn't
> see any problem (though of course I have only just read the code).
>
> If I understand correctly, you share some data (at least the receiver). I
> suppose the upstream and downstream handlers are used for the common tasks
> that come after coding/decoding the data.
>
> Of course one can think of another implementation using "one". For
> instance, it would simplify access to the data without needing UUIDs (by
> using the channel's getId(), for instance). But that comes at the price of
> the complexity of sharing data among all handlers (which can be done by
> setting it into private fields of each handler from the PipelineFactory).
> It would also probably prevent you from using the internal UUIDs you rely
> on. Of course it is still possible to have some internals that take care
> of channel-id-to/from-UUID translation, but that depends greatly on how
> the manager assigns those UUIDs.
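
For what it's worth, here is a rough, hypothetical sketch of that
alternative (the names SharedStatePipelineFactory and PerChannelHandler are
invented for illustration): the factory creates a new "one" handler for
every channel but hands it references to shared, concurrent maps, so state
can be shared without per-handler UUID bookkeeping.

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class SharedStatePipelineFactory implements ChannelPipelineFactory {

    // Shared by every channel; each per-channel handler receives a reference.
    private final ConcurrentMap<UUID, Object> sharedState =
            new ConcurrentHashMap<UUID, Object>();

    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        // A brand-new "one" handler for every channel, seeded with shared state.
        pipeline.addLast("handler", new PerChannelHandler(sharedState));
        return pipeline;
    }

    @ChannelPipelineCoverage("one")
    static class PerChannelHandler extends SimpleChannelUpstreamHandler {

        private final ConcurrentMap<UUID, Object> sharedState;
        private Object lastMessage; // per-channel field: only this channel touches it

        PerChannelHandler(ConcurrentMap<UUID, Object> sharedState) {
            this.sharedState = sharedState;
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
                throws Exception {
            // Per-channel fields need no locking; sharedState does, since
            // handlers belonging to other channels touch it concurrently.
            lastMessage = e.getMessage();
            super.messageReceived(ctx, e);
        }
    }
}

With the factory registered via bootstrap.setPipelineFactory(...), each
channel still gets its own handler instance, while the maps stay common to
all of them.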
>
> So for me, it looks like a good example of a stateful handler with an
> "all" coverage attribute.
> I did not read it very carefully, but I quite quickly found the logic
> behind it.
> However, I think you will agree that you probably spent a lot of time
> keeping it simple and readable, which is why I probably would not
> recommend that beginners try to write such a handler. "Advanced" users,
> however, can surely do it, as you have done!
>
> Thank you for sharing this piece of code, Mike!
>
> Cheers,
> Frederic
>
>
> Mike McGrady wrote:
>>
>> Frederic,
>>
>> At the bottom of this email is a handler that is "all" but stateful.
>> The key is to use instrumented data and monitor controls related to the
>> sending of data.  In this case, the "sender" has to use monitors on the
>> ChannelFutures to make sure that (a) a channel has been returned before
>> writing and (b) a "manager" is sent prior to sending any file chunks.
>> Further, the file chunks and managers, as well as the FileChannels, are
>> stored in maps keyed by the particular data IDs, e.g. Java UUIDs.
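
Purely as an illustration of that sequencing (the class name, host, and
port below are made up, and an object encoder is assumed to sit earlier in
the pipeline), the sender side might look roughly like this:

import java.net.InetSocketAddress;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;

public final class SenderSequencingSketch {

    // (a) Wait for the connect future before writing anything at all.
    static void send(ClientBootstrap bootstrap, final Object manager,
            final Object firstChunk) {
        ChannelFuture connectFuture =
                bootstrap.connect(new InetSocketAddress("localhost", 8443));
        connectFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                if (!future.isSuccess()) {
                    future.getChannel().close();
                    return;
                }
                Channel channel = future.getChannel();
                // (b) Write the manager first; only once that write has
                // completed successfully do the file chunks start to flow.
                channel.write(manager).addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture managerWrite) {
                        if (managerWrite.isSuccess()) {
                            managerWrite.getChannel().write(firstChunk);
                        }
                    }
                });
            }
        });
    }
}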
>>
>> Any of your thoughts on this would be greatly appreciated.
>>
>>
>> Mike
>>
>>
>> On Sep 1, 2009, at 9:03 AM, Frederic Bregier wrote:
>>
>>>
>>> Hi Mike,
>>>
>>> First, "all" is currently only an annotation, so no error will occur
>>> whether you write "one" or "all" in your handler. It is only a help for
>>> a reader who is not the writer ;-)
>>>
>>> Now, if your handler is stateful but you do take concurrency into
>>> account (for instance, tracking which channel is currently using the
>>> handler by the channel's id), it should be ok.
>>> Take care: it is not only about concurrent access; it greatly depends
>>> on what the handler has to do and what kind of "persistent" information
>>> it has to keep in memory for each channel.
>>> For instance, say we tried to write the StringDecoder as an "all"
>>> handler; this would imply keeping as many current ChannelBuffers as
>>> there are channels in the one handler. That could be somewhat
>>> complicated, though doable. It is obviously simpler (in terms of
>>> development) in this case to use a "one" version, with a newly created
>>> handler for each channel.
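
Just to illustrate the idea (the class name is invented, and the
frame-extraction step is deliberately left out), an "all" handler along
those lines would keep one cumulation buffer per channel, keyed by the
channel id:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

@ChannelPipelineCoverage("all")
public class SharedCumulationHandler extends SimpleChannelUpstreamHandler {

    // One cumulation buffer per channel, all held by this single handler.
    private final ConcurrentMap<Integer, ChannelBuffer> buffers =
            new ConcurrentHashMap<Integer, ChannelBuffer>();

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        Integer id = e.getChannel().getId();
        ChannelBuffer cumulation = buffers.get(id);
        if (cumulation == null) {
            cumulation = ChannelBuffers.dynamicBuffer();
            buffers.put(id, cumulation);
        }
        cumulation.writeBytes((ChannelBuffer) e.getMessage());
        // Frame extraction would happen here, firing decoded messages
        // upstream with Channels.fireMessageReceived(ctx, frame).
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        // Drop the buffer once its channel goes away.
        buffers.remove(e.getChannel().getId());
        super.channelClosed(ctx, e);
    }
}

Compared with a "one" handler that simply keeps a single ChannelBuffer
field, this bookkeeping is the extra price of the "all" approach.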
>>>
>>> If the handler only has a small amount of information to store for
>>> each channel, then implementing such an "all" handler with stateful
>>> logic can make sense. For instance, if some information has to be
>>> shared among all channels in the handler (stateful, but also spanning
>>> several channels), then you can try an "all" version, or you can try a
>>> "one" version using some shared data object.
>>>
>>> My opinion (only mine ;-) is that a stateful handler should most of
>>> the time be a "one" version, but as usual, many exceptions can exist!
>>> I don't have one real example in mind, but I'm sure there are a lot.
>>> If you can share a description of your example, it could perhaps help
>>> some users (like me) go further?
>>>
>>> HTH,
>>> Cheers,
>>> Frederic
>>>
>>>
>>> Mike McGrady wrote:
>>>>
>>>> Just a more detailed thought, Frederic, on this matter.  I think you
>>>> can use "all" with stateful handlers so long as the state is handled
>>>> in a way consistent with concurrent access.  I do this in my first
>>>> Netty application, if I am understanding you correctly.  What do you
>>>> think?
>>>>
>>>> Mike
>>>>
>>>>
>>>> On Sep 1, 2009, at 3:32 AM, Frederic Bregier wrote:
>>>>
>>>>>
>>>>> Hi Mike,
>>>>>
>>>>> If I make some mistakes, please correct me of course ;-)
>>>>>
>>>>> "all" means that several channels can share this handler (stateless).
>>>>> "one" means that this handler is for one and only one channel
>>>>> (stateful).
>>>>>
>>>>> bootstrap.getPipeline() => if you add handlers there, they should be
>>>>> "all", since the pipeline will be shared by all channels that connect
>>>>> or are connected through this bootstrap.
>>>>>
>>>>> In a ChannelPipelineFactory, you can mix "all" and "one" handlers.
>>>>> For instance, if you have a handler that can be shared among all
>>>>> channels, then even when included from the factory it can be "all".
>>>>> Conversely, you usually have at least one handler that is "one" in
>>>>> such a construction (generally the codec, which is stateful).
>>>>> The pipeline will be new for each new channel, but the handlers you
>>>>> put in it can be shared (created once in the factory and reused, so
>>>>> "all") or unique per channel (created in the getPipeline() method, so
>>>>> "one").
>>>>> If all handlers are "all", then I think it is better to use the
>>>>> bootstrap version, since there will be only one pipeline, with the
>>>>> same handlers, shared among all channels. But if you have at least
>>>>> one "one" (stateful) handler, then it is preferable to use the
>>>>> ChannelPipelineFactory.
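
Assuming the DelimiterBasedFrameDecoder and StringDecoder that ship with
Netty, a minimal sketch of that mix could look like this: the stateless
decoder is created once and shared ("all"), while a fresh frame decoder is
built for every channel ("one").

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;

public class MixedPipelineFactory implements ChannelPipelineFactory {

    // "all": stateless, so a single instance is shared by every pipeline.
    private final StringDecoder sharedStringDecoder = new StringDecoder();

    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        // "one": the frame decoder keeps a per-channel cumulation buffer,
        // so a new instance is created for each channel.
        pipeline.addLast("framer",
                new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("stringDecoder", sharedStringDecoder);
        return pipeline;
    }
}

The factory would then be registered with
bootstrap.setPipelineFactory(new MixedPipelineFactory()); if every handler
were "all", adding them once to bootstrap.getPipeline() would do instead.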
>>>>>
>>>>> Or you can add handlers manually at runtime (in channelConnected,
>>>>> for instance).
>>>>> Again, the added handler can be "all" (static and reused) or "one" (a
>>>>> newly created one).
>>>>> I feel that, even if it is correct, such code is somewhat less easy
>>>>> to read, since we don't know where such handlers are or are not
>>>>> added. But on some occasions you don't have a choice (for instance, a
>>>>> new handler is needed once the channel is connected, for some
>>>>> specific behaviour).
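
A tiny sketch of that runtime variant (both class names here are invented):
once the channel is connected, an extra handler is spliced into that
channel's pipeline by hand.

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

@ChannelPipelineCoverage("one")
public class AddOnConnectHandler extends SimpleChannelUpstreamHandler {

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        // "one" style: a brand-new handler instance, for this channel only,
        // added the moment the connection is established.
        ctx.getPipeline().addLast("postConnect", new PostConnectHandler());
        super.channelConnected(ctx, e);
    }

    @ChannelPipelineCoverage("one")
    static class PostConnectHandler extends SimpleChannelUpstreamHandler {
        // Behaviour that is only relevant after the channel is connected.
    }
}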
>>>>>
>>>>> HTH,
>>>>>
>>>>> Cheers,
>>>>> Frederic
>>>>>
>>>>>
>>>>> MikeQ wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Quick q about @ChannelPipelineCoverage and setup of handlers.
>>>>>>
>>>>>> As far as I can tell, when setting up a new pipeline you have a
>>>>>> couple of options.
>>>>>>
>>>>>> - Adding handlers using bootstrap.getPipeline()
>>>>>>  - I believe in this instance all the handlers are singletons  
>>>>>> (one
>>>>>> shared by all channels) and must therefore be
>>>>>> @ChannelPipelineCoverage("all")
>>>>>>
>>>>>> - Adding handlers using a ChannelPipelineFactory
>>>>>>  - I believe the ChannelPipelineFactory.getPipeline() is called
>>>>>> for
>>>>>> each new channel.  The handlers can therefore be one/all
>>>>>> depending on
>>>>>> whether the getPipeline() implementation creates a new instance
>>>>>> each time
>>>>>> (one) or uses a shared instance (all)
>>>>>>
>>>>>> - Adding handlers during execution of another handler
>>>>>>  - Similar to ChannelPipelineFactory in that a new instance or
>>>>>> shared
>>>>>> instance can be used
>>>>>>
>>>>>> If someone could confirm/correct the above that would be useful  
>>>>>> for
>>>>>> my own
>>>>>> understanding.
>>>>>>
>>>>>> Cheers.
>>>>>>
>>>>>
>>>>>
>>>>
>>
>> Mike McGrady
>> Principal Investigator AF081-028 AFRL SBIR
>> Senior Engineer
>> Topia Technology, Inc
>> 1.253.720.3365
>> mmcgrady at topiatechnology.com
>>
>> import java.io.File;
>> import java.io.FileNotFoundException;
>> import java.io.FileOutputStream;
>> import java.io.IOException;
>> import java.net.InetAddress;
>> import java.net.UnknownHostException;
>> import java.nio.ByteBuffer;
>> import java.nio.channels.FileChannel;
>> import java.util.HashMap;
>> import java.util.Iterator;
>> import java.util.LinkedHashSet;
>> import java.util.Map;
>> import java.util.UUID;
>>
>> import javax.net.ssl.SSLException;
>>
>> import org.jboss.netty.buffer.ChannelBuffer;
>> import org.jboss.netty.buffer.ChannelBuffers;
>> import org.jboss.netty.channel.Channel;
>> import org.jboss.netty.channel.ChannelDownstreamHandler;
>> import org.jboss.netty.channel.ChannelEvent;
>> import org.jboss.netty.channel.ChannelFuture;
>> import org.jboss.netty.channel.ChannelFutureListener;
>> import org.jboss.netty.channel.ChannelHandlerContext;
>> import org.jboss.netty.channel.ChannelPipelineCoverage;
>> import org.jboss.netty.channel.ChannelStateEvent;
>> import org.jboss.netty.channel.ChannelUpstreamHandler;
>> import org.jboss.netty.channel.ChildChannelStateEvent;
>> import org.jboss.netty.channel.ExceptionEvent;
>> import org.jboss.netty.channel.MessageEvent;
>> import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
>> import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
>> import org.jboss.netty.channel.WriteCompletionEvent;
>> import org.jboss.netty.handler.ssl.SslHandler;
>> import org.jboss.netty.logging.InternalLogLevel;
>> import org.jboss.netty.logging.InternalLogger;
>> import org.jboss.netty.logging.InternalLoggerFactory;
>>
>> import com.topiatechnology.karma.api.KarmaManager;
>> import com.topiatechnology.karma.spi.KarmaReceiver;
>> import com.topiatechnology.karma.spi.util.KarmaUtility;
>>
>> /**
>>  * <p>
>>  * A composition rather than inheritance use of JBoss Netty simple
>> channel
>>  * upstream and downstream handlers with a KARMA decoder.
>>  * </p>
>>  *
>>  * @author Mike McGrady
>>  * @version Karma Alpha v001
>>  * @since August 2009.
>>  */
>> @ChannelPipelineCoverage("all")
>> public final class KarmaDecoder
>> 		implements
>> 			ChannelUpstreamHandler,
>> 			ChannelDownstreamHandler {
>>
>> 	// static final Set<Channel> channels = new MapBackedSet<Channel>(
>> 	// new ConcurrentHashMap<Channel, Boolean>());
>> 	private final KarmaReceiver receiver;
>> 	private final SimpleChannelUpstreamHandler scuh = new
>> SimpleChannelUpstreamHandler();
>> 	private final SimpleChannelDownstreamHandler scdh = new
>> SimpleChannelDownstreamHandler();
>> 	private final Map<UUID, FileChannel> fileWriters = new HashMap<UUID,
>> FileChannel>();
>> 	private final Map<UUID, KarmaManager> managers = new HashMap<UUID,
>> KarmaManager>();
>> 	private final Map<UUID, Integer> fileCount = new HashMap<UUID,
>> Integer>();
>>
>> 	private static final InternalLogger logger = InternalLoggerFactory
>> 			.getInstance(KarmaDecoder.class.getName());
>>
>> 	/**
>> 	 * Creates a new instance with the current system character set.
>> 	 */
>> 	public KarmaDecoder(KarmaReceiver receiver) {
>> 		this.receiver = receiver;
>> 	}
>>
>> 	/**
>> 	 * {@inheritDoc} Down-casts the received downstream event into more
>> 	 * meaningful sub-type event and calls an appropriate handler method
>> with
>> 	 * the down-casted event.
>> 	 */
>> 	public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent
>> e) {
>> 		if (e instanceof MessageEvent) {
>> 			try {
>> 				this.scdh.writeRequested(ctx, (MessageEvent) e);
>> 			} catch (Exception e1) {
>> 				logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 				e1.printStackTrace();
>> 			}
>> 		} else if (e instanceof ChannelStateEvent) {
>> 			ChannelStateEvent evt = (ChannelStateEvent) e;
>> 			switch (evt.getState()) {
>> 				case OPEN :
>> 					if (!Boolean.TRUE.equals(evt.getValue())) {
>> 						try {
>> 							this.scdh.closeRequested(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					}
>> 					break;
>> 				case BOUND :
>> 					if (evt.getValue() != null) {
>> 						try {
>> 							this.scdh.bindRequested(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					} else {
>> 						try {
>> 							this.scdh.unbindRequested(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					}
>> 					break;
>> 				case CONNECTED :
>> 					if (evt.getValue() != null) {
>> 						try {
>> 							this.scdh.connectRequested(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					} else {
>> 						try {
>> 							this.scdh.disconnectRequested(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					}
>> 					break;
>> 				case INTEREST_OPS :
>> 					try {
>> 						this.scdh.setInterestOpsRequested(ctx, evt);
>> 					} catch (Exception e1) {
>> 						logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 						e1.printStackTrace();
>> 					}
>> 					break;
>> 				default :
>> 					ctx.sendDownstream(e);
>> 			}
>> 		} else {
>> 			ctx.sendDownstream(e);
>> 		}
>> 	}
>>
>> 	/**
>> 	 * {@inheritDoc} Down-casts the received upstream event into more
>> meaningful
>> 	 * sub-type event and calls an appropriate handler method with the
>> 	 * down-casted event.
>> 	 */
>> 	public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent  
>> e) {
>> 		if (e instanceof MessageEvent) {
>> 			this.messageReceivedUpstream(ctx, (MessageEvent) e);
>> 		} else if (e instanceof WriteCompletionEvent) {
>> 			WriteCompletionEvent evt = (WriteCompletionEvent) e;
>> 			try {
>> 				this.scuh.writeComplete(ctx, evt);
>> 			} catch (Exception e1) {
>> 				logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 				e1.printStackTrace();
>> 			}
>> 		} else if (e instanceof ChildChannelStateEvent) {
>> 			ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
>> 			if (evt.getChildChannel().isOpen()) {
>> 				try {
>> 					this.scuh.childChannelOpen(ctx, evt);
>> 				} catch (Exception e1) {
>> 					logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 					e1.printStackTrace();
>> 				}
>> 			} else {
>> 				try {
>> 					this.scuh.childChannelClosed(ctx, evt);
>> 				} catch (Exception e1) {
>> 					logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 					e1.printStackTrace();
>> 				}
>> 			}
>> 		} else if (e instanceof ChannelStateEvent) {
>> 			ChannelStateEvent evt = (ChannelStateEvent) e;
>> 			switch (evt.getState()) {
>> 				case OPEN :
>> 					if (Boolean.TRUE.equals(evt.getValue())) {
>> 						try {
>> 							this.scuh.channelOpen(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					} else {
>> 						try {
>> 							this.scuh.channelClosed(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					}
>> 					break;
>> 				case BOUND :
>> 					if (evt.getValue() != null) {
>> 						try {
>> 							this.scuh.channelBound(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					} else {
>> 						try {
>> 							this.scuh.channelUnbound(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					}
>> 					break;
>> 				case CONNECTED :
>> 					if (evt.getValue() != null) {
>> 						try {
>> 							this.channelConnectedUpstream(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					} else {
>> 						try {
>> 							this.scuh.channelDisconnected(ctx, evt);
>> 						} catch (Exception e1) {
>> 							logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 							e1.printStackTrace();
>> 						}
>> 					}
>> 					break;
>> 				case INTEREST_OPS :
>> 					try {
>> 						this.scuh.channelInterestChanged(ctx, evt);
>> 					} catch (Exception e1) {
>> 						logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 						e1.printStackTrace();
>> 					}
>> 					break;
>> 				default :
>> 					ctx.sendUpstream(e);
>> 			}
>> 		} else if (e instanceof ExceptionEvent) {
>> 			try {
>> 				this.scuh.exceptionCaught(ctx, (ExceptionEvent) e);
>> 			} catch (Exception e1) {
>> 				logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 				e1.printStackTrace();
>> 			}
>> 		} else {
>> 			ctx.sendUpstream(e);
>> 		}
>> 	}
>>
>> 	/**
>> 	 * Invoked when a message object (e.g: {@link ChannelBuffer}) was
>> received
>> 	 * from a remote peer.
>> 	 */
>> 	public void messageReceivedUpstream(ChannelHandlerContext ctx,
>> 			MessageEvent evt) {
>> 		Object originalMessage = evt.getMessage();
>> 		decode(ctx, evt.getChannel(), originalMessage);
>> 	}
>>
>> 	/**
>> 	 * Invoked when a {@link Channel} is open, bound to a local address,
>> and
>> 	 * connected to a remote address.
>> 	 */
>> 	public void channelConnectedUpstream(ChannelHandlerContext ctx,
>> 			ChannelStateEvent e) {
>>
>> 		// Get the SslHandler in the current pipeline.
>> 		// We added it in SecureChatPipelineFactory.
>> 		final SslHandler sslHandler =  
>> ctx.getPipeline().get(SslHandler.class);
>>
>> 		// Get notified when SSL handshake is done.
>> 		ChannelFuture handshakeFuture = null;
>> 		try {
>> 			handshakeFuture = sslHandler.handshake(e.getChannel());
>> 		} catch (SSLException e1) {
>> 			logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> 			e1.printStackTrace();
>> 		}
>> 		handshakeFuture.addListener(new ConnectionNotice(sslHandler));
>> 	}
>>
>> 	private void decode(ChannelHandlerContext ctx, Channel channel,
>> Object msg) {
>>
>> 		ChannelBuffer msgBuf = (ChannelBuffer) msg;
>> 		Object object = convertChannelBuffer(msgBuf);
>>
>> 		if (object instanceof KarmaFileChunk) {
>> 			KarmaFileChunk chunk = (KarmaFileChunk) object;
>> 			this.processKarmaFileChunk(channel, chunk);
>> 		} else if (object instanceof KarmaManager) {
>> 			KarmaManager manager = (KarmaManager) object;
>> 			processKarmaManager(manager, channel);
>> 		} else {
>> 			logger.log(InternalLogLevel.ERROR, "Not a legitimate object type.");
>> 		}
>> 	}
>>
>> 	private Object convertChannelBuffer(ChannelBuffer msgBuf) {
>> 		int readableBytes = msgBuf.readableBytes();
>> 		byte[] readBytes = new byte[readableBytes];
>> 		msgBuf.readBytes(readBytes);
>> 		Object object = KarmaUtility.toObject(readBytes);
>> 		return object;
>> 	}
>>
>> 	private void processKarmaManager(KarmaManager manager, Channel
>> channel) {
>> 		String baseName = System.getProperty("user.home") +  
>> File.separatorChar
>> 				+ ".KARMA" + File.separatorChar + ".AUM" + File.separatorChar
>> 				+ manager.getId() + File.separatorChar;
>> 		this.managers.put(manager.getId(), manager);
>> 		LinkedHashSet<File> managerInputFiles = manager.getResources();
>> 		Iterator<File> managerFileIter = managerInputFiles.iterator();
>> 		LinkedHashSet<File> newManagerFiles = new LinkedHashSet<File>();
>> 		while (managerFileIter.hasNext()) {
>> 			File file = managerFileIter.next();
>> 			String name = file.getName();
>> 			File newFile = new File(baseName + name);
>> 			newManagerFiles.add(newFile);
>> 		}
>>
>> 		managerInputFiles.clear();
>> 		managerInputFiles.addAll(newManagerFiles);
>>
>> 		UUID uuid = manager.getId();
>> 		Integer numberOfFiles = this.fileCount.get(uuid);
>> 		int files = numberOfFiles.intValue();
>> 		int managerFiles = manager.getResources().size();
>>
>> 		if (managerFiles == files) {
>> 			this.managers.remove(uuid);
>> 			this.fileCount.remove(uuid);
>> 			try {
>> 				this.fileWriters.remove(uuid).close();
>> 			} catch (IOException e) {
>> 				logger.log(InternalLogLevel.ERROR, e.getMessage());
>> 				e.printStackTrace();
>> 			}
>> 			this.receiver.update(manager);
>> 			channel.close();
>> 		}
>>
>> 	}
>>
>> 	private void processKarmaFileChunk(Channel channel, KarmaFileChunk
>> chunk) {
>> 		byte[] bytes = chunk.bytes;
>> 		String baseName = System.getProperty("user.home") +  
>> File.separatorChar
>> 				+ ".KARMA" + File.separatorChar + ".AUM" + File.separatorChar
>> 				+ chunk.uuid;
>>
>> 		File file1 = new File(baseName);
>>
>> 		if (!file1.exists()) {
>> 			file1.mkdirs();
>> 			file1.isDirectory();
>> 		}
>>
>> 		String toFileName = baseName + File.separatorChar + chunk.fileName;
>>
>> 		if (chunk.id == 1) {
>> 			FileChannel fileChannel = null;
>> 			try {
>> 				fileChannel = new FileOutputStream(toFileName).getChannel();
>> 			} catch (FileNotFoundException e) {
>> 				logger.log(InternalLogLevel.ERROR, e.getMessage());
>> 			}
>> 			this.fileWriters.put(chunk.uuid, fileChannel);
>> 		}
>>
>> 		ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
>> 		buffer.put(bytes);
>> 		buffer.flip();
>>
>> 		try {
>> 			this.fileWriters.get(chunk.uuid).write(buffer);
>>
>> 			if (chunk.fileLength == chunk.offset) {
>> 				FileChannel fileChannel = this.fileWriters.remove(chunk.uuid);
>> 				fileChannel.close();
>> 			}
>> 		} catch (IOException e) {
>> 			logger.log(InternalLogLevel.ERROR, e.getMessage());
>> 		}
>>
>> 		if (chunk.isLast) {
>> 			Integer fileNumber = this.fileCount.get(chunk.uuid);
>>
>> 			if (fileNumber == null) {
>> 				this.fileCount.put(chunk.uuid, new Integer(1));
>> 			}
>>
>> 			Integer numberOfFiles = this.fileCount.get(chunk.uuid);
>> 			KarmaManager manager = this.managers.get(chunk.uuid);
>> 			int files = numberOfFiles.intValue();
>> 			int managerFiles = manager.getResources().size();
>>
>> 			if ((managerFiles == files)
>> 					&& (this.managers.get(chunk.uuid) != null)) {
>>
>> 				this.managers.remove(chunk.uuid);
>> 				this.fileCount.remove(chunk.uuid);
>> 				this.fileWriters.remove(chunk.uuid);
>> 				this.receiver.update(manager);
>> 				channel.close();
>> 			} else {
>> 				files++;
>> 				this.fileCount.put(chunk.uuid, new Integer(files));
>> 			}
>> 		}
>>
>> 		buffer.clear();
>> 	}
>>
>> 	private static final class ConnectionNotice
>> 			implements
>> 				ChannelFutureListener {
>>
>> 		private final SslHandler sslHandler;
>>
>> 		ConnectionNotice(SslHandler sslHandler) {
>> 			this.sslHandler = sslHandler;
>> 		}
>>
>> 		public void operationComplete(ChannelFuture future) {
>> 			if (future.isSuccess()) {
>> 				// Once session is secured, send a greeting.
>> 				byte[] welcome = null;
>> 				try {
>> 					welcome = KarmaUtility.toBytes("Welcome to "
>> 							+ InetAddress.getLocalHost().getHostName()
>> 							+ " secure Karma service!\n");
>> 				} catch (UnknownHostException e) {
>> 					logger.log(InternalLogLevel.ERROR, e.getMessage());
>> 					e.printStackTrace();
>> 				}
>> 				ChannelBuffer welcomeBuf = ChannelBuffers
>> 						.wrappedBuffer(welcome);
>> 				future.getChannel().write(welcomeBuf);
>> 				byte[] protectedSession = KarmaUtility
>> 						.toBytes("Your session is protected by "
>> 								+ sslHandler.getEngine().getSession()
>> 										.getCipherSuite() + " cipher suite.\n");
>> 				ChannelBuffer protectedSessionBuf = ChannelBuffers
>> 						.wrappedBuffer(protectedSession);
>> 				future.getChannel().write(protectedSessionBuf);
>> 				logger.log(InternalLogLevel.INFO,
>> 						"Notifying Karma sender that the session is secured");
>>
>> 				// Register the channel to the global channel list
>> 				// so the channel received the messages from others.
>> 				// channels.add(future.getChannel());
>> 			} else {
>> 				future.getChannel().close();
>> 			}
>> 		}
>> 	}
>> }
>
>

Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com