ChannelPipeline question
Michael McGrady
mmcgrady at topiatechnology.com
Tue Sep 1 20:36:07 EDT 2009
Thanks, Frederic.
Please note that all the shared variables are final references. So,
while the objects are shared, the references themselves are never
reassigned. Everything that changes or is discarded is referenced
only within a method, which makes this a relatively easy case. Also
note that the implementation makes assumptions that the client side
must honor, e.g., that a KarmaManager is received prior to any
KarmaFileChunk.
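A minimal sketch of how the receiving side could check that ordering
assumption itself rather than rely only on the client. TransferRegistry
and its methods are hypothetical names, not part of Netty or KARMA. The
map is a ConcurrentHashMap on the assumption that a single "all" handler
instance may be called from several I/O threads.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical per-transfer bookkeeping; not part of Netty or KARMA. */
final class TransferRegistry {
    // Shared by every channel that uses the handler, so it must be thread-safe.
    private final ConcurrentMap<UUID, Integer> expectedFiles =
            new ConcurrentHashMap<>();

    /** Records a manager; returns false if one was already registered for this id. */
    boolean registerManager(UUID transferId, int expectedFileCount) {
        return expectedFiles.putIfAbsent(transferId, expectedFileCount) == null;
    }

    /** Accepts a chunk only when the manager for its transfer has already arrived. */
    boolean acceptChunk(UUID transferId) {
        return expectedFiles.containsKey(transferId);
    }

    /** Drops all state for a finished transfer. */
    void complete(UUID transferId) {
        expectedFiles.remove(transferId);
    }
}
```

A handler could call acceptChunk() before touching any file and log or
close the channel when it returns false.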
Cheers,
Mike
On Sep 1, 2009, at 2:39 PM, Frederic Bregier wrote:
>
> Well Mike,
>
> Great work indeed! It is exactly the kind of example I was trying to
> find to explain it.
> You're right to code it as "all", since you share (if I understand
> correctly) at least the receiver, the scuh, and the scdh. I didn't see
> any problem (of course, I have only read the code).
>
> If I understand correctly, you share some data (at least the receiver).
> I suppose the upstream and downstream handlers are used for common tasks
> performed after coding/decoding the data.
>
> Of course, one can think of another implementation using "one". For
> instance, it would simplify access to the data without using UUIDs (using
> the channel's getId(), for instance). But it would come at the price of
> the complexity of sharing data among all handlers (which can be done by
> setting it into private variables of each handler from the
> PipelineFactory). It would also probably prevent you from using your
> internal UUIDs. Of course, it is still possible to have some internals
> that take care of channel ID to/from UUID translation, but that depends
> greatly on how the manager assigns those UUIDs.
>
> So for me, it looks like a good example of a stateful handler with a
> meaningful "all" attribute.
> I did not read it very carefully, but I found the logic behind it fairly
> quickly.
> However, I think you will agree that you probably spent a lot of time
> keeping it simple and readable, which is why I would probably not
> recommend that a "beginner" try to write such a handler. "Advanced"
> users can surely do it, as you have!
>
> Thank you for sharing this piece of code, Mike!
>
> Cheers,
> Frederic
>
>
> Mike McGrady wrote:
>>
>> Frederic,
>>
>> At the bottom of this email is a handler that is for "all" but is
>> stateful. The key is to use instrumented data and monitor controls
>> related to the sending of data. In this case, the "sender" has to
>> use monitors in the ChannelFutures to make sure that (a) a
>> channel is returned before writing and (b) a "manager" is sent
>> prior to sending file chunks. Further, the file chunks and managers,
>> as well as the FileChannels, are stored in maps keyed by the
>> particular data IDs, e.g. Java UUIDs.
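The sender-side ordering described above might be sketched as follows.
This is only an illustration under stated assumptions: OrderedSender is
a hypothetical class, CompletableFuture stands in for Netty's
ChannelFuture, and write() merely records what would go on the wire.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sender; CompletableFuture stands in for Netty's ChannelFuture. */
final class OrderedSender {
    // Records the order in which messages were "written", for illustration only.
    final List<String> wire = new ArrayList<>();

    /** Pretends to write a message and returns a future for its completion. */
    private CompletableFuture<Void> write(String message) {
        synchronized (wire) {
            wire.add(message);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Writes nothing until the connection future completes, then writes the
     * manager, then each chunk, each one only after the previous write is done.
     */
    CompletableFuture<Void> send(CompletableFuture<Void> connected,
                                 String manager, List<String> chunks) {
        CompletableFuture<Void> chain = connected.thenCompose(v -> write(manager));
        for (String chunk : chunks) {
            chain = chain.thenCompose(v -> write(chunk));
        }
        return chain;
    }
}
```

With real ChannelFutures the same chaining would be done by adding a
listener to each future and issuing the next write from inside it.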
>>
>> Any of your thoughts on this would be greatly appreciated.
>>
>>
>> Mike
>>
>>
>> On Sep 1, 2009, at 9:03 AM, Frederic Bregier wrote:
>>
>>>
>>> Hi Mike,
>>>
>>> First, "all" is right now only an annotation, so no error will occur
>>> if you write "one" or "all" in your handler. It is only a help for a
>>> reader who is not the writer ;-)
>>>
>>> Now, if your handler is stateful but you do take concurrency into
>>> account (for instance, by tracking which channel is currently using
>>> the handler via the channel's id), it should be OK.
>>> Take care: it is not only about concurrent access. It depends greatly
>>> on what the handler has to do and what kind of "persistent"
>>> information it has to keep in memory for each channel.
>>> For instance, say we try to write the StringDecoder using an "all"
>>> approach: this would imply having as many current ChannelBuffers as
>>> channels in one handler. This could be somewhat complicated, although
>>> doable. For this case it is obviously simpler (in terms of
>>> development) to use a "one" version with a newly created handler for
>>> each channel.
>>>
>>> If the handler has only a little information to store for each
>>> channel, then implementing such an "all" handler with stateful logic
>>> can make sense. For instance, if some information has to be shared
>>> among all channels in the handler (stateful, but also spanning several
>>> channels), then you can try an "all" version, although you can also
>>> try a "one" version using some shared data object.
>>>
>>> My opinion (only mine ;-) is that a stateful handler should most of
>>> the time be a "one" version, but as usual, many exceptions can exist!
>>> I don't have a real example in mind, but I'm sure there are a lot.
>>> If you can share a description of your example, perhaps it could help
>>> some users (like me) to go further?
>>>
>>> HTH,
>>> Cheers,
>>> Frederic
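The per-channel bookkeeping that an "all" handler needs, as described in
the message above, could look roughly like this. It is a sketch under
assumptions: SharedLineAssembler is a hypothetical class that does not
use Netty, an int stands in for the channel id, and a StringBuilder
stands in for the per-channel ChannelBuffer.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical shared ("all") handler keeping one text buffer per channel id. */
final class SharedLineAssembler {
    private final ConcurrentMap<Integer, StringBuilder> buffers =
            new ConcurrentHashMap<>();

    /**
     * Appends a fragment for the given channel and returns one complete line
     * (without the newline) if the buffer now holds one, or null otherwise.
     */
    String onData(int channelId, String fragment) {
        StringBuilder buf =
                buffers.computeIfAbsent(channelId, id -> new StringBuilder());
        synchronized (buf) { // guard the buffer itself, not just the map
            buf.append(fragment);
            int newline = buf.indexOf("\n");
            if (newline < 0) {
                return null;
            }
            String line = buf.substring(0, newline);
            buf.delete(0, newline + 1);
            return line;
        }
    }

    /** Must be called when the channel closes, otherwise its buffer leaks. */
    void onClosed(int channelId) {
        buffers.remove(channelId);
    }

    int trackedChannels() {
        return buffers.size();
    }
}
```

The extra map, the locking, and the cleanup on close are the complexity
that a "one" handler avoids by simply holding a single buffer field.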
>>>
>>>
>>> Mike McGrady wrote:
>>>>
>>>> Just a more detailed thought, Frederic, on this matter. I think you
>>>> can use "all" with stateful handlers so long as the state is handled
>>>> consistently with concurrent access. I do this in my first Netty
>>>> application, if I am understanding you correctly. What do you think?
>>>>
>>>> Mike
>>>>
>>>>
>>>> On Sep 1, 2009, at 3:32 AM, Frederic Bregier wrote:
>>>>
>>>>>
>>>>> Hi Mike,
>>>>>
>>>>> If I have made a mistake, please correct me of course ;-)
>>>>>
>>>>> "all" means that several channels can share this handler
>>>>> (stateless).
>>>>> "one" means that this handler is for one and only one channel
>>>>> (stateful).
>>>>>
>>>>> bootstrap.getPipeline() => if you add handlers there, they should be
>>>>> "all", since the handlers in this pipeline will be shared by all
>>>>> channels that connect or are connected through this bootstrap.
>>>>>
>>>>> In a ChannelPipelineFactory, you can mix "all" and "one" handlers.
>>>>> For instance, if you have a handler that can be shared among all
>>>>> channels, then even when it is added from the factory it can be
>>>>> "all". On the other hand, you usually have at least one handler that
>>>>> is "one" in such a construction (generally the codec, which is
>>>>> stateful).
>>>>> The pipeline will be new for each new channel, but the handlers you
>>>>> put in it can be shared (statically created in the factory and
>>>>> reused, so "all") or unique per channel (created in the getPipeline()
>>>>> method, so "one").
>>>>> If all handlers are "all", then I think it is better to use the
>>>>> bootstrap version, since the same handler instances are then simply
>>>>> shared among all channels. But if you have at least one "one" handler
>>>>> (stateful), then it is preferable to use the ChannelPipelineFactory.
>>>>>
>>>>> Or you can add it manually at runtime (in channelConnected, for
>>>>> instance). Again, the added handler can be "all" (static and reused)
>>>>> or "one" (a new one created).
>>>>> I feel that, even if it is correct, such code is somewhat harder to
>>>>> read, since we don't know where such handlers are added.
>>>>> But on some occasions you don't have a choice (for instance, when a
>>>>> new handler is needed once the channel is connected for some specific
>>>>> behaviour).
>>>>>
>>>>> HTH,
>>>>>
>>>>> Cheers,
>>>>> Frederic
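The difference between a shared and a per-channel handler inside a
pipeline factory can be sketched without Netty as follows. All names
here (SketchHandler, SketchLogger, SketchDecoder, SketchPipelineFactory)
are hypothetical stand-ins, and a plain List stands in for the
ChannelPipeline.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Netty's ChannelHandler. */
interface SketchHandler {}

/** Stateless, so one instance can serve every channel ("all"). */
final class SketchLogger implements SketchHandler {}

/** Holds per-connection state, so each channel needs its own ("one"). */
final class SketchDecoder implements SketchHandler {
    final StringBuilder partialFrame = new StringBuilder();
}

final class SketchPipelineFactory {
    // Created once and reused by every pipeline this factory builds.
    private final SketchHandler sharedLogger = new SketchLogger();

    /** Called once per new channel, as ChannelPipelineFactory.getPipeline() is. */
    List<SketchHandler> getPipeline() {
        List<SketchHandler> pipeline = new ArrayList<>();
        pipeline.add(new SketchDecoder()); // fresh instance per channel
        pipeline.add(sharedLogger);        // same instance for all channels
        return pipeline;
    }
}
```

Two calls to getPipeline() therefore return different decoder instances
but the very same logger instance.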
>>>>>
>>>>>
>>>>> MikeQ wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Quick q about @ChannelPipelineCoverage and setup of handlers.
>>>>>>
>>>>>> As far as I can tell when setting up a new pipeline you have a
>>>>>> couple of options.
>>>>>>
>>>>>> - Adding handlers using bootstrap.getPipeline()
>>>>>>   - I believe in this instance all the handlers are singletons (one
>>>>>>     shared by all channels) and must therefore be
>>>>>>     @ChannelPipelineCoverage("all")
>>>>>>
>>>>>> - Adding handlers using a ChannelPipelineFactory
>>>>>>   - I believe the ChannelPipelineFactory.getPipeline() is called for
>>>>>>     each new channel. The handlers can therefore be one/all depending
>>>>>>     on whether the getPipeline() implementation creates a new
>>>>>>     instance each time (one) or uses a shared instance (all)
>>>>>>
>>>>>> - Adding handlers during execution of another handler
>>>>>>   - Similar to ChannelPipelineFactory in that a new instance or
>>>>>>     shared instance can be used
>>>>>>
>>>>>> If someone could confirm/correct the above that would be useful for
>>>>>> my own understanding.
>>>>>>
>>>>>> Cheers.
>>>>>>
>>>>>
>>>>>
>>>>> -----
>>>>> Hardware/Software Architect
>>>>> --
>>>>> View this message in context:
>>>>> http://n2.nabble.com/ChannelPipeline-question-tp3559391p3559650.html
>>>>> Sent from the Netty User Group mailing list archive at Nabble.com.
>>>>> _______________________________________________
>>>>> netty-users mailing list
>>>>> netty-users at lists.jboss.org
>>>>> https://lists.jboss.org/mailman/listinfo/netty-users
>>>>
>>
>> Mike McGrady
>> Principal Investigator AF081-028 AFRL SBIR
>> Senior Engineer
>> Topia Technology, Inc
>> 1.253.720.3365
>> mmcgrady at topiatechnology.com
>>
>> import java.io.File;
>> import java.io.FileNotFoundException;
>> import java.io.FileOutputStream;
>> import java.io.IOException;
>> import java.net.InetAddress;
>> import java.net.UnknownHostException;
>> import java.nio.ByteBuffer;
>> import java.nio.channels.FileChannel;
>> import java.util.HashMap;
>> import java.util.Iterator;
>> import java.util.LinkedHashSet;
>> import java.util.Map;
>> import java.util.UUID;
>>
>> import javax.net.ssl.SSLException;
>>
>> import org.jboss.netty.buffer.ChannelBuffer;
>> import org.jboss.netty.buffer.ChannelBuffers;
>> import org.jboss.netty.channel.Channel;
>> import org.jboss.netty.channel.ChannelDownstreamHandler;
>> import org.jboss.netty.channel.ChannelEvent;
>> import org.jboss.netty.channel.ChannelFuture;
>> import org.jboss.netty.channel.ChannelFutureListener;
>> import org.jboss.netty.channel.ChannelHandlerContext;
>> import org.jboss.netty.channel.ChannelPipelineCoverage;
>> import org.jboss.netty.channel.ChannelStateEvent;
>> import org.jboss.netty.channel.ChannelUpstreamHandler;
>> import org.jboss.netty.channel.ChildChannelStateEvent;
>> import org.jboss.netty.channel.ExceptionEvent;
>> import org.jboss.netty.channel.MessageEvent;
>> import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
>> import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
>> import org.jboss.netty.channel.WriteCompletionEvent;
>> import org.jboss.netty.handler.ssl.SslHandler;
>> import org.jboss.netty.logging.InternalLogLevel;
>> import org.jboss.netty.logging.InternalLogger;
>> import org.jboss.netty.logging.InternalLoggerFactory;
>>
>> import com.topiatechnology.karma.api.KarmaManager;
>> import com.topiatechnology.karma.spi.KarmaReceiver;
>> import com.topiatechnology.karma.spi.util.KarmaUtility;
>>
>> /**
>> * <p>
>> * A composition rather than inheritance use of JBoss Netty simple channel
>> * upstream and downstream handlers with a KARMA decoder.
>> * </p>
>> *
>> * @author Mike McGrady
>> * @version Karma Alpha v001
>> * @since August 2009.
>> */
>> @ChannelPipelineCoverage("all")
>> public final class KarmaDecoder
>> implements
>> ChannelUpstreamHandler,
>> ChannelDownstreamHandler {
>>
>> // static final Set<Channel> channels = new MapBackedSet<Channel>(
>> // new ConcurrentHashMap<Channel, Boolean>());
>> private final KarmaReceiver receiver;
>> private final SimpleChannelUpstreamHandler scuh = new
>> SimpleChannelUpstreamHandler();
>> private final SimpleChannelDownstreamHandler scdh = new
>> SimpleChannelDownstreamHandler();
>> private final Map<UUID, FileChannel> fileWriters = new HashMap<UUID,
>> FileChannel>();
>> private final Map<UUID, KarmaManager> managers = new HashMap<UUID,
>> KarmaManager>();
>> private final Map<UUID, Integer> fileCount = new HashMap<UUID,
>> Integer>();
>>
>> private static final InternalLogger logger = InternalLoggerFactory
>> .getInstance(KarmaDecoder.class.getName());
>>
>> /**
>> * Creates a new instance that reports completed transfers to the given
>> * receiver.
>> */
>> public KarmaDecoder(KarmaReceiver receiver) {
>> this.receiver = receiver;
>> }
>>
>> /**
>> * {@inheritDoc} Down-casts the received downstream event into more
>> * meaningful sub-type event and calls an appropriate handler method with
>> * the down-casted event.
>> */
>> public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent
>> e) {
>> if (e instanceof MessageEvent) {
>> try {
>> this.scdh.writeRequested(ctx, (MessageEvent) e);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> } else if (e instanceof ChannelStateEvent) {
>> ChannelStateEvent evt = (ChannelStateEvent) e;
>> switch (evt.getState()) {
>> case OPEN :
>> if (!Boolean.TRUE.equals(evt.getValue())) {
>> try {
>> this.scdh.closeRequested(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> }
>> break;
>> case BOUND :
>> if (evt.getValue() != null) {
>> try {
>> this.scdh.bindRequested(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> } else {
>> try {
>> this.scdh.unbindRequested(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> }
>> break;
>> case CONNECTED :
>> if (evt.getValue() != null) {
>> try {
>> this.scdh.connectRequested(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> } else {
>> try {
>> this.scdh.disconnectRequested(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> }
>> break;
>> case INTEREST_OPS :
>> try {
>> this.scdh.setInterestOpsRequested(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> break;
>> default :
>> ctx.sendDownstream(e);
>> }
>> } else {
>> ctx.sendDownstream(e);
>> }
>> }
>>
>> /**
>> * {@inheritDoc} Down-casts the received upstream event into more
>> * meaningful sub-type event and calls an appropriate handler method with
>> * the down-casted event.
>> */
>> public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent
>> e) {
>> if (e instanceof MessageEvent) {
>> this.messageReceivedUpstream(ctx, (MessageEvent) e);
>> } else if (e instanceof WriteCompletionEvent) {
>> WriteCompletionEvent evt = (WriteCompletionEvent) e;
>> try {
>> this.scuh.writeComplete(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> } else if (e instanceof ChildChannelStateEvent) {
>> ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
>> if (evt.getChildChannel().isOpen()) {
>> try {
>> this.scuh.childChannelOpen(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> } else {
>> try {
>> this.scuh.childChannelClosed(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> }
>> } else if (e instanceof ChannelStateEvent) {
>> ChannelStateEvent evt = (ChannelStateEvent) e;
>> switch (evt.getState()) {
>> case OPEN :
>> if (Boolean.TRUE.equals(evt.getValue())) {
>> try {
>> this.scuh.channelOpen(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> } else {
>> try {
>> this.scuh.channelClosed(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> }
>> break;
>> case BOUND :
>> if (evt.getValue() != null) {
>> try {
>> this.scuh.channelBound(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> } else {
>> try {
>> this.scuh.channelUnbound(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> }
>> break;
>> case CONNECTED :
>> if (evt.getValue() != null) {
>> try {
>> this.channelConnectedUpstream(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> } else {
>> try {
>> this.scuh.channelDisconnected(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> }
>> break;
>> case INTEREST_OPS :
>> try {
>> this.scuh.channelInterestChanged(ctx, evt);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> break;
>> default :
>> ctx.sendUpstream(e);
>> }
>> } else if (e instanceof ExceptionEvent) {
>> try {
>> this.scuh.exceptionCaught(ctx, (ExceptionEvent) e);
>> } catch (Exception e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> }
>> } else {
>> ctx.sendUpstream(e);
>> }
>> }
>>
>> /**
>> * Invoked when a message object (e.g: {@link ChannelBuffer}) was received
>> * from a remote peer.
>> */
>> public void messageReceivedUpstream(ChannelHandlerContext ctx,
>> MessageEvent evt) {
>> Object originalMessage = evt.getMessage();
>> decode(ctx, evt.getChannel(), originalMessage);
>> }
>>
>> /**
>> * Invoked when a {@link Channel} is open, bound to a local address, and
>> * connected to a remote address.
>> */
>> public void channelConnectedUpstream(ChannelHandlerContext ctx,
>> ChannelStateEvent e) {
>>
>> // Get the SslHandler in the current pipeline.
>> // We added it in SecureChatPipelineFactory.
>> final SslHandler sslHandler =
>> ctx.getPipeline().get(SslHandler.class);
>>
>> // Get notified when SSL handshake is done.
>> ChannelFuture handshakeFuture;
>> try {
>> handshakeFuture = sslHandler.handshake(e.getChannel());
>> } catch (SSLException e1) {
>> logger.log(InternalLogLevel.ERROR, e1.getMessage());
>> e1.printStackTrace();
>> // Without a handshake future there is nothing to listen on, so close
>> // the channel instead of dereferencing null below.
>> e.getChannel().close();
>> return;
>> }
>> handshakeFuture.addListener(new ConnectionNotice(sslHandler));
>> }
>>
>> private void decode(ChannelHandlerContext ctx, Channel channel,
>> Object msg) {
>>
>> ChannelBuffer msgBuf = (ChannelBuffer) msg;
>> Object object = convertChannelBuffer(msgBuf);
>>
>> if (object instanceof KarmaFileChunk) {
>> KarmaFileChunk chunk = (KarmaFileChunk) object;
>> this.processKarmaFileChunk(channel, chunk);
>> } else if (object instanceof KarmaManager) {
>> KarmaManager manager = (KarmaManager) object;
>> processKarmaManager(manager, channel);
>> } else {
>> logger.log(InternalLogLevel.ERROR, "Not a legitimate object type.");
>> }
>> }
>>
>> private Object convertChannelBuffer(ChannelBuffer msgBuf) {
>> int readableBytes = msgBuf.readableBytes();
>> byte[] readBytes = new byte[readableBytes];
>> msgBuf.readBytes(readBytes);
>> Object object = KarmaUtility.toObject(readBytes);
>> return object;
>> }
>>
>> private void processKarmaManager(KarmaManager manager, Channel
>> channel) {
>> String baseName = System.getProperty("user.home") +
>> File.separatorChar
>> + ".KARMA" + File.separatorChar + ".AUM" + File.separatorChar
>> + manager.getId() + File.separatorChar;
>> this.managers.put(manager.getId(), manager);
>> LinkedHashSet<File> managerInputFiles = manager.getResources();
>> Iterator<File> managerFileIter = managerInputFiles.iterator();
>> LinkedHashSet<File> newManagerFiles = new LinkedHashSet<File>();
>> while (managerFileIter.hasNext()) {
>> File file = managerFileIter.next();
>> String name = file.getName();
>> File newFile = new File(baseName + name);
>> newManagerFiles.add(newFile);
>> }
>>
>> managerInputFiles.clear();
>> managerInputFiles.addAll(newManagerFiles);
>>
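>> UUID uuid = manager.getId();
>> Integer numberOfFiles = this.fileCount.get(uuid);
>> // When the manager arrives first (the expected order) no count has been
>> // stored yet, so a missing entry means zero rather than an error.
>> int files = (numberOfFiles == null) ? 0 : numberOfFiles.intValue();
>> int managerFiles = manager.getResources().size();
>>
>> if (managerFiles == files) {
>> this.managers.remove(uuid);
>> this.fileCount.remove(uuid);
>> // There may be no open writer for this transfer.
>> FileChannel writer = this.fileWriters.remove(uuid);
>> if (writer != null) {
>> try {
>> writer.close();
>> } catch (IOException e) {
>> logger.log(InternalLogLevel.ERROR, e.getMessage());
>> e.printStackTrace();
>> }
>> }
>> this.receiver.update(manager);
>> channel.close();
>> }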
>>
>> }
>>
>> private void processKarmaFileChunk(Channel channel, KarmaFileChunk
>> chunk) {
>> byte[] bytes = chunk.bytes;
>> String baseName = System.getProperty("user.home") +
>> File.separatorChar
>> + ".KARMA" + File.separatorChar + ".AUM" + File.separatorChar
>> + chunk.uuid;
>>
>> File file1 = new File(baseName);
>>
>> if (!file1.exists()) {
>> file1.mkdirs();
>> }
>>
>> String toFileName = baseName + File.separatorChar + chunk.fileName;
>>
>> if (chunk.id == 1) {
>> try {
>> // Register the writer only if the file could actually be opened.
>> this.fileWriters.put(chunk.uuid,
>> new FileOutputStream(toFileName).getChannel());
>> } catch (FileNotFoundException e) {
>> logger.log(InternalLogLevel.ERROR, e.getMessage());
>> }
>> }
>>
>> ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
>> buffer.put(bytes);
>> buffer.flip();
>>
>> // The writer is missing if the file could not be opened or if this chunk
>> // arrived before the first chunk of its file.
>> FileChannel writer = this.fileWriters.get(chunk.uuid);
>> if (writer == null) {
>> logger.log(InternalLogLevel.ERROR, "No open file for " + toFileName);
>> return;
>> }
>>
>> try {
>> writer.write(buffer);
>>
>> if (chunk.fileLength == chunk.offset) {
>> this.fileWriters.remove(chunk.uuid);
>> writer.close();
>> }
>> } catch (IOException e) {
>> logger.log(InternalLogLevel.ERROR, e.getMessage());
>> }
>>
>> if (chunk.isLast) {
>> KarmaManager manager = this.managers.get(chunk.uuid);
>> if (manager == null) {
>> // The manager must arrive before any chunk; without it the expected
>> // number of files is unknown.
>> logger.log(InternalLogLevel.ERROR, "No manager for " + chunk.uuid);
>> return;
>> }
>>
>> // The first completed file of a transfer counts as file number 1.
>> Integer fileNumber = this.fileCount.get(chunk.uuid);
>> int files = (fileNumber == null) ? 1 : fileNumber.intValue();
>> int managerFiles = manager.getResources().size();
>>
>> if (managerFiles == files) {
>> this.managers.remove(chunk.uuid);
>> this.fileCount.remove(chunk.uuid);
>> this.fileWriters.remove(chunk.uuid);
>> this.receiver.update(manager);
>> channel.close();
>> } else {
>> files++;
>> this.fileCount.put(chunk.uuid, Integer.valueOf(files));
>> }
>> }
>>
>> buffer.clear();
>> }
>>
>> private static final class ConnectionNotice
>> implements
>> ChannelFutureListener {
>>
>> private final SslHandler sslHandler;
>>
>> ConnectionNotice(SslHandler sslHandler) {
>> this.sslHandler = sslHandler;
>> }
>>
>> public void operationComplete(ChannelFuture future) {
>> if (future.isSuccess()) {
>> // Once session is secured, send a greeting.
>> // Fall back to a generic name so the greeting is never built from null.
>> String hostName = "localhost";
>> try {
>> hostName = InetAddress.getLocalHost().getHostName();
>> } catch (UnknownHostException e) {
>> logger.log(InternalLogLevel.ERROR, e.getMessage());
>> e.printStackTrace();
>> }
>> byte[] welcome = KarmaUtility.toBytes("Welcome to " + hostName
>> + " secure Karma service!\n");
>> ChannelBuffer welcomeBuf = ChannelBuffers
>> .wrappedBuffer(welcome);
>> future.getChannel().write(welcomeBuf);
>> byte[] protectedSession = KarmaUtility
>> .toBytes("Your session is protected by "
>> + sslHandler.getEngine().getSession()
>> .getCipherSuite() + " cipher suite.\n");
>> ChannelBuffer protectedSessionBuf = ChannelBuffers
>> .wrappedBuffer(protectedSession);
>> future.getChannel().write(protectedSessionBuf);
>> logger.log(InternalLogLevel.INFO,
>> "Notifying Karma sender that the session is secured");
>>
>> // Register the channel to the global channel list
>> // so the channel received the messages from others.
>> // channels.add(future.getChannel());
>> } else {
>> future.getChannel().close();
>> }
>> }
>> }
>> }
>>
>>
>>
>
>
> -----
> Hardware/Software Architect
> --
> View this message in context: http://n2.nabble.com/ChannelPipeline-question-tp3559391p3563209.html
> Sent from the Netty User Group mailing list archive at Nabble.com.
Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com