KarmaDecoder updates
Michael McGrady
mmcgrady at topiatechnology.com
Thu Sep 10 00:54:45 EDT 2009
Trustin,
Here is a later version of KarmaDecoder without the fixes in
documentation that will be there.
package com.topiatechnology.karma.spi.transportadapter.netty;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.UUID;
import javax.net.ssl.SSLException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import com.topiatechnology.karma.api.KarmaManager;
import com.topiatechnology.karma.spi.KarmaReceiver;
import com.topiatechnology.karma.spi.util.KarmaUtility;
/**
* <p>
* A composition rather than inheritance use of JBoss Netty simple
channel
* upstream and downstream handlers with a KARMA decoder.
* </p>
*
* @author Mike McGrady
* @version Karma Alpha v001
* @since August 2009.
*/
@ChannelPipelineCoverage("all")
public final class KarmaDecoder
implements
ChannelUpstreamHandler,
ChannelDownstreamHandler {
private static int channelCount;
private static int count;
// static final Set<Channel> channels = new MapBackedSet<Channel>(
// new ConcurrentHashMap<Channel, Boolean>());
private final KarmaReceiver receiver;
private final SimpleChannelUpstreamHandler scuh = new
SimpleChannelUpstreamHandler();
private final SimpleChannelDownstreamHandler scdh = new
SimpleChannelDownstreamHandler();
private final Map<UUID, FileChannel> fileWriters = new HashMap<UUID,
FileChannel>();
private final Map<UUID, KarmaManager> managers = new HashMap<UUID,
KarmaManager>();
private final Map<UUID, Integer> fileCount = new HashMap<UUID,
Integer>();
private final ChannelGroup allChannels;
private static final InternalLogger logger = InternalLoggerFactory
.getInstance(KarmaDecoder.class.getName());
/**
* Creates a new instance with the current system character set.
*/
public KarmaDecoder(KarmaReceiver receiver, ChannelGroup allChannels) {
this.receiver = receiver;
this.allChannels = allChannels;
}
/**
* {@inheritDoc} Down-casts the received downstream event into more
* meaningful sub-type event and calls an appropriate handler method
with
* the down-casted event.
*/
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent
e) {
if (e instanceof MessageEvent) {
try {
this.scdh.writeRequested(ctx, (MessageEvent) e);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
} else if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN :
if (!Boolean.TRUE.equals(evt.getValue())) {
try {
this.scdh.closeRequested(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
}
break;
case BOUND :
if (evt.getValue() != null) {
try {
this.scdh.bindRequested(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
} else {
try {
this.scdh.unbindRequested(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
}
break;
case CONNECTED :
if (evt.getValue() != null) {
try {
this.scdh.connectRequested(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
} else {
try {
this.scdh.disconnectRequested(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
}
break;
case INTEREST_OPS :
try {
this.scdh.setInterestOpsRequested(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
break;
default :
ctx.sendDownstream(e);
}
} else {
ctx.sendDownstream(e);
}
}
/**
* {@inheritDoc} Down-casts the received upstream event into more
meaningful
* sub-type event and calls an appropriate handler method with the
* down-casted event.
*/
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof MessageEvent) {
this.messageReceivedUpstream(ctx, (MessageEvent) e);
} else if (e instanceof WriteCompletionEvent) {
WriteCompletionEvent evt = (WriteCompletionEvent) e;
try {
this.scuh.writeComplete(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
} else if (e instanceof ChildChannelStateEvent) {
ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
if (evt.getChildChannel().isOpen()) {
try {
this.scuh.childChannelOpen(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
} else {
try {
this.scuh.childChannelClosed(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
}
} else if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN :
if (Boolean.TRUE.equals(evt.getValue())) {
try {
// this.scuh.channelOpenUpstream(ctx, evt);
this.channelOpenUpstream(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
} else {
try {
this.scuh.channelClosed(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
}
break;
case BOUND :
if (evt.getValue() != null) {
try {
this.scuh.channelBound(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
} else {
try {
this.scuh.channelUnbound(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
}
break;
case CONNECTED :
if (evt.getValue() != null) {
try {
this.handshake(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
} else {
try {
this.scuh.channelDisconnected(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
}
break;
case INTEREST_OPS :
try {
this.scuh.channelInterestChanged(ctx, evt);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
break;
default :
ctx.sendUpstream(e);
}
} else if (e instanceof ExceptionEvent) {
try {
this.scuh.exceptionCaught(ctx, (ExceptionEvent) e);
} catch (Exception e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
} else {
ctx.sendUpstream(e);
}
}
public void channelOpenUpstream(ChannelHandlerContext ctx,
ChannelStateEvent e) {
channelCount++;
// Add all open channels to the global group so that they are
// closed on shutdown.
Channel channel = e.getChannel() ;
System.out.println(channelCount + ": SERVER CHANNEL: " + channel) ;
allChannels.add(channel);
ctx.sendUpstream(e);
}
/**
* Invoked when a message object (e.g: {@link ChannelBuffer}) was
received
* from a remote peer.
*/
private void messageReceivedUpstream(ChannelHandlerContext ctx,
MessageEvent evt) {
Object originalMessage = evt.getMessage();
decode(ctx, evt.getChannel(), originalMessage);
}
/**
* Invoked when a {@link Channel} is open, bound to a local address,
and
* connected to a remote address.
*/
private void handshake(ChannelHandlerContext ctx, ChannelStateEvent
e) {
// Get the SslHandler in the current pipeline.
// We added it in SecureChatPipelineFactory.
final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
// Get notified when SSL handshake is done.
ChannelFuture handshakeFuture = null;
try {
handshakeFuture = sslHandler.handshake(e.getChannel());
} catch (SSLException e1) {
logger.log(InternalLogLevel.ERROR, e1.getMessage());
e1.printStackTrace();
}
handshakeFuture.addListener(new ConnectionNotice(sslHandler));
}
private void decode(ChannelHandlerContext ctx, Channel channel,
Object msg) {
ChannelBuffer msgBuf = (ChannelBuffer) msg;
Object object = convertChannelBuffer(msgBuf);
if (object instanceof KarmaFileChunk) {
KarmaFileChunk chunk = (KarmaFileChunk) object;
this.processKarmaFileChunk(channel, chunk);
} else if (object instanceof KarmaManager) {
KarmaManager manager = (KarmaManager) object;
processKarmaManager(manager, channel);
} else {
logger.log(InternalLogLevel.ERROR, "Not a legimate object type.");
}
}
private Object convertChannelBuffer(ChannelBuffer msgBuf) {
int readableBytes = msgBuf.readableBytes();
byte[] readBytes = new byte[readableBytes];
msgBuf.readBytes(readBytes);
Object object = KarmaUtility.toObject(readBytes);
return object;
}
private void processKarmaManager(KarmaManager manager, Channel
channel) {
String baseName = System.getProperty("user.home") + File.separatorChar
+ ".KARMA" + File.separatorChar + ".AUM" + File.separatorChar
+ manager.getId() + File.separatorChar;
count++;
// System.out.println("\n" + count + " --------BANG: " + this + ": " +
// count + " manager: " + manager) ;
this.managers.put(manager.getId(), manager);
LinkedHashSet<String> managerInputFileNames = manager.getResources();
LinkedHashSet<File> managerInputFiles = new LinkedHashSet<File>();
Iterator<String> managerFileIter = managerInputFileNames.iterator();
while (managerFileIter.hasNext()) {
String name = managerFileIter.next();
File newFile = new File(baseName + name);
managerInputFiles.add(newFile);
}
UUID uuid = manager.getId();
Integer numberOfFiles = this.fileCount.get(uuid);
int files = numberOfFiles.intValue();
int managerFiles = manager.getResources().size();
if (managerFiles == files) {
this.managers.remove(uuid);
this.fileCount.remove(uuid);
try {
this.fileWriters.remove(uuid).close();
} catch (IOException e) {
logger.log(InternalLogLevel.ERROR, e.getMessage());
e.printStackTrace();
}
this.receiver.update(manager);
channel.close();
}
}
private void processKarmaFileChunk(final Channel channel,
KarmaFileChunk chunk) {
byte[] bytes = chunk.bytes;
String baseName = System.getProperty("user.home") + File.separatorChar
+ ".KARMA" + File.separatorChar + ".AUM" + File.separatorChar
+ chunk.uuid;
File file1 = new File(baseName);
if (!file1.exists()) {
file1.mkdirs();
file1.isDirectory();
}
String toFileName = baseName + File.separatorChar + chunk.fileName;
if (chunk.id == 1) {
FileChannel fileChannel = null;
try {
fileChannel = new FileOutputStream(toFileName).getChannel();
} catch (FileNotFoundException e) {
logger.log(InternalLogLevel.ERROR, e.getMessage());
}
this.fileWriters.put(chunk.uuid, fileChannel);
}
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
try {
this.fileWriters.get(chunk.uuid).write(buffer);
if (chunk.fileLength == chunk.offset) {
FileChannel fileChannel = this.fileWriters.remove(chunk.uuid);
fileChannel.close();
}
} catch (IOException e) {
logger.log(InternalLogLevel.ERROR, e.getMessage());
}
if (chunk.isLast) {
Integer fileNumber = this.fileCount.get(chunk.uuid);
if (fileNumber == null) {
this.fileCount.put(chunk.uuid, new Integer(1));
}
Integer numberOfFiles = this.fileCount.get(chunk.uuid);
KarmaManager manager = this.managers.get(chunk.uuid);
int files = numberOfFiles.intValue();
int managerFiles = manager.getResources().size();
if ((managerFiles == files)
&& (this.managers.get(chunk.uuid) != null)) {
this.managers.remove(chunk.uuid);
this.fileCount.remove(chunk.uuid);
this.fileWriters.remove(chunk.uuid);
this.receiver.update(manager);
} else {
files++;
this.fileCount.put(chunk.uuid, new Integer(files));
}
}
buffer.clear();
}
private static final class ConnectionNotice
implements
ChannelFutureListener {
private final SslHandler sslHandler;
ConnectionNotice(SslHandler sslHandler) {
this.sslHandler = sslHandler;
}
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
byte[] protectedSession = KarmaUtility
.toBytes("Your session is protected by "
+ sslHandler.getEngine().getSession()
.getCipherSuite() + " cipher suite.");
ChannelBuffer protectedSessionBuf = ChannelBuffers
.wrappedBuffer(protectedSession);
future.getChannel().write(protectedSessionBuf);
} else {
future.getChannel().close();
}
}
}
}
Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20090909/233c0784/attachment-0001.html
More information about the netty-users
mailing list