KarmaDecoder updates

Michael McGrady mmcgrady at topiatechnology.com
Thu Sep 10 00:54:45 EDT 2009


Trustin,

Here is a later version of KarmaDecoder. It does not yet include the  
documentation fixes that are still to come.

package com.topiatechnology.karma.spi.transportadapter.netty;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.UUID;

import javax.net.ssl.SSLException;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;

import com.topiatechnology.karma.api.KarmaManager;
import com.topiatechnology.karma.spi.KarmaReceiver;
import com.topiatechnology.karma.spi.util.KarmaUtility;

/**
  * <p>
  * A composition rather than inheritance use of JBoss Netty simple  
channel
  * upstream and downstream handlers with a KARMA decoder.
  * </p>
  *
  * @author Mike McGrady
  * @version Karma Alpha v001
  * @since August 2009.
  */
/**
  * <p>
  * A composition rather than inheritance use of JBoss Netty simple channel
  * upstream and downstream handlers with a KARMA decoder.
  * </p>
  * <p>
  * Upstream messages are decoded into either a {@code KarmaFileChunk}, whose
  * bytes are appended to a file below
  * {@code ~/.KARMA/.AUM/&lt;transfer id&gt;/}, or a {@link KarmaManager}
  * describing the transfer. Once as many files have completed as the manager
  * lists resources, the manager is handed to the {@link KarmaReceiver}.
  * </p>
  * <p>
  * NOTE(review): the class is declared with pipeline coverage "all" but keeps
  * its per-transfer state in plain {@link HashMap}s without synchronization.
  * Confirm that a single instance is never driven by more than one I/O thread
  * at a time, or guard the maps.
  * </p>
  *
  * @author Mike McGrady
  * @version Karma Alpha v001
  * @since August 2009.
  */
@ChannelPipelineCoverage("all")
public final class KarmaDecoder
		implements
			ChannelUpstreamHandler,
			ChannelDownstreamHandler {
	private static final InternalLogger logger = InternalLoggerFactory
			.getInstance(KarmaDecoder.class.getName());

	/**
	 * Number of channels opened so far. Only used to label log output; it is
	 * incremented without synchronization.
	 */
	private static int channelCount;

	private final KarmaReceiver receiver;
	private final SimpleChannelUpstreamHandler scuh = new SimpleChannelUpstreamHandler();
	private final SimpleChannelDownstreamHandler scdh = new SimpleChannelDownstreamHandler();

	/** Currently open output file per transfer id. */
	private final Map<UUID, FileChannel> fileWriters = new HashMap<UUID, FileChannel>();
	/** Managers received so far, per transfer id. */
	private final Map<UUID, KarmaManager> managers = new HashMap<UUID, KarmaManager>();
	/** Number of files whose last chunk has been received, per transfer id. */
	private final Map<UUID, Integer> fileCount = new HashMap<UUID, Integer>();
	private final ChannelGroup allChannels;

	/**
	 * Creates a new decoder.
	 *
	 * @param receiver
	 *            notified with the {@link KarmaManager} once a transfer is
	 *            complete
	 * @param allChannels
	 *            group that every opened channel is added to
	 */
	public KarmaDecoder(KarmaReceiver receiver, ChannelGroup allChannels) {
		this.receiver = receiver;
		this.allChannels = allChannels;
	}

	/**
	 * {@inheritDoc} Down-casts the received downstream event into more
	 * meaningful sub-type event and calls an appropriate handler method with
	 * the down-casted event. Exceptions thrown while handling the event are
	 * logged and not rethrown.
	 */
	public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) {
		try {
			if (e instanceof MessageEvent) {
				this.scdh.writeRequested(ctx, (MessageEvent) e);
			} else if (e instanceof ChannelStateEvent) {
				this.handleDownstreamState(ctx, (ChannelStateEvent) e);
			} else {
				ctx.sendDownstream(e);
			}
		} catch (Exception cause) {
			logFailure("Handling downstream event " + e, cause);
		}
	}

	/**
	 * Routes a downstream state event to the matching request method of the
	 * wrapped {@link SimpleChannelDownstreamHandler}.
	 */
	private void handleDownstreamState(ChannelHandlerContext ctx,
			ChannelStateEvent evt) throws Exception {
		switch (evt.getState()) {
			case OPEN :
				// Only OPEN=false (a close request) is acted upon.
				if (!Boolean.TRUE.equals(evt.getValue())) {
					this.scdh.closeRequested(ctx, evt);
				}
				break;
			case BOUND :
				if (evt.getValue() != null) {
					this.scdh.bindRequested(ctx, evt);
				} else {
					this.scdh.unbindRequested(ctx, evt);
				}
				break;
			case CONNECTED :
				if (evt.getValue() != null) {
					this.scdh.connectRequested(ctx, evt);
				} else {
					this.scdh.disconnectRequested(ctx, evt);
				}
				break;
			case INTEREST_OPS :
				this.scdh.setInterestOpsRequested(ctx, evt);
				break;
			default :
				ctx.sendDownstream(evt);
		}
	}

	/**
	 * {@inheritDoc} Down-casts the received upstream event into more
	 * meaningful sub-type event and calls an appropriate handler method with
	 * the down-casted event. Message events are decoded by this class;
	 * exceptions thrown while handling any other event are logged and not
	 * rethrown.
	 */
	public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
		if (e instanceof MessageEvent) {
			// Deliberately outside the guarded section: a failure while
			// decoding propagates to the caller.
			this.messageReceivedUpstream(ctx, (MessageEvent) e);
			return;
		}

		try {
			if (e instanceof WriteCompletionEvent) {
				this.scuh.writeComplete(ctx, (WriteCompletionEvent) e);
			} else if (e instanceof ChildChannelStateEvent) {
				ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
				if (evt.getChildChannel().isOpen()) {
					this.scuh.childChannelOpen(ctx, evt);
				} else {
					this.scuh.childChannelClosed(ctx, evt);
				}
			} else if (e instanceof ChannelStateEvent) {
				this.handleUpstreamState(ctx, (ChannelStateEvent) e);
			} else if (e instanceof ExceptionEvent) {
				this.scuh.exceptionCaught(ctx, (ExceptionEvent) e);
			} else {
				ctx.sendUpstream(e);
			}
		} catch (Exception cause) {
			logFailure("Handling upstream event " + e, cause);
		}
	}

	/**
	 * Routes an upstream state event. Channel-open and channel-connected are
	 * handled by this class; everything else goes to the wrapped
	 * {@link SimpleChannelUpstreamHandler}.
	 */
	private void handleUpstreamState(ChannelHandlerContext ctx,
			ChannelStateEvent evt) throws Exception {
		switch (evt.getState()) {
			case OPEN :
				if (Boolean.TRUE.equals(evt.getValue())) {
					this.channelOpenUpstream(ctx, evt);
				} else {
					this.scuh.channelClosed(ctx, evt);
				}
				break;
			case BOUND :
				if (evt.getValue() != null) {
					this.scuh.channelBound(ctx, evt);
				} else {
					this.scuh.channelUnbound(ctx, evt);
				}
				break;
			case CONNECTED :
				if (evt.getValue() != null) {
					// The connected event itself is not forwarded upstream.
					this.handshake(ctx, evt);
				} else {
					this.scuh.channelDisconnected(ctx, evt);
				}
				break;
			case INTEREST_OPS :
				this.scuh.channelInterestChanged(ctx, evt);
				break;
			default :
				ctx.sendUpstream(evt);
		}
	}

	/**
	 * Registers a newly opened channel in the channel group and forwards the
	 * event upstream.
	 */
	public void channelOpenUpstream(ChannelHandlerContext ctx,
			ChannelStateEvent e) {
		channelCount++;
		// Add all open channels to the global group so that they are
		// closed on shutdown.
		Channel channel = e.getChannel();
		logger.log(InternalLogLevel.INFO, channelCount + ": SERVER CHANNEL: "
				+ channel);
		allChannels.add(channel);
		ctx.sendUpstream(e);
	}

	/**
	 * Invoked when a message object (e.g: {@link ChannelBuffer}) was received
	 * from a remote peer.
	 */
	private void messageReceivedUpstream(ChannelHandlerContext ctx,
			MessageEvent evt) {
		decode(ctx, evt.getChannel(), evt.getMessage());
	}

	/**
	 * Invoked when a {@link Channel} is open, bound to a local address, and
	 * connected to a remote address. Starts the SSL handshake and registers a
	 * {@link ConnectionNotice} to be told when it finishes.
	 */
	private void handshake(ChannelHandlerContext ctx, ChannelStateEvent e) {
		// Get the SslHandler in the current pipeline.
		final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
		if (sslHandler == null) {
			logger.log(InternalLogLevel.ERROR,
					"No SslHandler in the pipeline; SSL handshake skipped for "
							+ e.getChannel());
			return;
		}

		try {
			// Get notified when SSL handshake is done.
			ChannelFuture handshakeFuture = sslHandler.handshake(e.getChannel());
			handshakeFuture.addListener(new ConnectionNotice(sslHandler));
		} catch (SSLException cause) {
			logFailure("Starting SSL handshake on " + e.getChannel(), cause);
			// Same outcome as a handshake that completes unsuccessfully.
			e.getChannel().close();
		}
	}

	/**
	 * Converts a received buffer into an object and dispatches it by type.
	 * Anything that is not a {@link ChannelBuffer} carrying a
	 * {@code KarmaFileChunk} or a {@link KarmaManager} is logged and dropped.
	 */
	private void decode(ChannelHandlerContext ctx, Channel channel, Object msg) {
		if (!(msg instanceof ChannelBuffer)) {
			logger.log(InternalLogLevel.ERROR,
					"Expected a ChannelBuffer but received: "
							+ (msg == null ? "null" : msg.getClass().getName()));
			return;
		}

		Object object = convertChannelBuffer((ChannelBuffer) msg);

		if (object instanceof KarmaFileChunk) {
			this.processKarmaFileChunk(channel, (KarmaFileChunk) object);
		} else if (object instanceof KarmaManager) {
			this.processKarmaManager((KarmaManager) object, channel);
		} else {
			logger.log(InternalLogLevel.ERROR, "Not a legimate object type.");
		}
	}

	/**
	 * Reads every readable byte of the buffer and turns the bytes into an
	 * object via {@link KarmaUtility#toObject}.
	 */
	private Object convertChannelBuffer(ChannelBuffer msgBuf) {
		byte[] readBytes = new byte[msgBuf.readableBytes()];
		msgBuf.readBytes(readBytes);
		// NOTE(review): the bytes come straight from the remote peer. If
		// KarmaUtility.toObject uses Java serialization, this deserializes
		// untrusted data - confirm and restrict the accepted classes.
		return KarmaUtility.toObject(readBytes);
	}

	/**
	 * Records the manager of a transfer and completes the transfer right away
	 * when all of its files have already been received. In that case the
	 * channel is closed as well.
	 */
	private void processKarmaManager(KarmaManager manager, Channel channel) {
		UUID uuid = manager.getId();
		this.managers.put(uuid, manager);

		// No entry yet simply means that no file has completed so far.
		Integer completed = this.fileCount.get(uuid);
		int files = (completed == null) ? 0 : completed.intValue();
		int managerFiles = manager.getResources().size();

		if (managerFiles == files) {
			this.finishTransfer(uuid, manager);
			channel.close();
		}
	}

	/**
	 * Appends one chunk to its target file. The first chunk of a file
	 * ({@code chunk.id == 1}) opens the file; the chunk flagged
	 * {@code isLast} counts the file as complete and, when the manager is
	 * known and all of its resources have arrived, completes the transfer.
	 */
	private void processKarmaFileChunk(final Channel channel,
			KarmaFileChunk chunk) {
		String baseName = System.getProperty("user.home") + File.separatorChar
				+ ".KARMA" + File.separatorChar + ".AUM" + File.separatorChar
				+ chunk.uuid;

		File directory = new File(baseName);
		// The second isDirectory() covers a directory created concurrently.
		if (!directory.isDirectory() && !directory.mkdirs()
				&& !directory.isDirectory()) {
			logger.log(InternalLogLevel.ERROR,
					"Cannot create transfer directory " + directory);
			return;
		}

		if (chunk.id == 1) {
			String toFileName = baseName + File.separatorChar + chunk.fileName;
			try {
				// The file name is supplied by the remote peer: refuse names
				// that would resolve outside the transfer directory.
				if (!isInside(directory, new File(toFileName))) {
					logger.log(InternalLogLevel.ERROR,
							"Refusing to write outside " + directory + ": "
									+ toFileName);
					return;
				}
				FileChannel opened = new FileOutputStream(toFileName)
						.getChannel();
				// A writer left over from an unfinished file would leak.
				closeWriter(this.fileWriters.put(chunk.uuid, opened));
			} catch (IOException cause) {
				logFailure("Opening " + toFileName, cause);
				return;
			}
		}

		FileChannel writer = this.fileWriters.get(chunk.uuid);
		if (writer == null) {
			logger.log(InternalLogLevel.ERROR, "No open file for transfer "
					+ chunk.uuid + "; dropping chunk " + chunk.id);
			return;
		}

		ByteBuffer buffer = ByteBuffer.wrap(chunk.bytes);
		try {
			// A single write() is not guaranteed to drain the buffer.
			while (buffer.hasRemaining()) {
				writer.write(buffer);
			}

			if (chunk.fileLength == chunk.offset) {
				this.fileWriters.remove(chunk.uuid);
				writer.close();
			}
		} catch (IOException cause) {
			logFailure("Writing chunk " + chunk.id + " of transfer "
					+ chunk.uuid, cause);
		}

		if (chunk.isLast) {
			Integer previous = this.fileCount.get(chunk.uuid);
			int completed = ((previous == null) ? 0 : previous.intValue()) + 1;
			KarmaManager manager = this.managers.get(chunk.uuid);

			if ((manager != null)
					&& (manager.getResources().size() == completed)) {
				this.finishTransfer(chunk.uuid, manager);
			} else {
				// Manager not seen yet, or more files still to come.
				this.fileCount.put(chunk.uuid, Integer.valueOf(completed));
			}
		}
	}

	/**
	 * Drops all bookkeeping of a transfer, closes a still-open output file,
	 * and hands the manager to the receiver.
	 */
	private void finishTransfer(UUID uuid, KarmaManager manager) {
		this.managers.remove(uuid);
		this.fileCount.remove(uuid);
		closeWriter(this.fileWriters.remove(uuid));
		this.receiver.update(manager);
	}

	/**
	 * Tells whether the canonical location of {@code candidate} lies below
	 * the canonical location of {@code directory}.
	 */
	private static boolean isInside(File directory, File candidate)
			throws IOException {
		String root = directory.getCanonicalPath() + File.separator;
		return candidate.getCanonicalPath().startsWith(root);
	}

	/** Closes the given writer if there is one, logging a failure to close. */
	private static void closeWriter(FileChannel writer) {
		if (writer == null) {
			return;
		}
		try {
			writer.close();
		} catch (IOException cause) {
			logFailure("Closing output file", cause);
		}
	}

	/** Logs a failure at ERROR level together with its cause. */
	private static void logFailure(String action, Throwable cause) {
		logger.log(InternalLogLevel.ERROR, action + " failed: " + cause, cause);
	}

	/**
	 * Listener for the SSL handshake: on success it writes a notice naming
	 * the negotiated cipher suite to the channel, on failure it closes the
	 * channel.
	 */
	private static final class ConnectionNotice
			implements
				ChannelFutureListener {

		private final SslHandler sslHandler;

		ConnectionNotice(SslHandler sslHandler) {
			this.sslHandler = sslHandler;
		}

		public void operationComplete(ChannelFuture future) {
			if (!future.isSuccess()) {
				future.getChannel().close();
				return;
			}

			String cipherSuite = sslHandler.getEngine().getSession()
					.getCipherSuite();
			byte[] protectedSession = KarmaUtility
					.toBytes("Your session is protected by " + cipherSuite
							+ " cipher suite.");
			future.getChannel().write(
					ChannelBuffers.wrappedBuffer(protectedSession));
		}
	}
}


Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com







-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20090909/233c0784/attachment-0001.html 


More information about the netty-users mailing list