[teiid-commits] teiid SVN: r565 - in trunk: client and 22 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Mar 17 10:25:36 EDT 2009


Author: shawkins
Date: 2009-03-17 10:25:36 -0400 (Tue, 17 Mar 2009)
New Revision: 565

Added:
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
   trunk/client/src/main/java/org/
   trunk/client/src/main/java/org/jboss/
   trunk/client/src/main/java/org/jboss/netty/
   trunk/client/src/main/java/org/jboss/netty/handler/
   trunk/client/src/main/java/org/jboss/netty/handler/codec/
   trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/
   trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java
   trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java
   trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java
   trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java
Removed:
   trunk/client/src/main/java/com/metamatrix/common/comm/api/MessageListener.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/NioObjectChannelFactory.java
Modified:
   trunk/client/pom.xml
   trunk/client/src/main/java/com/metamatrix/common/api/HostInfo.java
   trunk/client/src/main/java/com/metamatrix/common/api/MMURL.java
   trunk/client/src/main/java/com/metamatrix/common/comm/api/Message.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/Handshake.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectChannel.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SocketUtil.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/AdminApiServerDiscovery.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/ObjectChannelFactory.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceFactory.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
   trunk/client/src/main/java/com/metamatrix/dqp/client/ResultsFuture.java
   trunk/client/src/main/resources/teiid-client-settings.properties
   trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java
   trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java
   trunk/client/src/test/java/com/metamatrix/common/util/TestMMURL.java
   trunk/common-internal/src/main/java/com/metamatrix/common/util/VMNaming.java
   trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPWorkContext.java
   trunk/pom.xml
   trunk/server/pom.xml
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
   trunk/server/src/main/java/com/metamatrix/common/net/ServerSocketConfiguration.java
   trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java
   trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestSocketRemoting.java
Log:
TEIID-292 changed back to synch io on the client.

Modified: trunk/client/pom.xml
===================================================================
--- trunk/client/pom.xml	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/pom.xml	2009-03-17 14:25:36 UTC (rev 565)
@@ -18,10 +18,6 @@
       <artifactId>teiid-common-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.jboss.netty</groupId>
-      <artifactId>netty</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.jboss.teiid</groupId>
       <artifactId>teiid-common-core</artifactId>
       <type>test-jar</type>

Modified: trunk/client/src/main/java/com/metamatrix/common/api/HostInfo.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/api/HostInfo.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/api/HostInfo.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -26,93 +26,52 @@
 import java.net.UnknownHostException;
 
 import com.metamatrix.common.util.NetUtils;
+import com.metamatrix.core.util.ArgCheck;
 import com.metamatrix.core.util.HashCodeUtil;
 
 /**
- * HostInfo is used internally by Metamatirx to store the host information used in creating the Service Object reference
- * 
+ * Defines the hostname/port or {@link InetAddress} to connect to a host.
  * @since 4.2
  */
 public class HostInfo {
     // Host Name and Port Number
     private String hostName;
+    private int portNumber = 0;
     private InetAddress inetAddress;
-    private int portNumber  = 0;
 
-    public HostInfo(String host, String port) {
-        this(host, parsePort(port));
+    public InetAddress getInetAddress() throws UnknownHostException {
+    	if (inetAddress != null) {
+    		return inetAddress;
+    	}
+    	return NetUtils.resolveHostByName(hostName);
     }
-
-	private static int parsePort(String port) {
-		try {
-            return Integer.parseInt(port);
-        } catch (NumberFormatException nfe) {
-            throw new IllegalArgumentException("port must be numeric:" + port); //$NON-NLS-1$
-        }
-	}
     
     public HostInfo (String host, int port) {
-    	this(host, port, null);
-    }
-    
-    /**
-     * @since 4.2
-     */
-    public HostInfo(String host, int port, InetAddress inetAddress) {
-    	if (host == null || host.equals("")) { //$NON-NLS-1$
-            throw new IllegalArgumentException("hostname can't be null"); //$NON-NLS-1$
-        }
-        if( host.equalsIgnoreCase("localhost")) { //$NON-NLS-1$
-            try {
-            	InetAddress addr = NetUtils.getInstance().getInetAddress();
-				this.hostName = addr.getCanonicalHostName();
-			} catch (UnknownHostException e) {
-				this.hostName = host.toLowerCase();
+    	ArgCheck.isNotNull(host);
+		this.hostName = host.toLowerCase();
+    	this.portNumber = port;
+    	
+    	//only cache inetaddresses if they represent the ip. 
+    	try {
+			InetAddress addr = NetUtils.resolveHostByName(hostName);
+			if (addr.getHostAddress().equalsIgnoreCase(hostName)) {
+				this.inetAddress = addr;
 			}
-        } else {
-        	this.hostName = host.toLowerCase();
-        }
-        if (inetAddress == null) {
-            try {
-				this.inetAddress = InetAddress.getByName(hostName);
-			} catch (UnknownHostException e) {
-				//ignore
-			}
-        } else {
-        	this.inetAddress = inetAddress;
-        }
-        portNumber = port;
-        
-        if (portNumber < 0 || portNumber > 0xFFFF) {
-            throw new IllegalArgumentException("port out of range:" + portNumber); //$NON-NLS-1$
-        }
-        if (hostName == null) {
-            throw new IllegalArgumentException("hostname can't be null");  //$NON-NLS-1$
-        }
+		} catch (UnknownHostException e) {
+		}
     }
     
     public String getHostName() {
         return hostName;
     }
 
-    public void setPortNumber(int thePortNumber) {
-        portNumber = thePortNumber;
-    }
-
     public int getPortNumber() {
         return portNumber;
     }
-
-    public InetAddress getInetAddress() {
-        return inetAddress;
-    }
-
-    public String toString() {
+    
+	public String toString() {
         StringBuffer sb = new StringBuffer();
         sb.append(hostName).append(":").append(portNumber); //$NON-NLS-1$
-        if (inetAddress != null) {
-            sb.append(inetAddress);
-        }
         return sb.toString();
     }
     
@@ -128,10 +87,7 @@
     		return false;
     	}
         HostInfo hostInfo = (HostInfo) obj;
-        if (inetAddress == null || hostInfo.getInetAddress() == null) {
-            return hostName.equals(hostInfo.getHostName()) && portNumber == hostInfo.getPortNumber();
-        }
-        return inetAddress.equals(hostInfo.getInetAddress()) && portNumber == hostInfo.getPortNumber();
+        return hostName.equals(hostInfo.getHostName()) && portNumber == hostInfo.getPortNumber();
     }
 
     /** 
@@ -139,16 +95,8 @@
      * @since 4.2
      */
     public int hashCode() {
-        int hc = 0;
-
-        if (inetAddress != null) {
-        	hc = HashCodeUtil.hashCode(hc, inetAddress.getHostAddress());
-        } else {
-        	hc = HashCodeUtil.hashCode(hc, hostName);
-        }
-        hc = HashCodeUtil.hashCode(hc, portNumber);
-        
-        return hc;
+        int hc = HashCodeUtil.hashCode(0, hostName);
+        return HashCodeUtil.hashCode(hc, portNumber);
     }
 
 }

Modified: trunk/client/src/main/java/com/metamatrix/common/api/MMURL.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/api/MMURL.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/api/MMURL.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -29,7 +29,7 @@
 import java.util.NoSuchElementException;
 import java.util.StringTokenizer;
 
-import com.metamatrix.core.CorePlugin;
+import com.metamatrix.common.comm.platform.CommPlatformPlugin;
 
 /**
  * Class to encapsulate URL to a Clustered Metamatrix Server with multiple host
@@ -92,7 +92,7 @@
 
     public static final String FORMAT_SERVER = "mm[s]://server1:port1[,server2:port2]"; //$NON-NLS-1$
 
-    public static final String INVALID_FORMAT_SERVER = CorePlugin.Util.getString("MMURL.INVALID_FORMAT", new Object[] {FORMAT_SERVER}); //$NON-NLS-1$
+    public static final String INVALID_FORMAT_SERVER = CommPlatformPlugin.Util.getString("MMURL.INVALID_FORMAT", new Object[] {FORMAT_SERVER}); //$NON-NLS-1$
 
 
     
@@ -234,7 +234,19 @@
             try {
                 String host = st2.nextToken().trim();
                 String port = st2.nextToken().trim();
-                HostInfo hostInfo = new HostInfo(host, port);
+                if (host.equals("")) { //$NON-NLS-1$
+                    throw new IllegalArgumentException("hostname can't be empty"); //$NON-NLS-1$
+                }
+                int portNumber;
+                try {
+                    portNumber = Integer.parseInt(port);
+                } catch (NumberFormatException nfe) {
+                    throw new IllegalArgumentException("port must be numeric:" + port); //$NON-NLS-1$
+                }
+                if (portNumber < 0 || portNumber > 0xFFFF) {
+                    throw new IllegalArgumentException("port out of range:" + portNumber); //$NON-NLS-1$
+                }
+                HostInfo hostInfo = new HostInfo(host, portNumber);
                 hosts.add(hostInfo);
             } catch (NoSuchElementException nsee) {
                 throw new IllegalArgumentException(exceptionMessage);

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/api/Message.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/api/Message.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/api/Message.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -48,13 +48,7 @@
 
 	public void readExternal(ObjectInput in) throws IOException,
 			ClassNotFoundException {
-		try {
-			this.contents = (Serializable) in.readObject();
-		} catch (IOException t) {
-			throw t;
-		} catch (Throwable t) {
-			this.contents = t;
-		}
+		this.contents = (Serializable) in.readObject();
 		this.messageKey = (Serializable) in.readObject();
 	}
 

Deleted: trunk/client/src/main/java/com/metamatrix/common/comm/api/MessageListener.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/api/MessageListener.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/api/MessageListener.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -1,43 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.comm.api;
-
-import java.io.Serializable;
-
-/**
- * <p>The MessageListener is used for asynchronous message callbacks and 
- * will receive a message with a messageKey that was sent with the 
- * message originally.  Typically the messageKey is a unique key generated
- * by the client so that it can distinguish between return messages.  The 
- * MessageListener is typically implemented by the application.</p>
- */
-public interface MessageListener {
-
-    /**
-     * Deliver a message to the listener.
-     * @param message The message being delivered
-     * @param messageKey The key identifying the message, may be null depending on the application
-     */
-    void deliverMessage(Message message, Serializable messageKey);
-    
-}

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/Handshake.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/Handshake.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/Handshake.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -31,7 +31,6 @@
  */
 public class Handshake implements Serializable {
     
-	public static final int HANDSHAKE_TIMEOUT = 3000; //3 seconds
 	private static final long serialVersionUID = 7839271224736355515L;
     
     private String version = MetaMatrixProductVersion.VERSION_NUMBER;

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectChannel.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectChannel.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectChannel.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -22,27 +22,16 @@
 
 package com.metamatrix.common.comm.platform.socket;
 
+import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.concurrent.Future;
 
-import com.metamatrix.common.comm.exception.CommunicationException;
-
-
-
 public interface ObjectChannel {
 	
-	public interface ChannelListenerFactory {
-		ChannelListener createChannelListener(ObjectChannel channel);
-	}
+	Object read() throws IOException, ClassNotFoundException;
 	
-	public interface ChannelListener {
+	SocketAddress getRemoteAddress();
 		
-		void receivedMessage(Object msg) throws CommunicationException;
-		
-		void exceptionOccurred(Throwable t);
-		
-		void onConnection() throws CommunicationException;
-	}
-		
 	Future<?> write(Object msg);
 	
 	boolean isOpen();

Added: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java	                        (rev 0)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.comm.platform.socket;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+public final class ObjectInputStreamWithClassloader extends
+		ObjectInputStream {
+	private final ClassLoader cl;
+
+	public ObjectInputStreamWithClassloader(InputStream in,
+			ClassLoader cl) throws IOException {
+		super(in);
+		this.cl = cl;
+	}
+
+	@Override
+	protected Class<?> resolveClass(ObjectStreamClass desc)
+			throws IOException, ClassNotFoundException {
+		//see java bug id 6434149
+		try {
+			return Class.forName(desc.getName(), false, cl);
+		} catch (ClassNotFoundException e) {
+			return super.resolveClass(desc);
+		}
+	}
+}
\ No newline at end of file


Property changes on: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Deleted: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -1,240 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-/**
- * 
- */
-package com.metamatrix.common.comm.platform.socket;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.net.ssl.SSLEngine;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
-import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
-import com.metamatrix.common.comm.platform.CommPlatformPlugin;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
-
-/**
- * Main class for creating Netty Nio Channels 
- */
-
-@ChannelPipelineCoverage(ChannelPipelineCoverage.ALL)
-public class SSLAwareChannelHandler extends SimpleChannelHandler implements ChannelPipelineFactory {
-	
-	public class ObjectChannelImpl implements ObjectChannel {
-		private final Channel channel;
-
-		public ObjectChannelImpl(Channel channel) {
-			this.channel = channel;
-		}
-
-		public void close() {
-			channel.close();
-		}
-
-		public boolean isOpen() {
-			return channel.isOpen();
-		}
-
-		public Future<?> write(Object msg) {
-			final ChannelFuture future = channel.write(msg);
-			future.addListener(completionListener);
-			return new Future() {
-
-				@Override
-				public boolean cancel(boolean arg0) {
-					return future.cancel();
-				}
-
-				@Override
-				public Object get() throws InterruptedException,
-						ExecutionException {
-					future.await();
-					if (!future.isSuccess()) {
-						throw new ExecutionException(future.getCause());
-					}
-					return null;
-				}
-
-				@Override
-				public Object get(long arg0, TimeUnit arg1)
-						throws InterruptedException, ExecutionException,
-						TimeoutException {
-					if (future.await(arg0, arg1)) {
-						if (!future.isSuccess()) {
-							throw new ExecutionException(future.getCause());
-						}
-						return null;
-					}
-					throw new TimeoutException();
-				}
-
-				@Override
-				public boolean isCancelled() {
-					return future.isCancelled();
-				}
-
-				@Override
-				public boolean isDone() {
-					return future.isDone();
-				}
-			};
-		}
-	}
-	
-	private final ChannelListenerFactory listenerFactory;
-	private final SSLEngine engine;
-	private final ClassLoader classLoader;
-	private Map<Channel, ChannelListener> listeners = Collections.synchronizedMap(new HashMap<Channel, ChannelListener>());
-	private AtomicLong objectsRead = new AtomicLong(0);
-	private AtomicLong objectsWritten = new AtomicLong(0);
-	private volatile int maxChannels;
-	
-	private ChannelFutureListener completionListener = new ChannelFutureListener() {
-
-		@Override
-		public void operationComplete(ChannelFuture arg0)
-				throws Exception {
-			if (arg0.isSuccess()) {
-				objectsWritten.getAndIncrement();
-			}
-		}
-		
-	};
-	 
-	public SSLAwareChannelHandler(ChannelListenerFactory listenerFactory,
-			SSLEngine engine, ClassLoader classloader) {
-		this.listenerFactory = listenerFactory;
-		this.engine = engine;
-		this.classLoader = classloader;
-	}
-
-	@Override
-	public void channelConnected(ChannelHandlerContext ctx,
-			final ChannelStateEvent e) throws Exception {
-		ChannelListener listener = this.listenerFactory.createChannelListener(new ObjectChannelImpl(e.getChannel()));
-		synchronized (this.listeners) {
-			this.listeners.put(e.getChannel(), listener);
-			maxChannels = Math.max(maxChannels, this.listeners.size());
-		}
-		if (engine != null) {
-			SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
-	        sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener() {
-	        	public void operationComplete(ChannelFuture arg0)
-	        			throws Exception {
-	        		onConnection(e.getChannel());
-	        	}
-	        });
-		} else {
-			onConnection(e.getChannel());
-		}
-	}
-	
-	private void onConnection(Channel channel) throws Exception {
-		ChannelListener listener = this.listeners.get(channel);
-		if (listener != null) {
-			listener.onConnection();
-		}
-	}
-	
-	@Override
-	public void exceptionCaught(ChannelHandlerContext ctx,
-			ExceptionEvent e) throws Exception {
-		ChannelListener listener = this.listeners.get(e.getChannel());
-		if (listener != null) {
-			listener.exceptionOccurred(e.getCause());
-		}
-		e.getChannel().close();
-	}
-
-	@Override
-	public void messageReceived(ChannelHandlerContext ctx,
-			MessageEvent e) throws Exception {
-		objectsRead.getAndIncrement();
-		ChannelListener listener = this.listeners.get(e.getChannel());
-		if (listener != null) {
-			listener.receivedMessage(e.getMessage());
-		}
-	}
-	
-	@Override
-	public void channelDisconnected(ChannelHandlerContext ctx,
-			ChannelStateEvent e) throws Exception {
-		ChannelListener listener = this.listeners.remove(e.getChannel());
-		if (listener != null) {
-			listener.exceptionOccurred(new SingleInstanceCommunicationException(CommPlatformPlugin.Util.getString("SSLAwareChannelHandler.channel_closed"))); //$NON-NLS-1$
-		}
-	}
-
-	public ChannelPipeline getPipeline() throws Exception {
-		ChannelPipeline pipeline = new DefaultChannelPipeline();
-
-	    if (engine != null) {
-	        pipeline.addLast("ssl", new SslHandler(engine)); //$NON-NLS-1$
-	    }
-	    pipeline.addLast("decoder", new ObjectDecoder(2 << 24, this.classLoader)); //$NON-NLS-1$
-	    pipeline.addLast("encoder", new ObjectEncoder()); //$NON-NLS-1$
-	    pipeline.addLast("handler", this); //$NON-NLS-1$
-	    return pipeline;
-	}
-	
-	public long getObjectsRead() {
-		return this.objectsRead.get();
-	}
-	
-	public long getObjectsWritten() {
-		return this.objectsWritten.get();
-	}
-	
-	public int getConnectedChannels() {
-		return this.listeners.size();
-	}
-	
-	public int getMaxConnectedChannels() {
-		return this.maxChannels;
-	}
-
-}
\ No newline at end of file

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SocketUtil.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SocketUtil.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SocketUtil.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -26,6 +26,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.Socket;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
@@ -37,7 +38,7 @@
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSocket;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 
@@ -68,17 +69,17 @@
     public static final String ANON_CIPHER_SUITE = "TLS_DH_anon_WITH_AES_128_CBC_SHA"; //$NON-NLS-1$
     public static final String ANON_PROTOCOL = "TLS"; //$NON-NLS-1$
     
-    public static class SSLEngineFactory {
+    public static class SSLSocketFactory {
     	private boolean isAnon;
-    	private SSLContext context;
+    	private javax.net.ssl.SSLSocketFactory factory;
     	
-    	public SSLEngineFactory(SSLContext context, boolean isAnon) {
-			this.context = context;
+    	public SSLSocketFactory(SSLContext context, boolean isAnon) {
+			this.factory = context.getSocketFactory();
 			this.isAnon = isAnon;
 		}
 
-		public SSLEngine getSSLEngine() {
-    		SSLEngine result = context.createSSLEngine();
+		public synchronized Socket getSocket() throws IOException {
+    		SSLSocket result = (SSLSocket)factory.createSocket();
     		result.setUseClientMode(true);
     		if (isAnon) {
     			addCipherSuite(result, ANON_CIPHER_SUITE);
@@ -87,7 +88,7 @@
     	}
     }
     
-    public static SSLEngineFactory getSSLEngineFactory(Properties props) throws IOException, NoSuchAlgorithmException{
+    public static SSLSocketFactory getSSLSocketFactory(Properties props) throws IOException, GeneralSecurityException{
     	// -Dcom.metamatrix.ssl.keyStore
         String keystore = props.getProperty(KEYSTORE_FILENAME); 
         // -Dcom.metamatrix.ssl.keyStorePassword
@@ -123,11 +124,12 @@
         } else {
         	result = SSLContext.getDefault();
         }
-        return new SSLEngineFactory(result, anon);
+        return new SSLSocketFactory(result, anon);
     }
     
     /**
      * create socket factory for the client socket.  
+     * @throws GeneralSecurityException 
      */
     static SSLContext getClientSSLContext(String keystore,
                                             String password,
@@ -135,11 +137,11 @@
                                             String truststorePassword,
                                             String algorithm,
                                             String keystoreType,
-                                            String protocol) throws IOException {
+                                            String protocol) throws IOException, GeneralSecurityException {
         return getSSLContext(keystore, password, truststore, truststorePassword, algorithm, keystoreType, protocol);
     }
     
-    public static void addCipherSuite(SSLEngine engine, String cipherSuite) {
+    public static void addCipherSuite(SSLSocket engine, String cipherSuite) {
         Assertion.assertTrue(Arrays.asList(engine.getSupportedCipherSuites()).contains(cipherSuite));
 
         String[] suites = engine.getEnabledCipherSuites();
@@ -152,7 +154,7 @@
         engine.setEnabledCipherSuites(newSuites);
     }
 
-    public static SSLContext getAnonSSLContext() throws IOException {
+    public static SSLContext getAnonSSLContext() throws IOException, GeneralSecurityException {
         return getSSLContext(null, null, null, null, null, null, ANON_PROTOCOL);
     }
     
@@ -162,43 +164,37 @@
                                             String truststorePassword,
                                             String algorithm,
                                             String keystoreType,
-                                            String protocol) throws IOException {
+                                            String protocol) throws IOException, GeneralSecurityException {
         
-        try {
-        	if (algorithm == null) {
-        		algorithm = KeyManagerFactory.getDefaultAlgorithm();
-        	}
-            // Configure the Keystore Manager
-            KeyManager[] keyManagers = null;
-            if (keystore != null) {
-                KeyStore ks = loadKeyStore(keystore, password, keystoreType);
-                if (ks != null) {
-                    KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
-                    kmf.init(ks, password.toCharArray());
-                    keyManagers = kmf.getKeyManagers();
-                }
+    	if (algorithm == null) {
+    		algorithm = KeyManagerFactory.getDefaultAlgorithm();
+    	}
+        // Configure the Keystore Manager
+        KeyManager[] keyManagers = null;
+        if (keystore != null) {
+            KeyStore ks = loadKeyStore(keystore, password, keystoreType);
+            if (ks != null) {
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
+                kmf.init(ks, password.toCharArray());
+                keyManagers = kmf.getKeyManagers();
             }
-            
-            // Configure the Trust Store Manager
-            TrustManager[] trustManagers = null;
-            if (truststore != null) {
-                KeyStore ks = loadKeyStore(truststore, truststorePassword, keystoreType);
-                if (ks != null) {
-                    TrustManagerFactory tmf = TrustManagerFactory.getInstance(algorithm);
-                    tmf.init(ks);
-                    trustManagers = tmf.getTrustManagers();
-                }
-            } 
-
-            // Configure the SSL
-            SSLContext sslc = SSLContext.getInstance(protocol);
-            sslc.init(keyManagers, trustManagers, null);
-            return sslc;
-        } catch (GeneralSecurityException err) {
-            IOException exception = new IOException(err.getMessage());
-            exception.initCause(err);
-            throw exception;
+        }
+        
+        // Configure the Trust Store Manager
+        TrustManager[] trustManagers = null;
+        if (truststore != null) {
+            KeyStore ks = loadKeyStore(truststore, truststorePassword, keystoreType);
+            if (ks != null) {
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(algorithm);
+                tmf.init(ks);
+                trustManagers = tmf.getTrustManagers();
+            }
         } 
+
+        // Configure the SSL
+        SSLContext sslc = SSLContext.getInstance(protocol);
+        sslc.init(keyManagers, trustManagers, null);
+        return sslc;
     }
     
     /**

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/AdminApiServerDiscovery.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/AdminApiServerDiscovery.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/AdminApiServerDiscovery.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -90,11 +90,7 @@
 				if (!processObject.isEnabled()) {
 					continue;
 				}
-				if (useUrlHost) {
-					this.knownHosts.add(new HostInfo(lastHostInfo.getHostName(), processObject.getPort()));
-				} else {
-					this.knownHosts.add(new HostInfo(processObject.getInetAddress().getHostName(), processObject.getPort(), processObject.getInetAddress()));
-				}
+				this.knownHosts.add(new HostInfo(useUrlHost?lastHostInfo.getHostName():processObject.getInetAddress().getHostName(), processObject.getPort()));
 			}
 			discoveredHosts = true;
 		} catch (AdminException e) {

Deleted: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/NioObjectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/NioObjectChannelFactory.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/NioObjectChannelFactory.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -1,92 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.comm.platform.socket.client;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLEngine;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-import com.metamatrix.common.comm.exception.CommunicationException;
-import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
-import com.metamatrix.common.comm.platform.socket.SSLAwareChannelHandler;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
-import com.metamatrix.core.util.NamedThreadFactory;
-
-public class NioObjectChannelFactory implements ObjectChannelFactory {
-	
-	private int inputBufferSize;
-	private int outputBufferSize;
-	private boolean conserveBandwidth;
-	private ClassLoader classLoader;
-	private ChannelFactory channlFactory;
-	
-	public NioObjectChannelFactory(boolean conserveBandwidth, int inputBufferSize, int outputBufferSize, ClassLoader classLoader, int workerCount) {
-		this.conserveBandwidth = conserveBandwidth;
-		this.inputBufferSize = inputBufferSize;
-		this.outputBufferSize = outputBufferSize;
-		this.classLoader = classLoader;
-		ThreadPoolExecutor executor = new ThreadPoolExecutor(0,
-				Integer.MAX_VALUE, 2, TimeUnit.MINUTES,
-				new SynchronousQueue<Runnable>(),
-				new NamedThreadFactory("Nio")); //$NON-NLS-1$
-		this.channlFactory = new NioClientSocketChannelFactory(executor, executor, workerCount);
-	}
-
-	public void createObjectChannel(SocketAddress address, SSLEngine engine,
-			ChannelListenerFactory listener) throws IOException,
-			CommunicationException {
-		ClientBootstrap bootstrap = new ClientBootstrap(channlFactory);
-
-        final SSLAwareChannelHandler handler = new SSLAwareChannelHandler(listener, engine, this.classLoader);
-        
-        bootstrap.setPipelineFactory(handler);
-        
-        if (!conserveBandwidth) {
-        	bootstrap.setOption("tcpNoDelay", Boolean.TRUE); //$NON-NLS-1$
-        }
-        bootstrap.setOption("receiveBufferSize", new Integer(inputBufferSize)); //$NON-NLS-1$
-        bootstrap.setOption("sendBufferSize", new Integer(outputBufferSize)); //$NON-NLS-1$
-        bootstrap.setOption("keepAlive", Boolean.TRUE); //$NON-NLS-1$
-               
-        ChannelFuture future = bootstrap.connect(address);
-        
-        //connections have no timeout
-        future.awaitUninterruptibly();
-        
-        if (!future.isSuccess()) {
-        	if (future.getCause() instanceof IOException) {
-        		throw (IOException)future.getCause();
-        	}
-        	throw new SingleInstanceCommunicationException(future.getCause());
-        }
-	}
-}

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/ObjectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/ObjectChannelFactory.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/ObjectChannelFactory.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -25,15 +25,12 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 
-import javax.net.ssl.SSLEngine;
-
 import com.metamatrix.common.comm.exception.CommunicationException;
 import com.metamatrix.common.comm.platform.socket.ObjectChannel;
 
 public interface ObjectChannelFactory {
 
-	void createObjectChannel(SocketAddress address, SSLEngine engine,
-			ObjectChannel.ChannelListenerFactory listenerFactory) throws IOException,
+	ObjectChannel createObjectChannel(SocketAddress address, boolean ssl) throws IOException,
 			CommunicationException;
 	
 }
\ No newline at end of file

Added: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java	                        (rev 0)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,253 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.comm.platform.socket.client;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.security.GeneralSecurityException;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.logging.Logger;
+
+import org.jboss.netty.handler.codec.serialization.ObjectDecoderInputStream;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoderOutputStream;
+
+import com.metamatrix.common.comm.exception.CommunicationException;
+import com.metamatrix.common.comm.platform.socket.ObjectChannel;
+import com.metamatrix.common.comm.platform.socket.SocketUtil;
+import com.metamatrix.common.comm.platform.socket.SocketUtil.SSLSocketFactory;
+import com.metamatrix.dqp.client.ResultsFuture;
+
+/**
+ * Creates {@link ObjectChannel}s backed by blocking {@link Socket}s, plain or
+ * SSL, and wraps each connected socket in an {@link OioObjectChannel}.
+ */
+final class OioOjbectChannelFactory implements ObjectChannelFactory {
+	
+	/** Size in bytes of the buffers placed around the socket streams. */
+	private final static int STREAM_BUFFER_SIZE = 1<<15;
+	/** Value passed to {@link Socket#setSoTimeout(int)}: blocking reads time out after this many milliseconds. */
+	private final static int SO_TIMEOUT = 3000;
+	
+	private static Logger log = Logger.getLogger("org.teiid.client.sockets"); //$NON-NLS-1$
+	
+	/**
+	 * {@link ObjectChannel} that reads and writes serialized objects over a
+	 * single blocking socket.
+	 */
+	final static class OioObjectChannel implements ObjectChannel {
+		private final Socket socket;
+		private ObjectOutputStream outputStream;
+		private ObjectInputStream inputStream;
+		// held while reading from inputStream; write() synchronizes on this instance instead
+		private Object readLock = new Object();
+
+		/**
+		 * Sets the read timeout on the socket and wraps its streams for object traffic.
+		 *
+		 * @param socket the socket to wrap; the visible caller connects it first
+		 * @throws IOException if the timeout cannot be set or a stream cannot be created
+		 */
+		private OioObjectChannel(Socket socket) throws IOException {
+			log.fine("creating new OioObjectChannel"); //$NON-NLS-1$
+			this.socket = socket;
+            socket.setSoTimeout(SO_TIMEOUT);
+            BufferedOutputStream bos = new BufferedOutputStream( socket.getOutputStream(), STREAM_BUFFER_SIZE);
+            // NOTE(review): 512 is presumably the encoder's estimated object length - confirm
+            outputStream = new ObjectEncoderOutputStream( new DataOutputStream(bos), 512);
+            //The output stream must be flushed on creation in order to write some initialization data
+            //through the buffered stream to the input stream on the other side
+            outputStream.flush();
+            // the creating thread's context class loader is handed to the decoder, presumably to resolve received classes
+            final ClassLoader cl = Thread.currentThread().getContextClassLoader();
+            BufferedInputStream bis = new BufferedInputStream(socket.getInputStream(), STREAM_BUFFER_SIZE);
+            // NOTE(review): 1 << 25 is presumably the maximum accepted object size in bytes - confirm
+            inputStream = new ObjectDecoderInputStream(new DataInputStream(bis), cl, 1 << 25);
+		}
+
+		/**
+		 * Flushes and closes both streams and then the socket, ignoring any
+		 * IOException so that every step is attempted.
+		 */
+		@Override
+		public void close() {
+			log.finer("closing socket"); //$NON-NLS-1$
+			try {
+		        outputStream.flush();
+		    } catch (IOException e) {
+		        // ignore
+		    }
+		    try {
+		        outputStream.close();
+		    } catch (IOException e) {
+		        // ignore
+		    }
+		    try {
+		        inputStream.close();
+		    } catch (IOException e) {
+		        // ignore
+		    }
+		    try {
+		        socket.close();
+		    } catch (IOException e) {
+		        // ignore
+		    }
+		}
+
+		/** @return the remote address reported by the underlying socket */
+		@Override
+		public SocketAddress getRemoteAddress() {
+			return socket.getRemoteSocketAddress();
+		}
+
+		/** @return true until the underlying socket has been closed */
+		@Override
+		public boolean isOpen() {
+			return !socket.isClosed();
+		}
+
+		/**
+		 * Reads the next object from the socket; only one thread reads at a time.
+		 * A {@link SocketTimeoutException} is rethrown with the channel left
+		 * open, while any other IOException closes the channel before it is
+		 * rethrown.
+		 */
+		@Override
+		public Object read() throws IOException, ClassNotFoundException {
+			log.finer("reading message from socket"); //$NON-NLS-1$
+			synchronized (readLock) {
+				try {
+					return inputStream.readObject();
+				} catch (SocketTimeoutException e) {
+					throw e;
+		        } catch (IOException e) {
+		            close();
+		            throw e;
+		        }
+			}
+		}
+
+		/**
+		 * Writes the message, then flushes and resets the object stream.
+		 * Failures are not thrown: an IOException closes the channel and is
+		 * reported through the returned future.
+		 *
+		 * @return a future that has already been given its result or exception
+		 */
+		@Override
+		public synchronized Future<?> write(Object msg) {
+			log.finer("writing message to socket"); //$NON-NLS-1$
+		    ResultsFuture<Void> result = new ResultsFuture<Void>();
+		    try {
+		        outputStream.writeObject(msg);
+		        outputStream.flush();     
+		        outputStream.reset();
+		    	result.getResultsReceiver().receiveResults(null);
+		    } catch (IOException e) {
+		        close();
+		    	result.getResultsReceiver().exceptionOccurred(e);
+		    }
+		    return result;
+		}
+	}
+
+	private Properties props;
+	private int inputBufferSize;
+	private int outputBufferSize;
+	private boolean conserveBandwidth;
+	// created on the first SSL request; see createObjectChannel
+	private volatile SSLSocketFactory sslSocketFactory;
+
+	/**
+	 * @param conserveBandwidth if true TCP_NODELAY is left off for new sockets
+	 * @param inputBufferSize socket receive buffer size; not set when <= 0
+	 * @param outputBufferSize socket send buffer size; not set when <= 0
+	 * @param props properties handed to {@link SocketUtil#getSSLSocketFactory(Properties)} for SSL connections
+	 */
+	public OioOjbectChannelFactory(boolean conserveBandwidth,
+			int inputBufferSize, int outputBufferSize, Properties props) {
+		this.conserveBandwidth = conserveBandwidth;
+		this.inputBufferSize = inputBufferSize;
+		this.outputBufferSize = outputBufferSize;
+		this.props = props;
+	}
+
+	/**
+	 * Opens a socket to the given address and wraps it in an object channel.
+	 * The socket is closed again if configuring, connecting or wrapping it
+	 * fails, so a failed attempt does not leak the descriptor.
+	 *
+	 * @param address the remote address to connect to
+	 * @param ssl true to obtain the socket from the SSL socket factory
+	 * @throws IOException if the socket cannot be configured or connected
+	 * @throws CommunicationException if the SSL socket factory cannot be created
+	 */
+	@Override
+	public ObjectChannel createObjectChannel(SocketAddress address, boolean ssl) throws IOException,
+			CommunicationException {
+		final Socket socket;
+		if (ssl) {
+			// not synchronized: concurrent first calls may each build a factory
+			if (this.sslSocketFactory == null) {
+				try {
+					sslSocketFactory = SocketUtil.getSSLSocketFactory(props);
+				} catch (GeneralSecurityException e) {
+					throw new CommunicationException(e);
+				}
+			}
+			socket = sslSocketFactory.getSocket();
+		} else {
+			socket = new Socket();
+		}
+		boolean success = false;
+		try {
+			if (inputBufferSize > 0) {
+				socket.setReceiveBufferSize(inputBufferSize);
+			}
+			if (outputBufferSize > 0) {
+				socket.setSendBufferSize(outputBufferSize);
+			}
+			socket.setTcpNoDelay(!conserveBandwidth); // enable Nagle's algorithm to conserve bandwidth
+			socket.connect(address);
+			ObjectChannel channel = new OioObjectChannel(socket);
+			success = true;
+			return channel;
+		} finally {
+			if (!success) {
+				try {
+					socket.close();
+				} catch (IOException e) {
+					// ignore, the original failure is the one to report
+				}
+			}
+		}
+	}
+}
\ No newline at end of file


Property changes on: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -40,6 +40,8 @@
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -69,7 +71,7 @@
 	private Map<HostInfo, SocketServerInstance> existingConnections = new HashMap<HostInfo, SocketServerInstance>();
 	private SocketServerInstanceFactory connectionFactory;
     private ServerDiscovery serverDiscovery;
-    private static Logger log = Logger.getLogger("org.teiid.client.sockets");
+    private static Logger log = Logger.getLogger("org.teiid.client.sockets"); //$NON-NLS-1$
 
 	private boolean secure;
     private Properties connProps;
@@ -78,7 +80,7 @@
     private volatile LogonResult logonResult;
     private ILogon logon;
     private Timer pingTimer;
-    private volatile boolean closed;
+    private boolean closed;
     
 	public SocketServerConnection(
 			SocketServerInstanceFactory connectionFactory, boolean secure,
@@ -131,7 +133,7 @@
 			}
 			Exception ex = null;
 			try {
-				instance = connectionFactory.createServerInstance(hostInfo, secure);
+				instance = connectionFactory.getServerInstance(hostInfo, secure);
 				if (this.logonResult != null) {
 					ILogon newLogon = instance.getService(ILogon.class);
 					newLogon.assertIdentity(logonResult.getSessionID());
@@ -236,12 +238,9 @@
 	            }
 	            if (exception instanceof SingleInstanceCommunicationException
 						|| exception.getCause() instanceof SingleInstanceCommunicationException) {
-	            	if (!isOpen()) {
+	            	if (!failOver || !isOpen()) {
 	            		break;
 	            	}
-	            	if (!failOver) {
-			        	break;
-		        	}
 	            	invalidateTarget();
 	            } else {
 	            	break;
@@ -275,7 +274,7 @@
 		try {
 			//make a best effort to send the logoff
 			Future<?> writeFuture = this.logon.logoff();
-			writeFuture.get();
+			writeFuture.get(5000, TimeUnit.MILLISECONDS);
 		} catch (InvalidSessionException e) {
 			//ignore
 		} catch (MetaMatrixComponentException e) {
@@ -284,6 +283,8 @@
 			//ignore
 		} catch (ExecutionException e) {
 			//ignore
+		} catch (TimeoutException e) {
+			//ignore
 		}
 
 		for (SocketServerInstance instance : existingConnections.values()) {
@@ -294,7 +295,7 @@
 		this.serverDiscovery.shutdown();
 	}
 
-	public boolean isOpen() {
+	public synchronized boolean isOpen() {
 		if (this.closed) {
 			return false;
 		}

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -26,20 +26,17 @@
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Timer;
 
-import javax.net.ssl.SSLEngine;
-
 import com.metamatrix.common.api.HostInfo;
 import com.metamatrix.common.api.MMURL;
 import com.metamatrix.common.comm.api.ServerConnectionFactory;
 import com.metamatrix.common.comm.exception.CommunicationException;
 import com.metamatrix.common.comm.exception.ConnectionException;
-import com.metamatrix.common.comm.platform.socket.Handshake;
-import com.metamatrix.common.comm.platform.socket.SocketUtil;
-import com.metamatrix.common.comm.platform.socket.SocketUtil.SSLEngineFactory;
 import com.metamatrix.common.util.NetUtils;
 import com.metamatrix.common.util.PropertiesUtils;
 import com.metamatrix.core.MetaMatrixCoreException;
@@ -65,8 +62,8 @@
 	public static final int DEFAULT_MAX_THREADS = 15;
 	public static final long DEFAULT_TTL = 120000L;
 	public static final long DEFAULT_SYNCH_TTL = 120000L;
-	public static final int DEFAULT_SOCKET_INPUT_BUFFER_SIZE = 102400;
-	public static final int DEFAULT_SOCKET_OUTPUT_BUFFER_SIZE = 102400;
+	public static final int DEFAULT_SOCKET_INPUT_BUFFER_SIZE = 0;
+	public static final int DEFAULT_SOCKET_OUTPUT_BUFFER_SIZE = 0;
 	
 	private static final String URL = "URL"; //$NON-NLS-1$
 	
@@ -75,7 +72,6 @@
     private ObjectChannelFactory channelFactory;
 	private Timer pingTimer;
 	private Properties props;
-	private SSLEngineFactory sslEngineFactory;
 	
 	public static synchronized SocketServerConnectionFactory getInstance() {
 		if (INSTANCE == null) {
@@ -95,7 +91,7 @@
 					}
 				}
 			}
-			INSTANCE.init(props, true);
+			INSTANCE.init(props);
 		}
 		return INSTANCE;
 	}
@@ -104,33 +100,15 @@
 		
 	}
 	
-	public void init(Properties props, boolean usePing) {
+	public void init(final Properties props) {
 		this.props = props;
 		this.pingTimer = new Timer("SocketPing", true); //$NON-NLS-1$
-		this.channelFactory = new NioObjectChannelFactory(
-				getConserveBandwidth(), getInputBufferSize(),
-				getOutputBufferSize(), Thread.currentThread()
-						.getContextClassLoader(), getMaxThreads());
+		this.channelFactory = new OioOjbectChannelFactory(getConserveBandwidth(), getInputBufferSize(), getOutputBufferSize(), props);
 	}
 			
-	public SocketServerInstance createServerInstance(HostInfo info, boolean ssl) throws CommunicationException, IOException {
-		SSLEngine sslEngine = null;
-		if (ssl) {
-			synchronized (this) {
-				if (this.sslEngineFactory == null) {
-					try {
-						this.sslEngineFactory = SocketUtil.getSSLEngineFactory(this.props);
-					} catch (NoSuchAlgorithmException e) {
-						throw new CommunicationException(e);
-					} catch (IOException e) {
-						throw new CommunicationException(e);
-					}
-				}
-				sslEngine = this.sslEngineFactory.getSSLEngine();
-			}
-		}
-		SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(info, sslEngine, getSynchronousTTL());
-		ssii.connect(this.channelFactory, Handshake.HANDSHAKE_TIMEOUT);
+	public SocketServerInstance getServerInstance(HostInfo info, boolean ssl) throws CommunicationException, IOException {
+		SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(info, ssl, getSynchronousTTL());
+		ssii.connect(this.channelFactory);
 		return ssii;
 	}
 	

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceFactory.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceFactory.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -27,12 +27,8 @@
 import com.metamatrix.common.api.HostInfo;
 import com.metamatrix.common.comm.exception.CommunicationException;
 
-
-/**
- * Sockets implementation of the communication framework ServerConnectionFactory interface.
- */
 public interface SocketServerInstanceFactory {
 	
-	SocketServerInstance createServerInstance(HostInfo info, boolean ssl) throws CommunicationException, IOException;
+	SocketServerInstance getServerInstance(HostInfo info, boolean ssl) throws CommunicationException, IOException;
 
 }
\ No newline at end of file

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -30,6 +30,7 @@
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -37,16 +38,14 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.net.ssl.SSLEngine;
-
 import com.metamatrix.client.ExceptionUtil;
 import com.metamatrix.common.api.HostInfo;
 import com.metamatrix.common.comm.api.Message;
-import com.metamatrix.common.comm.api.MessageListener;
 import com.metamatrix.common.comm.api.ResultsReceiver;
 import com.metamatrix.common.comm.exception.CommunicationException;
 import com.metamatrix.common.comm.exception.ExceptionHolder;
@@ -54,8 +53,6 @@
 import com.metamatrix.common.comm.platform.CommPlatformPlugin;
 import com.metamatrix.common.comm.platform.socket.Handshake;
 import com.metamatrix.common.comm.platform.socket.ObjectChannel;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
 import com.metamatrix.common.util.crypto.CryptoException;
 import com.metamatrix.common.util.crypto.Cryptor;
 import com.metamatrix.common.util.crypto.DhKeyGenerator;
@@ -69,77 +66,42 @@
  * On construction this class will create a channel and exchange a handshake.
  * That handshake will establish a {@link Cryptor} to be used for secure traffic.
  */
-public class SocketServerInstanceImpl implements ChannelListener, SocketServerInstance {
+public class SocketServerInstanceImpl implements SocketServerInstance {
 	
 	private AtomicInteger MESSAGE_ID = new AtomicInteger();
 
 	private HostInfo hostInfo;
-	private SSLEngine engine;
+	private boolean ssl;
     private ObjectChannel socketChannel;
-    private Logger log = Logger.getLogger("org.teiid.client.sockets");
+    private static Logger log = Logger.getLogger("org.teiid.client.sockets"); //$NON-NLS-1$
     private long synchTimeout;
 
     private Cryptor cryptor;
     
-    private Map<Serializable, MessageListener> asynchronousListeners = new ConcurrentHashMap<Serializable, MessageListener>();
+    private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners = new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
     
-    private boolean handshakeCompleted;
-    private CommunicationException handshakeError;
-    
     public SocketServerInstanceImpl() {
     	
     }
 
-    public SocketServerInstanceImpl(final HostInfo host, SSLEngine engine, long synchTimeout) {
+    public SocketServerInstanceImpl(final HostInfo host, boolean ssl, long synchTimeout) {
         this.hostInfo = host;
-        this.engine = engine;
+        this.ssl = ssl;
         this.synchTimeout = synchTimeout;
     }
     
-    public void connect(ObjectChannelFactory channelFactory, long handShakeTimeout) throws CommunicationException, IOException {
-        InetSocketAddress address = null;
-        if (hostInfo.getInetAddress() != null) {
-            address = new InetSocketAddress(hostInfo.getInetAddress(), hostInfo.getPortNumber());
-        } else {
-            address = new InetSocketAddress(hostInfo.getHostName(), hostInfo.getPortNumber());
+    public void connect(ObjectChannelFactory channelFactory) throws CommunicationException, IOException {
+        InetSocketAddress address = new InetSocketAddress(hostInfo.getInetAddress(), hostInfo.getPortNumber());
+        this.socketChannel = channelFactory.createObjectChannel(address, ssl);
+        try {
+        	doHandshake();
+        } catch (CommunicationException e) {
+        	this.socketChannel.close();
+        	throw e;
+        } catch (IOException e) {
+        	this.socketChannel.close();
+        	throw e;
         }
-		channelFactory.createObjectChannel(address, engine, new ChannelListenerFactory() {
-
-			public ChannelListener createChannelListener(
-					ObjectChannel channel) {
-				synchronized (SocketServerInstanceImpl.this) {
-					if (SocketServerInstanceImpl.this.handshakeError != null) {
-						channel.close();
-					}
-					SocketServerInstanceImpl.this.socketChannel = channel;
-				}
-				return SocketServerInstanceImpl.this;
-			}
-			
-		});
-		synchronized (this) {
-			long endTime = System.currentTimeMillis() + handShakeTimeout;
-			while (!this.handshakeCompleted && this.handshakeError == null) {
-				long remainingTimeout = endTime - System.currentTimeMillis();
-				if (remainingTimeout <= 0) {
-					break;
-				}
-				try {
-					this.wait(remainingTimeout);
-				} catch (InterruptedException e) {
-					break;
-				}
-			}
-			if (!this.handshakeCompleted || this.handshakeError != null) {
-				if (this.socketChannel != null) {
-					this.socketChannel.close();
-				}
-				if (this.handshakeError == null) {
-					this.handshakeError = new SingleInstanceCommunicationException(CommPlatformPlugin.Util.getString("SocketServerInstanceImpl.handshake_timeout")); //$NON-NLS-1$
-				}	
-				throw this.handshakeError;
-			}
-		}
     }
     
     /**
@@ -153,11 +115,22 @@
         return MetaMatrixProductVersion.VERSION_NUMBER;
     }
     
-    private synchronized void receivedHahdshake(Handshake handshake) {
+    private void doHandshake() throws IOException, CommunicationException {
+    	final Handshake handshake;
         try {
+			Object obj = this.socketChannel.read();
+			
+			if (!(obj instanceof Handshake)) {
+				throw new CommunicationException(CommPlatformPlugin.Util.getString("SocketServerInstanceImpl.handshake_error"));  //$NON-NLS-1$
+			}
+			handshake = (Handshake)obj;
+		} catch (ClassNotFoundException e1) {
+			throw new CommunicationException(e1);
+		}
+
+        try {
             if (!getVersionInfo().equals(handshake.getVersion())) {
-                this.handshakeError = new CommunicationException(CommPlatformPlugin.Util.getString("SocketServerInstanceImpl.version_mismatch", getVersionInfo(), handshake.getVersion())); //$NON-NLS-1$
-                return;
+                throw new CommunicationException(CommPlatformPlugin.Util.getString("SocketServerInstanceImpl.version_mismatch", getVersionInfo(), handshake.getVersion())); //$NON-NLS-1$
             }
             
             handshake.setVersion(getVersionInfo());
@@ -174,11 +147,8 @@
             }
             
             this.socketChannel.write(handshake);
-            this.handshakeCompleted = true;
         } catch (CryptoException err) {
-        	this.handshakeError = new CommunicationException(err);
-        } finally {
-			this.notify();
+        	throw new CommunicationException(err);
         }
     }
 
@@ -186,7 +156,7 @@
         return socketChannel.isOpen();
     }
 
-    public void send(Message message, MessageListener listener, Serializable messageKey)
+    public void send(Message message, ResultsReceiver<Object> listener, Serializable messageKey)
         throws CommunicationException, InterruptedException {
 	    if (listener != null) {
 	        asynchronousListeners.put(messageKey, listener);
@@ -213,54 +183,42 @@
 	public void exceptionOccurred(Throwable e) {
     	if (e instanceof CommunicationException) {
 	        if (e.getCause() instanceof InvalidClassException) {
-	            log.log(Level.SEVERE, "Unknown class or incorrect class version:", e); //$NON-NLS-1$ //$NON-NLS-2$
+	            log.log(Level.SEVERE, "Unknown class or incorrect class version:", e); //$NON-NLS-1$ 
 	        } else {
-	            log.log(Level.FINE, "Unable to read: socket was already closed.", e); //$NON-NLS-1$ //$NON-NLS-2$
+	            log.log(Level.FINE, "Unable to read: socket was already closed.", e); //$NON-NLS-1$ 
 	        }
     	} else if (e instanceof EOFException) {
-            log.log(Level.FINE, "Unable to read: socket was already closed.", e); //$NON-NLS-1$ //$NON-NLS-2$
+            log.log(Level.FINE, "Unable to read: socket was already closed.", e); //$NON-NLS-1$ 
     	} else {
-            log.log(Level.WARNING, "Unable to read: unexpected exception", e); //$NON-NLS-1$ //$NON-NLS-2$
+            log.log(Level.WARNING, "Unable to read: unexpected exception", e); //$NON-NLS-1$ 
     	}
-    	
-		synchronized (this) {
-			if (!handshakeCompleted) {
-				this.handshakeError = new SingleInstanceCommunicationException(e, CommPlatformPlugin.Util.getString(engine!=null?"SocketServerInstanceImpl.secure_error_during_handshake":"SocketServerInstanceImpl.error_during_handshake")); //$NON-NLS-1$ //$NON-NLS-2$  
-				this.notify();
-			}
-		}
-		
-        Message messageHolder = new Message();
-        messageHolder.setContents(e instanceof SingleInstanceCommunicationException?e:new SingleInstanceCommunicationException(e));
+    			
+        if (!(e instanceof SingleInstanceCommunicationException)) {
+        	e = new SingleInstanceCommunicationException(e);
+        }
 
-        Set<Map.Entry<Serializable, MessageListener>> entries = this.asynchronousListeners.entrySet();
-        for (Iterator<Map.Entry<Serializable, MessageListener>> iterator = entries.iterator(); iterator.hasNext();) {
-			Map.Entry<Serializable, MessageListener> entry = iterator.next();
+        Set<Map.Entry<Serializable, ResultsReceiver<Object>>> entries = this.asynchronousListeners.entrySet();
+        for (Iterator<Map.Entry<Serializable, ResultsReceiver<Object>>> iterator = entries.iterator(); iterator.hasNext();) {
+			Map.Entry<Serializable, ResultsReceiver<Object>> entry = iterator.next();
 			iterator.remove();
-			entry.getValue().deliverMessage(messageHolder, entry.getKey());
+			entry.getValue().exceptionOccurred(e);
 		}
     }
 
 	public void receivedMessage(Object packet) {
-		log.log(Level.FINE, "reading packet"); //$NON-NLS-1$ //$NON-NLS-2$
+		log.log(Level.FINE, "reading packet"); //$NON-NLS-1$ 
         if (packet instanceof Message) {
         	Message messagePacket = (Message)packet;
-            processAsynchronousPacket(messagePacket);
-        } else if (packet instanceof Handshake) {
-        	receivedHahdshake((Handshake)packet);
+        	Serializable messageKey = messagePacket.getMessageKey();
+            log.log(Level.FINE, "read asynch message:" + messageKey); //$NON-NLS-1$ 
+            ResultsReceiver<Object> listener = asynchronousListeners.remove(messageKey);
+            if (listener != null) {
+                listener.receiveResults(messagePacket.getContents());
+            }
         } else {
-        	log.log(Level.FINE, "packet ignored:" + packet); //$NON-NLS-1$ //$NON-NLS-2$
+        	log.log(Level.FINE, "packet ignored:" + packet); //$NON-NLS-1$ 
         }
     }
-
-    private void processAsynchronousPacket(Message message) {
-        Serializable messageKey = message.getMessageKey();
-        log.log(Level.FINE, "read asynch message:" + messageKey); //$NON-NLS-1$ //$NON-NLS-2$
-    	MessageListener listener = asynchronousListeners.remove(messageKey);
-        if (listener != null) {
-            listener.deliverMessage(message, messageKey);
-        }
-    }
     
     public void shutdown() {
     	socketChannel.close();
@@ -273,10 +231,7 @@
         return this.cryptor;
     }
 
-	public void onConnection() {
-		
-	}
-
+	@SuppressWarnings("unchecked")
 	@Override
 	public <T> T getService(Class<T> iface) {
 		return (T)Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {iface}, new RemoteInvocationHandler(iface));
@@ -303,7 +258,7 @@
 				if (secure) {
 					message.setContents(getCryptor().sealObject(message.getContents()));
 				}
-				ResultsFuture results = new ResultsFuture() {
+				ResultsFuture<Object> results = new ResultsFuture<Object>() {
 					@Override
 					protected Object convertResult() throws ExecutionException {
 						try {
@@ -319,24 +274,53 @@
 							throw new ExecutionException(e);
 						}
 					}
+					
+					@Override
+					public synchronized Object get()
+							throws InterruptedException, ExecutionException {
+						throw new UnsupportedOperationException(); // untimed get is rejected; only get(long, TimeUnit) is usable on this future
+					}
+					
+					/**
+					 * Timed get, overridden so the calling thread performs the socket reads itself.
+					 * All time spent in the loop counts against one deadline derived from the timeout.
+					 */
+					@Override
+					public synchronized Object get(long timeout, TimeUnit unit)
+							throws InterruptedException, ExecutionException,
+							TimeoutException {
+						// Fixed deadline: previously only SocketTimeoutException reduced the budget, so
+						// reads delivering other callers' messages could stretch the wait past the timeout.
+						final long deadline = System.currentTimeMillis() + Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
+						while (!isDone()) {
+							if (System.currentTimeMillis() >= deadline) {
+								throw new TimeoutException();
+							}
+							try {
+								receivedMessage(socketChannel.read());
+							} catch (SocketTimeoutException e) {
+								// nothing arrived within the socket's read timeout; re-check the deadline
+								continue;
+							} catch (IOException e) {
+								exceptionOccurred(e);
+							} catch (ClassNotFoundException e) {
+								exceptionOccurred(e);
+							}
+						}
+						return super.get(timeout, unit);
+					}
 				};
-				final ResultsReceiver receiver = results.getResultsReceiver();
+				final ResultsReceiver<Object> receiver = results.getResultsReceiver();
 	
-				send(message, new MessageListener() {
-	
-					public void deliverMessage(Message responseMessage,
-							Serializable messageKey) {
-						Serializable result = responseMessage.getContents();
-						receiver.receiveResults(result);
-					}
-	
-				}, Integer.valueOf(MESSAGE_ID.getAndIncrement()));
+				send(message, receiver, Integer.valueOf(MESSAGE_ID.getAndIncrement()));
 				if (ResultsFuture.class.isAssignableFrom(method.getReturnType())) {
 					return results;
 				}
 				return results.get(synchTimeout, TimeUnit.MILLISECONDS);
 			} catch (ExecutionException e) {
 				t = e.getCause();
+			} catch (TimeoutException e) {
+				t = new SingleInstanceCommunicationException(e);
 			} catch (Throwable e) {
 				t = e;
 			}

Modified: trunk/client/src/main/java/com/metamatrix/dqp/client/ResultsFuture.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/dqp/client/ResultsFuture.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/dqp/client/ResultsFuture.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -30,6 +30,10 @@
 
 import com.metamatrix.common.comm.api.ResultsReceiver;
 
+/**
+ * Implements a callback-based future that can also have
+ * completion listeners.
+ */
 public class ResultsFuture<T> implements Future<T> {
 	
 	public interface CompletionListener<T> {
@@ -41,36 +45,37 @@
 	private T result;
 	private Throwable exception;
 	private boolean done;
+	private ResultsReceiver<T> resultsReceiver = new ResultsReceiver<T> () {
+		public void exceptionOccurred(Throwable e) {
+			synchronized (ResultsFuture.this) {
+				if (done) {
+					throw new IllegalStateException("Already sent results"); //$NON-NLS-1$
+				}
+				exception = e;
+				done = true;
+				ResultsFuture.this.notifyAll();
+			}
+			done();
+		}
+		public void receiveResults(T results) {
+			synchronized (ResultsFuture.this) {
+				if (done) {
+					throw new IllegalStateException("Already sent results"); //$NON-NLS-1$
+				}
+				result = results;
+				done = true;
+				ResultsFuture.this.notifyAll();
+			}
+			done();
+		}
+	}; 
 	
 	public ResultsFuture() {
 		
 	}
 	
 	public ResultsReceiver<T> getResultsReceiver() {
-		return new ResultsReceiver<T> () {
-			public void exceptionOccurred(Throwable e) {
-				synchronized (ResultsFuture.this) {
-					if (done) {
-						throw new IllegalStateException("Already sent results"); //$NON-NLS-1$
-					}
-					exception = e;
-					done = true;
-					ResultsFuture.this.notifyAll();
-				}
-				done();
-			}
-			public void receiveResults(T results) {
-				synchronized (ResultsFuture.this) {
-					if (done) {
-						throw new IllegalStateException("Already sent results"); //$NON-NLS-1$
-					}
-					result = results;
-					done = true;
-					ResultsFuture.this.notifyAll();
-				}
-				done();
-			}
-		};
+		return resultsReceiver; 
 	}
 	
 	public boolean cancel(boolean mayInterruptIfRunning) {

Added: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java
===================================================================
--- trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java	                        (rev 0)
+++ trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.codec.serialization;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.io.StreamCorruptedException;
+
+/**
+ * Reads the compact stream format: a one-byte version header and tagged class descriptors.
+ *
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ * @version $Rev: 381 $, $Date: 2008-10-01 06:06:18 -0500 (Wed, 01 Oct 2008) $
+ */
+class CompactObjectInputStream extends ObjectInputStream {
+    // Loader used to resolve class names read from the stream; may be null.
+    private final ClassLoader classLoader;
+    /** Convenience constructor that supplies no explicit class loader. */
+    CompactObjectInputStream(InputStream in) throws IOException {
+        this(in, null);
+    }
+    /** Wraps the given stream; the superclass constructor reads the stream header. */
+    CompactObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
+        super(in);
+        this.classLoader = classLoader;
+    }
+    /** Expects a single version byte in place of the standard stream header. */
+    @Override
+    protected void readStreamHeader() throws IOException,
+            StreamCorruptedException {
+        int version = readByte() & 0xFF; // mask so the byte is compared as an unsigned value
+        if (version != STREAM_VERSION) {
+            throw new StreamCorruptedException(
+                    "Unsupported version: " + version); //$NON-NLS-1$
+        }
+    }
+    /** Reads a one-byte tag, then either a full standard descriptor or just a class name. */
+    @Override
+    protected ObjectStreamClass readClassDescriptor()
+            throws IOException, ClassNotFoundException {
+        int type = read();
+        if (type < 0) { // read() reports end of stream with a negative value
+            throw new EOFException();
+        }
+        switch (type) {
+        case CompactObjectOutputStream.TYPE_PRIMITIVE:
+            return super.readClassDescriptor();
+        case CompactObjectOutputStream.TYPE_NON_PRIMITIVE:
+            String className = readUTF();
+            Class<?> clazz;
+            if (classLoader == null) { // no loader supplied: use the thread context class loader
+                clazz = Class.forName(
+                        className, true,
+                        Thread.currentThread().getContextClassLoader());
+            } else {
+                clazz = Class.forName(className, true, classLoader);
+            }
+            return ObjectStreamClass.lookup(clazz); // descriptor is built from the locally loaded class
+        default:
+            throw new StreamCorruptedException(
+                    "Unexpected class descriptor type: " + type); //$NON-NLS-1$
+        }
+    }
+    /** Tries the configured loader first and falls back to the superclass resolution. */
+    @Override
+    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+        String name = desc.getName();
+        try {
+            return Class.forName(name, false, classLoader); // NOTE(review): a null loader here means the bootstrap loader, not the context loader used in readClassDescriptor - confirm intended
+        } catch (ClassNotFoundException ex) {
+            return super.resolveClass(desc); // not visible to that loader: defer to ObjectInputStream
+        }
+    }
+}


Property changes on: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Added: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java
===================================================================
--- trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java	                        (rev 0)
+++ trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.codec.serialization;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.OutputStream;
+
+/**
+ * Writes the compact stream format: a one-byte version header and tagged class descriptors.
+ *
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ * @version $Rev: 6 $, $Date: 2008-08-07 20:40:10 -0500 (Thu, 07 Aug 2008) $
+ */
+class CompactObjectOutputStream extends ObjectOutputStream {
+    // One-byte tags written ahead of each class descriptor.
+    static final int TYPE_PRIMITIVE = 0; // the full standard descriptor follows
+    static final int TYPE_NON_PRIMITIVE = 1; // only the class name follows, written as UTF
+    /** Wraps the given stream; the superclass constructor writes the stream header. */
+    CompactObjectOutputStream(OutputStream out) throws IOException {
+        super(out);
+    }
+    /** Writes a single version byte in place of the standard stream header. */
+    @Override
+    protected void writeStreamHeader() throws IOException {
+        writeByte(STREAM_VERSION);
+    }
+    /** Writes a tag byte, then the full descriptor for primitive classes or just the class name otherwise. */
+    @Override
+    protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
+        if (desc.forClass().isPrimitive()) {
+            write(TYPE_PRIMITIVE);
+            super.writeClassDescriptor(desc);
+        } else {
+            write(TYPE_NON_PRIMITIVE);
+            writeUTF(desc.getName());
+        }
+    }
+}


Property changes on: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Added: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java	                        (rev 0)
+++ trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.codec.serialization;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.StreamCorruptedException;
+
+/**
+ * An {@link ObjectInput} which is interoperable with {@link ObjectEncoder}
+ * and {@link ObjectEncoderOutputStream}.
+ * Each read takes a 4-byte length, validates it against the configured
+ * maximum, and then deserializes one object from the underlying stream.
+ *
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ * @version $Rev: 628 $, $Date: 2009-01-05 20:06:00 -0600 (Mon, 05 Jan 2009) $
+ */
+public class ObjectDecoderInputStream extends ObjectInputStream {
+    // Source of the length-prefixed objects.
+    private final DataInputStream in;
+    private final ClassLoader classLoader; // handed to CompactObjectInputStream for class lookup
+    private final int maxObjectSize; // largest length prefix accepted
+    /** Uses the no-arg superclass constructor so that reads are routed to readObjectOverride. */
+    public ObjectDecoderInputStream(DataInputStream in, ClassLoader classLoader, int maxObjectSize) throws SecurityException, IOException {
+    	super();
+    	this.in = in;
+        this.classLoader = classLoader;
+        this.maxObjectSize = maxObjectSize;
+    }
+    /** Reads one length-prefixed object; throws StreamCorruptedException for a non-positive or oversized length. */
+    @Override
+    protected final Object readObjectOverride() throws IOException,
+    		ClassNotFoundException {
+        int dataLen = in.readInt();
+        if (dataLen <= 0) {
+            throw new StreamCorruptedException("invalid data length: " + dataLen); //$NON-NLS-1$
+        }
+        if (dataLen > maxObjectSize) {
+            throw new StreamCorruptedException(
+                    "data length too big: " + dataLen + " (max: " + maxObjectSize + ')'); //$NON-NLS-1$ //$NON-NLS-2$
+        }
+        // dataLen is only validated above; the nested stream reads directly from in
+        return new CompactObjectInputStream(in, classLoader).readObject();
+    }
+    /** Closes the underlying stream. */
+    @Override
+    public void close() throws IOException {
+    	in.close();
+    }
+    
+}


Property changes on: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Added: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
===================================================================
--- trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java	                        (rev 0)
+++ trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.codec.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+
+/**
+ * An {@link ObjectOutput} which is interoperable with {@link ObjectDecoder}
+ * and {@link ObjectDecoderInputStream}.
+ * Each object is serialized into a temporary buffer and then written to the
+ * underlying stream as a 4-byte length followed by the serialized bytes.
+ *
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ * @version $Rev: 595 $, $Date: 2008-12-08 03:02:33 -0600 (Mon, 08 Dec 2008) $
+ */
+public class ObjectEncoderOutputStream extends ObjectOutputStream {
+    // Destination for the length-prefixed objects.
+    private final DataOutputStream out;
+    private final int estimatedLength; // initial capacity of the per-object buffer
+    /** Uses the no-arg superclass constructor so that writes are routed to writeObjectOverride. */
+    public ObjectEncoderOutputStream(DataOutputStream out, int estimatedLength) throws SecurityException, IOException {
+    	super();
+    	this.out = out;
+        this.estimatedLength = estimatedLength;
+    }
+    /** Serializes obj with a CompactObjectOutputStream and writes it as one length-prefixed unit. */
+    @Override
+    final protected void writeObjectOverride(Object obj) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(estimatedLength);
+        ObjectOutputStream oout = new CompactObjectOutputStream(baos);
+        oout.writeObject(obj);
+        oout.flush();
+        oout.close();
+        // length first, then the serialized bytes
+        out.writeInt(baos.size());
+        out.write(baos.toByteArray());
+    }
+    /** Closes the underlying stream. */
+    @Override
+    public void close() throws IOException {
+    	out.close();
+    }
+    /** Flushes the underlying stream. */
+    @Override
+    public void flush() throws IOException {
+    	out.flush();
+    }
+    /** Overridden to do nothing. */
+    @Override
+    public void reset() throws IOException {
+    }
+    
+}


Property changes on: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/client/src/main/resources/teiid-client-settings.properties
===================================================================
--- trunk/client/src/main/resources/teiid-client-settings.properties	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/resources/teiid-client-settings.properties	2009-03-17 14:25:36 UTC (rev 565)
@@ -86,16 +86,16 @@
 metamatrix.synchronous.sockets.ttl=120000
 
 #
-# Set the input buffer size
+# Set the socket receive buffer size
 #
 
-metamatrix.sockets.inputBufferSize=102400
+metamatrix.sockets.inputBufferSize=0
 
 #
-# Set the output buffer size
+# Set the socket send buffer size
 #
 
-metamatrix.sockets.outputBufferSize=102400
+metamatrix.sockets.outputBufferSize=0
 
 #
 # Set to true to enable Nagle's algorithm to conserve bandwidth 

Modified: trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java
===================================================================
--- trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -78,7 +78,7 @@
 				Properties connectionProperties)
 				throws LogonException,
 				MetaMatrixComponentException {
-			return new LogonResult(new MetaMatrixSessionID(1), "fooUser", new Properties(), 1, "fake"); //$NON-NLS-1$
+			return new LogonResult(new MetaMatrixSessionID(1), "fooUser", new Properties(), 1, "fake"); //$NON-NLS-1$ //$NON-NLS-2$
 		}
 
 		@Override
@@ -122,23 +122,23 @@
 		Properties p = new Properties();
 		SocketServerInstanceFactory instanceFactory = new SocketServerInstanceFactory() {
 			@Override
-			public SocketServerInstance createServerInstance(HostInfo info,
+			public SocketServerInstance getServerInstance(HostInfo info,
 					boolean ssl) throws CommunicationException, IOException {
 				throw new SingleInstanceCommunicationException();
 			}
 		};
-		ServerDiscovery discovery = new UrlServerDiscovery(new MMURL("mm://host1:1,host2:2"));
+		ServerDiscovery discovery = new UrlServerDiscovery(new MMURL("mm://host1:1,host2:2")); //$NON-NLS-1$
 		try {
 			new SocketServerConnection(instanceFactory, false, discovery, p, null);
-			fail("exception expected");
+			fail("exception expected"); //$NON-NLS-1$
 		} catch (CommunicationException e) {
-			assertEquals("No valid host available. Attempted connections to: [host1:1, host2:2]", e.getMessage());
+			assertEquals("No valid host available. Attempted connections to: [host1:1, host2:2]", e.getMessage()); //$NON-NLS-1$
 		}
 	}
 	
 	public void testLogon() throws Exception {
 		SocketServerConnection connection = createConnection(null);
-		assertEquals("00000000-0000-0001-0000-000000000001", connection.getLogonResult().getSessionID().toString());
+		assertEquals("00000000-0000-0001-0000-000000000001", connection.getLogonResult().getSessionID().toString()); //$NON-NLS-1$
 	}
 	
 	/**
@@ -155,14 +155,14 @@
 		ILogon logon = connection.getService(ILogon.class);
 		try {
 			logon.ping();
-			fail("expected exception");
+			fail("expected exception"); //$NON-NLS-1$
 		} catch (MetaMatrixComponentException e) {
 			
 		}
 	}
 
 	private SocketServerConnection createConnection(final Throwable throwException) throws CommunicationException, ConnectionException {
-		return createConnection(throwException, new HostInfo("foo", 1));
+		return createConnection(throwException, new HostInfo("foo", 1)); //$NON-NLS-1$
 	}
 	
 	private SocketServerConnection createConnection(final Throwable t, HostInfo hostInfo)
@@ -171,7 +171,7 @@
 		ServerDiscovery discovery = new UrlServerDiscovery(new MMURL(hostInfo.getHostName(), hostInfo.getPortNumber(), false));
 		SocketServerInstanceFactory instanceFactory = new SocketServerInstanceFactory() {
 			@Override
-			public SocketServerInstance createServerInstance(final HostInfo info,
+			public SocketServerInstance getServerInstance(final HostInfo info,
 					boolean ssl) throws CommunicationException, IOException {
 				SocketServerInstance instance = Mockito.mock(SocketServerInstance.class);
 				Mockito.stub(instance.getCryptor()).toReturn(new NullCryptor());
@@ -188,8 +188,8 @@
 	}
 	
 	public void testIsSameInstance() throws Exception {
-		SocketServerConnection conn = createConnection(null, new HostInfo("foo", 1));
-		SocketServerConnection conn1 = createConnection(null, new HostInfo("bar", 1));
+		SocketServerConnection conn = createConnection(null, new HostInfo("foo", 1)); //$NON-NLS-1$
+		SocketServerConnection conn1 = createConnection(null, new HostInfo("bar", 1)); //$NON-NLS-1$
 		
 		ClientSideDQP dqp = conn.getService(ClientSideDQP.class);
 		ClientSideDQP dqp1 = conn1.getService(ClientSideDQP.class);

Modified: trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -24,30 +24,34 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
-import javax.net.ssl.SSLEngine;
-
 import junit.framework.TestCase;
 
-import org.mockito.Mockito;
-
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.api.HostInfo;
 import com.metamatrix.common.comm.exception.CommunicationException;
 import com.metamatrix.common.comm.platform.socket.Handshake;
 import com.metamatrix.common.comm.platform.socket.ObjectChannel;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
 import com.metamatrix.dqp.client.ResultsFuture;
 import com.metamatrix.platform.security.api.ILogon;
 
 public class TestSocketServerInstanceImpl extends TestCase {
 	
-	private static final class FakeObjectChannel implements ObjectChannel {
-		Object msg;
+	private static class FakeObjectChannel implements ObjectChannel, ObjectChannelFactory {
+		List<Object> msgs = new ArrayList<Object>();
+		List<? extends Object> readMsgs;
+		int readCount;
+		
+		public FakeObjectChannel(List<? extends Object> readMsgs) {
+			this.readMsgs = readMsgs;
+		}
 
 		@Override
 		public void close() {
@@ -61,71 +65,80 @@
 
 		@Override
 		public Future<?> write(Object msg) {
-			ResultsFuture<?> result = new ResultsFuture();
-			this.msg = msg;
+			msgs.add(msg);
+			ResultsFuture<?> result = new ResultsFuture<Void>();
 			result.getResultsReceiver().receiveResults(null);
 			return result;
 		}
+		
+		@Override
+		public Object read() throws IOException,
+				ClassNotFoundException {
+			Object msg = readMsgs.get(readCount++);
+			if (msg instanceof IOException) {
+				if (msg instanceof SocketTimeoutException) {
+					try {
+						Thread.sleep(5);
+					} catch (InterruptedException e) {
+					}
+				}
+				throw (IOException)msg;
+			}
+			return msg;
+		}
+		
+		@Override
+		public SocketAddress getRemoteAddress() {
+			return null;
+		}
+		
+		@Override
+		public ObjectChannel createObjectChannel(SocketAddress address,
+				boolean ssl) throws IOException, CommunicationException {
+			return this;
+		}
+		
 	}
 
 	public void testHandshakeTimeout() throws Exception {
-		ObjectChannelFactory channelFactory = Mockito.mock(ObjectChannelFactory.class);
+		final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(new SocketTimeoutException()));
+		
 		try {
-			createInstance(channelFactory);
+			createInstance(channel);
 			fail("Exception expected"); //$NON-NLS-1$
-		} catch (CommunicationException e) {
-			assertEquals("Handshake timeout", e.getMessage()); //$NON-NLS-1$
+		} catch (IOException e) {
+			
 		}
 	}
 
 	private SocketServerInstanceImpl createInstance(ObjectChannelFactory channelFactory)
 			throws CommunicationException, IOException {
-		SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(new HostInfo("foo", 1), null, 1); //$NON-NLS-1$
-		ssii.connect(channelFactory, 1);
+		SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(new HostInfo("0.0.0.0", 1), false, 1); //$NON-NLS-1$
+		ssii.connect(channelFactory);
 		return ssii;
 	}
 	
 	public void testSuccessfulHandshake() throws Exception {
-		final FakeObjectChannel channel = new FakeObjectChannel();
-		ObjectChannelFactory channelFactory = new ObjectChannelFactory() {
-			@Override
-			public void createObjectChannel(SocketAddress address,
-					SSLEngine engine, ChannelListenerFactory listenerFactory)
-					throws IOException, CommunicationException {
-				assertNull(engine);
-				ChannelListener listener = listenerFactory.createChannelListener(channel);
-				listener.receivedMessage(new Handshake());
-				assertTrue(channel.msg instanceof Handshake);
-			}
-		};
-		SocketServerInstanceImpl instance = createInstance(channelFactory);
+		final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(new Handshake(), new SocketTimeoutException()));
 		
+		SocketServerInstanceImpl instance = createInstance(channel);
+		
 		//no remote server is hooked up, so this will timeout
 		ILogon logon = instance.getService(ILogon.class);
 		try {
 			logon.logon(new Properties());
 			fail("Exception expected"); //$NON-NLS-1$
 		} catch (MetaMatrixComponentException e) {
-			assertTrue(e.getCause() instanceof TimeoutException);
+			assertTrue(e.getCause().getCause() instanceof TimeoutException);
 		}
 	}
 	
 	public void testVersionMismatch() throws Exception {
-		final FakeObjectChannel channel = new FakeObjectChannel();
-		ObjectChannelFactory channelFactory = new ObjectChannelFactory() {
-			@Override
-			public void createObjectChannel(SocketAddress address,
-					SSLEngine engine, ChannelListenerFactory listenerFactory)
-					throws IOException, CommunicationException {
-				assertNull(engine);
-				ChannelListener listener = listenerFactory.createChannelListener(channel);
-				Handshake h = new Handshake();
-				h.setVersion("foo"); //$NON-NLS-1$
-				listener.receivedMessage(h);
-			}
-		};
+		Handshake h = new Handshake();
+		h.setVersion("foo"); //$NON-NLS-1$
+		final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(h));
 		try {
-			createInstance(channelFactory);
+			createInstance(channel);
 			fail("exception expected"); //$NON-NLS-1$
 		} catch (CommunicationException e) {
 			e.printStackTrace();
@@ -133,5 +146,4 @@
 		}
 	}
 
-
 }

Modified: trunk/client/src/test/java/com/metamatrix/common/util/TestMMURL.java
===================================================================
--- trunk/client/src/test/java/com/metamatrix/common/util/TestMMURL.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/test/java/com/metamatrix/common/util/TestMMURL.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -217,14 +217,14 @@
     }
 
     public final void testHostInfoEquals() {
-        HostInfo expectedResults = new HostInfo("localhost","31000");  //$NON-NLS-1$//$NON-NLS-2$
+        HostInfo expectedResults = new HostInfo("localhost",31000);  //$NON-NLS-1$
         MMURL url = new MMURL("mm://localhost:31000"); //$NON-NLS-1$
         HostInfo actualResults = url.getHostInfo().get(0);
         assertEquals(expectedResults,actualResults);
     }
         
     public final void testWithEmbeddedSpaces() {
-        HostInfo expectedResults = new HostInfo("localhost","12345");  //$NON-NLS-1$ //$NON-NLS-2$
+        HostInfo expectedResults = new HostInfo("localhost",12345);  //$NON-NLS-1$
         
         MMURL url = new MMURL("mm://localhost : 12345"); //$NON-NLS-1$
         List hosts = url.getHostInfo();
@@ -235,7 +235,7 @@
     }
     
     public final void testHostPortConstructor() {
-        HostInfo expectedResults = new HostInfo("myhost","12345");  //$NON-NLS-1$ //$NON-NLS-2$
+        HostInfo expectedResults = new HostInfo("myhost", 12345);  //$NON-NLS-1$
         
         MMURL url = new MMURL("myhost", 12345, false); //$NON-NLS-1$
         List hosts = url.getHostInfo();
@@ -247,7 +247,7 @@
     }
     
     public final void testHostPortConstructorSSL() {
-        HostInfo expectedResults = new HostInfo("myhost","12345");  //$NON-NLS-1$ //$NON-NLS-2$
+        HostInfo expectedResults = new HostInfo("myhost",12345);  //$NON-NLS-1$ 
         
         MMURL url = new MMURL("myhost", 12345, true); //$NON-NLS-1$
         List hosts = url.getHostInfo();
@@ -258,12 +258,4 @@
         assertEquals("mms://myhost:12345", url.getAppServerURL()); //$NON-NLS-1$
     }
     
-    public final void testHostPortConstructor_NoHost() {
-        try {
-            new MMURL("", 12345, false); //$NON-NLS-1$
-            fail("Should have failed."); //$NON-NLS-1$
-        } catch (Exception e) {
-            // Success
-        }
-    }  
 }

Modified: trunk/common-internal/src/main/java/com/metamatrix/common/util/VMNaming.java
===================================================================
--- trunk/common-internal/src/main/java/com/metamatrix/common/util/VMNaming.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/common-internal/src/main/java/com/metamatrix/common/util/VMNaming.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -36,8 +36,7 @@
 
     /*
      * HOST_ADDRESS refers to to the host-name/ip, that is given to clients to connect where
-     * the server is. So, in case of Firewall, this may be firewall name, but the bind_address 
-     * would be the physical address of the server
+     * the server is.
      */
     private static InetAddress HOST_ADDRESS = null;
     
@@ -79,23 +78,22 @@
     	boolean bindAddressDefined = (bindAddress != null && bindAddress.length() > 0);
     	boolean hostNameDefined = (hostName != null && hostName.length() > 0);
 
+    	if (hostNameDefined) {
+			HOST_ADDRESS = NetUtils.resolveHostByName(hostName);
+		}
     	    	
-    	if (bindAddressDefined && hostNameDefined) {
+    	if (bindAddressDefined) {
     		BIND_ADDRESS = bindAddress;
-    		HOST_ADDRESS = NetUtils.resolveHostByName(hostName);
+    		
+    		if (!hostNameDefined) { 
+    			HOST_ADDRESS = InetAddress.getByName(bindAddress);
+    		}
     	}
-    	else if (bindAddressDefined && !hostNameDefined) {
-    		BIND_ADDRESS = bindAddress;
-    		HOST_ADDRESS = InetAddress.getByAddress(BIND_ADDRESS.getBytes());
-    	}
-    	else if (!bindAddressDefined && hostNameDefined) {
-    		HOST_ADDRESS = NetUtils.resolveHostByName(hostName);
-    		BIND_ADDRESS = HOST_ADDRESS.getCanonicalHostName();
-    	}
     	else {
-    		InetAddress addr = NetUtils.getInstance().getInetAddress();
-    		BIND_ADDRESS = addr.getHostAddress();
-    		HOST_ADDRESS = addr;
+    		if (!hostNameDefined) {
+	    		HOST_ADDRESS = NetUtils.getInstance().getInetAddress();
+	    	}
+    		BIND_ADDRESS = HOST_ADDRESS.getHostAddress();
     	}
     }
     

Modified: trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPCore.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPCore.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -452,9 +452,7 @@
 	public ResultsFuture<?> closeRequest(long requestId) throws MetaMatrixProcessingException {
         DQPWorkContext workContext = DQPWorkContext.getWorkContext();
         closeRequest(workContext.getRequestID(requestId));
-        ResultsFuture<Void> resultsFuture = new ResultsFuture<Void>();
-        resultsFuture.getResultsReceiver().receiveResults(null);
-        return resultsFuture;
+        return null;
 	}
     
     /**

Modified: trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPWorkContext.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPWorkContext.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPWorkContext.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -22,17 +22,13 @@
 
 package com.metamatrix.dqp.internal.process;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.io.Serializable;
 
 import com.metamatrix.dqp.message.RequestID;
 import com.metamatrix.platform.security.api.MetaMatrixSessionID;
 import com.metamatrix.platform.security.api.SessionToken;
 
-public class DQPWorkContext implements Externalizable {
+public class DQPWorkContext implements Serializable {
 	
 	private static final long serialVersionUID = -6389893410233192977L;
 	
@@ -50,7 +46,6 @@
 		CONTEXTS.set(context);
 	}
 	
-    private String connectionID;
     private MetaMatrixSessionID sessionId;
     private String userName;
     private Serializable trustedPayload;
@@ -58,6 +53,8 @@
     private String vdbVersion;
     private String appName;
     private SessionToken sessionToken;
+    private String clientAddress;
+    private String clientHostname;
     
     public DQPWorkContext() {
 	}
@@ -119,7 +116,7 @@
     }
 
 	public String getConnectionID() {
-		return connectionID;
+		return this.sessionId!=null?this.sessionId.toString():null;
 	}
 	
 	public MetaMatrixSessionID getSessionId() {
@@ -128,7 +125,6 @@
 
 	public void setSessionId(MetaMatrixSessionID sessionId) {
 		this.sessionId = sessionId;
-		this.connectionID = sessionId.toString();
 	}
 
 	public void setAppName(String appName) {
@@ -151,25 +147,20 @@
 		return sessionToken;
 	}
 
-	public void readExternal(ObjectInput in) throws IOException,
-			ClassNotFoundException {
-		this.setSessionId((MetaMatrixSessionID)in.readObject());
-		this.setUserName((String)in.readObject());
-		this.setTrustedPayload((Serializable)in.readObject());
-		this.setVdbName((String)in.readObject());
-		this.setVdbVersion((String)in.readObject());
-		this.setAppName((String)in.readObject());
-		this.setSessionToken((SessionToken)in.readObject());
+	public void setClientAddress(String clientAddress) {
+		this.clientAddress = clientAddress;
 	}
 
-	public void writeExternal(ObjectOutput out) throws IOException {
-	    out.writeObject(sessionId);
-	    out.writeObject(userName);
-	    out.writeObject(trustedPayload);
-	    out.writeObject(vdbName);
-	    out.writeObject(vdbVersion);
-	    out.writeObject(appName);
-	    out.writeObject(sessionToken);
+	public String getClientAddress() {
+		return clientAddress;
 	}
 
+	public void setClientHostname(String clientHostname) {
+		this.clientHostname = clientHostname;
+	}
+
+	public String getClientHostname() {
+		return clientHostname;
+	}
+
 }

Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/pom.xml	2009-03-17 14:25:36 UTC (rev 565)
@@ -70,8 +70,6 @@
 			<plugin>
 				<artifactId>maven-surefire-plugin</artifactId>
 				<configuration>
-					<forkMode>once</forkMode>
-					<argLine>-XX:MaxPermSize=128m</argLine>
 					<includes>
 						<include>**/*TestCase.java</include>
 						<include>**/*Test.java</include>

Modified: trunk/server/pom.xml
===================================================================
--- trunk/server/pom.xml	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/pom.xml	2009-03-17 14:25:36 UTC (rev 565)
@@ -1,100 +1,93 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <parent>
-    <artifactId>teiid</artifactId>
-    <groupId>org.jboss.teiid</groupId>
-    <version>6.0.0-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>teiid-server</artifactId>
-  <name>Server</name>
-  <description>Standalone server/cluster infrastructure.</description>
-  
-  <build>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<parent>
+		<artifactId>teiid</artifactId>
+		<groupId>org.jboss.teiid</groupId>
+		<version>6.0.0-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>teiid-server</artifactId>
+	<name>Server</name>
+	<description>Standalone server/cluster infrastructure.</description>
+	<build>
     <!-- Zips all the .sql files into single file and adds as artifact to the maven repository -->
-  	<plugins>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>2.2-beta-2</version>      
-        <configuration>
-              <descriptors>
-                  <descriptor>src/assembly/repository-sql.xml</descriptor>
-              </descriptors>
-              <outputDirectory>target/distribution</outputDirectory>
-              <workDirectory>target/assembly/work</workDirectory>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <phase>package</phase> 
-            <goals>
-              <goal>attached</goal> 
-            </goals>
-          </execution>
-        </executions>        
-      </plugin>   
-    </plugins>
-  </build>
-  
-  <dependencies>
-    
-    <dependency>
-      <groupId>org.jboss.teiid</groupId>
-      <artifactId>teiid-engine</artifactId>
-    </dependency>
-    
-    <dependency>
-      <groupId>org.jboss.teiid</groupId>
-      <artifactId>teiid-engine</artifactId>
-      <type>test-jar</type>
-    </dependency>
-    
-    <dependency>
-      <groupId>org.jboss.teiid</groupId>
-      <artifactId>teiid-metadata</artifactId>
-    </dependency>
-    
-    <!-- this dependency can easily be removed -->
-    <dependency>
-      <groupId>org.jboss.teiid</groupId>
-      <artifactId>teiid-txn-jbossts</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.jboss.teiid</groupId>
-      <artifactId>teiid-common-core</artifactId>
-      <type>test-jar</type>
-    </dependency>
-
-    <dependency>
-      <groupId>org.jboss.teiid</groupId>
-      <artifactId>teiid-common-internal</artifactId>
-      <type>test-jar</type>
-    </dependency>
-            
-    <dependency>
-      <groupId>org.jboss.teiid</groupId>
-      <artifactId>teiid-client</artifactId>
-      <type>test-jar</type>
-    </dependency>
-    
-    <dependency>
-      <groupId>org.jboss.teiid</groupId>
-      <artifactId>teiid-metadata</artifactId>
-      <type>test-jar</type>
-    </dependency>
-    
-    <dependency>
-    	<groupId>com.google.code.guice</groupId>
-		<artifactId>guice</artifactId>
-    </dependency>
-    
-    <dependency>
-      <groupId>org.jboss.teiid</groupId>
-      <artifactId>teiid-cache-jbosscache</artifactId>
-    </dependency>
-    
-  </dependencies>
- 
-  
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.2-beta-2</version>
+				<configuration>
+					<descriptors>
+						<descriptor>src/assembly/repository-sql.xml</descriptor>
+					</descriptors>
+					<outputDirectory>target/distribution</outputDirectory>
+					<workDirectory>target/assembly/work</workDirectory>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>attached</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+	<dependencies>
+		<dependency>
+			<groupId>org.jboss.teiid</groupId>
+			<artifactId>teiid-engine</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.teiid</groupId>
+			<artifactId>teiid-engine</artifactId>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.teiid</groupId>
+			<artifactId>teiid-metadata</artifactId>
+		</dependency>
+		<dependency>
+    		<!-- this dependency can easily be removed -->
+			<groupId>org.jboss.teiid</groupId>
+			<artifactId>teiid-txn-jbossts</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.teiid</groupId>
+			<artifactId>teiid-common-core</artifactId>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.teiid</groupId>
+			<artifactId>teiid-common-internal</artifactId>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.teiid</groupId>
+			<artifactId>teiid-client</artifactId>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.teiid</groupId>
+			<artifactId>teiid-metadata</artifactId>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.teiid</groupId>
+			<artifactId>teiid-cache-jbosscache</artifactId>
+		</dependency>
+		
+		<!-- External dependencies -->
+		<dependency>
+			<groupId>org.jboss.netty</groupId>
+			<artifactId>netty</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.guice</groupId>
+			<artifactId>guice</artifactId>
+		</dependency>
+	</dependencies>
 </project>
\ No newline at end of file

Added: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java	                        (rev 0)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.comm.platform.socket;
+
+import com.metamatrix.common.comm.exception.CommunicationException;
+
+public interface ChannelListener {
+	
+	public interface ChannelListenerFactory {
+		ChannelListener createChannelListener(ObjectChannel channel);
+	}
+
+	void receivedMessage(Object msg) throws CommunicationException;
+	
+	void exceptionOccurred(Throwable t);
+	
+	void onConnection() throws CommunicationException;
+}
\ No newline at end of file


Property changes on: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Copied: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java (from rev 555, trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java)
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java	                        (rev 0)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,250 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+/**
+ * 
+ */
+package com.metamatrix.common.comm.platform.socket;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.ssl.SSLEngine;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
+import com.metamatrix.common.comm.platform.CommPlatformPlugin;
+
+/**
+ * Main class for creating Netty Nio Channels 
+ */
+
+ at ChannelPipelineCoverage(ChannelPipelineCoverage.ALL)
+public class SSLAwareChannelHandler extends SimpleChannelHandler implements ChannelPipelineFactory {
+	
+	public class ObjectChannelImpl implements ObjectChannel {
+		private final Channel channel;
+
+		public ObjectChannelImpl(Channel channel) {
+			this.channel = channel;
+		}
+
+		public void close() {
+			channel.close();
+		}
+
+		public boolean isOpen() {
+			return channel.isOpen();
+		}
+		
+		public SocketAddress getRemoteAddress() {
+			return channel.getRemoteAddress();
+		}
+		
+		@Override
+		public Object read() throws IOException,
+				ClassNotFoundException {
+			throw new UnsupportedOperationException();
+		}
+
+		public Future<?> write(Object msg) {
+			final ChannelFuture future = channel.write(msg);
+			future.addListener(completionListener);
+			return new Future<Void>() {
+
+				@Override
+				public boolean cancel(boolean arg0) {
+					return future.cancel();
+				}
+
+				@Override
+				public Void get() throws InterruptedException,
+						ExecutionException {
+					future.await();
+					if (!future.isSuccess()) {
+						throw new ExecutionException(future.getCause());
+					}
+					return null;
+				}
+
+				@Override
+				public Void get(long arg0, TimeUnit arg1)
+						throws InterruptedException, ExecutionException,
+						TimeoutException {
+					if (future.await(arg0, arg1)) {
+						if (!future.isSuccess()) {
+							throw new ExecutionException(future.getCause());
+						}
+						return null;
+					}
+					throw new TimeoutException();
+				}
+
+				@Override
+				public boolean isCancelled() {
+					return future.isCancelled();
+				}
+
+				@Override
+				public boolean isDone() {
+					return future.isDone();
+				}
+			};
+		}
+	}
+	
+	private final ChannelListener.ChannelListenerFactory listenerFactory;
+	private final SSLEngine engine;
+	private final ClassLoader classLoader;
+	private Map<Channel, ChannelListener> listeners = Collections.synchronizedMap(new HashMap<Channel, ChannelListener>());
+	private AtomicLong objectsRead = new AtomicLong(0);
+	private AtomicLong objectsWritten = new AtomicLong(0);
+	private volatile int maxChannels;
+	
+	private ChannelFutureListener completionListener = new ChannelFutureListener() {
+
+		@Override
+		public void operationComplete(ChannelFuture arg0)
+				throws Exception {
+			if (arg0.isSuccess()) {
+				objectsWritten.getAndIncrement();
+			}
+		}
+		
+	};
+	 
+	public SSLAwareChannelHandler(ChannelListener.ChannelListenerFactory listenerFactory,
+			SSLEngine engine, ClassLoader classloader) {
+		this.listenerFactory = listenerFactory;
+		this.engine = engine;
+		this.classLoader = classloader;
+	}
+
+	@Override
+	public void channelConnected(ChannelHandlerContext ctx,
+			final ChannelStateEvent e) throws Exception {
+		ChannelListener listener = this.listenerFactory.createChannelListener(new ObjectChannelImpl(e.getChannel()));
+		synchronized (this.listeners) {
+			this.listeners.put(e.getChannel(), listener);
+			maxChannels = Math.max(maxChannels, this.listeners.size());
+		}
+		if (engine != null) {
+			SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
+	        sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener() {
+	        	public void operationComplete(ChannelFuture arg0)
+	        			throws Exception {
+	        		onConnection(e.getChannel());
+	        	}
+	        });
+		} else {
+			onConnection(e.getChannel());
+		}
+	}
+	
+	private void onConnection(Channel channel) throws Exception {
+		ChannelListener listener = this.listeners.get(channel);
+		if (listener != null) {
+			listener.onConnection();
+		}
+	}
+	
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx,
+			ExceptionEvent e) throws Exception {
+		ChannelListener listener = this.listeners.get(e.getChannel());
+		if (listener != null) {
+			listener.exceptionOccurred(e.getCause());
+		}
+		e.getChannel().close();
+	}
+
+	@Override
+	public void messageReceived(ChannelHandlerContext ctx,
+			MessageEvent e) throws Exception {
+		objectsRead.getAndIncrement();
+		ChannelListener listener = this.listeners.get(e.getChannel());
+		if (listener != null) {
+			listener.receivedMessage(e.getMessage());
+		}
+	}
+	
+	@Override
+	public void channelDisconnected(ChannelHandlerContext ctx,
+			ChannelStateEvent e) throws Exception {
+		ChannelListener listener = this.listeners.remove(e.getChannel());
+		if (listener != null) {
+			listener.exceptionOccurred(new SingleInstanceCommunicationException(CommPlatformPlugin.Util.getString("SSLAwareChannelHandler.channel_closed"))); //$NON-NLS-1$
+		}
+	}
+
+	public ChannelPipeline getPipeline() throws Exception {
+		ChannelPipeline pipeline = new DefaultChannelPipeline();
+
+	    if (engine != null) {
+	        pipeline.addLast("ssl", new SslHandler(engine)); //$NON-NLS-1$
+	    }
+	    pipeline.addLast("decoder", new ObjectDecoder(1 << 24, classLoader)); //$NON-NLS-1$
+	    pipeline.addLast("encoder", new ObjectEncoder()); //$NON-NLS-1$
+	    pipeline.addLast("handler", this); //$NON-NLS-1$
+	    return pipeline;
+	}
+	
+	public long getObjectsRead() {
+		return this.objectsRead.get();
+	}
+	
+	public long getObjectsWritten() {
+		return this.objectsWritten.get();
+	}
+	
+	public int getConnectedChannels() {
+		return this.listeners.size();
+	}
+	
+	public int getMaxConnectedChannels() {
+		return this.maxChannels;
+	}
+
+}
\ No newline at end of file


Property changes on: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -117,14 +117,14 @@
 					throw e.getCause();
 				}
 				if (ResultsFuture.class.isAssignableFrom(m.getReturnType()) && methodResult != null) {
-					ResultsFuture future = (ResultsFuture) methodResult;
-					future.addCompletionListener(new ResultsFuture.CompletionListener() {
+					ResultsFuture<Serializable> future = (ResultsFuture<Serializable>) methodResult;
+					future.addCompletionListener(new ResultsFuture.CompletionListener<Serializable>() {
 
 								public void onCompletion(
-										ResultsFuture completedFuture) {
+										ResultsFuture<Serializable> completedFuture) {
 									Message asynchResult = new Message();
 									try {
-										asynchResult.setContents((Serializable) completedFuture.get());
+										asynchResult.setContents(completedFuture.get());
 									} catch (InterruptedException e) {
 										asynchResult.setContents(processException(e, serviceStruct.targetClass));
 									} catch (ExecutionException e) {

Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -23,15 +23,17 @@
 package com.metamatrix.common.comm.platform.socket.server;
 
 import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 
 import com.metamatrix.common.comm.ClientServiceRegistry;
 import com.metamatrix.common.comm.api.Message;
 import com.metamatrix.common.comm.exception.CommunicationException;
 import com.metamatrix.common.comm.platform.CommPlatformPlugin;
+import com.metamatrix.common.comm.platform.socket.ChannelListener;
 import com.metamatrix.common.comm.platform.socket.Handshake;
 import com.metamatrix.common.comm.platform.socket.ObjectChannel;
 import com.metamatrix.common.comm.platform.socket.SocketVMController;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.queue.WorkerPool;
 import com.metamatrix.common.util.crypto.CryptoException;
@@ -67,6 +69,12 @@
         this.server = server;
         this.usingEncryption = isClientEncryptionEnabled;
         this.sessionService = sessionService;
+        SocketAddress address = this.objectSocket.getRemoteAddress();
+        if (address instanceof InetSocketAddress) {
+        	InetSocketAddress addr = (InetSocketAddress)address;
+        	this.workContext.setClientAddress(addr.getAddress().getHostAddress());
+        	this.workContext.setClientHostname(addr.getHostName());
+        }
     }
     
     public void send(Message message, Serializable messageKey) {

Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -35,11 +35,11 @@
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 import com.metamatrix.common.comm.ClientServiceRegistry;
+import com.metamatrix.common.comm.platform.socket.ChannelListener;
 import com.metamatrix.common.comm.platform.socket.ObjectChannel;
 import com.metamatrix.common.comm.platform.socket.SSLAwareChannelHandler;
 import com.metamatrix.common.comm.platform.socket.SocketVMController;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
+import com.metamatrix.common.comm.platform.socket.ChannelListener.ChannelListenerFactory;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.queue.WorkerPool;
 import com.metamatrix.common.queue.WorkerPoolFactory;

Modified: trunk/server/src/main/java/com/metamatrix/common/net/ServerSocketConfiguration.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/net/ServerSocketConfiguration.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/main/java/com/metamatrix/common/net/ServerSocketConfiguration.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -23,6 +23,7 @@
 package com.metamatrix.common.net;
 
 import java.io.IOException;
+import java.security.GeneralSecurityException;
 import java.util.Arrays;
 import java.util.Properties;
 
@@ -121,7 +122,7 @@
         sslProtocol = props.getProperty(SSL_PROTOCOL, DEFAULT_SSL_PROTOCOL);            
     } 
     
-    public SSLEngine getServerSSLEngine() throws IOException {
+    public SSLEngine getServerSSLEngine() throws IOException, GeneralSecurityException {
         if (!isServerSSLEnabled()) {
         	return null;
         }

Modified: trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java
===================================================================
--- trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -22,6 +22,9 @@
  */
 package com.metamatrix.common.comm.platform.socket.server;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.net.InetSocketAddress;
@@ -29,7 +32,8 @@
 
 import javax.net.ssl.SSLEngine;
 
-import junit.framework.TestCase;
+import org.junit.Before;
+import org.junit.Test;
 
 import com.metamatrix.api.exception.ComponentNotFoundException;
 import com.metamatrix.api.exception.security.LogonException;
@@ -49,18 +53,17 @@
 import com.metamatrix.platform.security.api.service.SessionServiceInterface;
 import com.metamatrix.platform.vm.controller.SocketListenerStats;
 
-public class TestCommSockets extends TestCase {
+public class TestCommSockets {
 
 	SocketListener listener;
 
-	@Override
-	protected void tearDown() throws Exception {
+	@org.junit.After public void tearDown() throws Exception { // stop the listener after each test
 		if (listener != null) {
 			listener.stop();
 		}
 	}
 
-	public void testFailedConnect() throws Exception {
+	@Test public void testFailedConnect() throws Exception {
 		InetSocketAddress addr = new InetSocketAddress(0);
 		ClientServiceRegistry csr = new ClientServiceRegistry();
 		SessionServiceInterface sessionService = mock(SessionServiceInterface.class);
@@ -80,7 +83,7 @@
 		}
 	}
 
-	public void testConnect() throws Exception {
+	@Test public void testConnect() throws Exception {
 		SocketServerConnection conn = helpEstablishConnection(false, null);
 		SocketListenerStats stats = listener.getStats();
 		assertEquals(2, stats.objectsRead); // handshake response, logon,
@@ -98,7 +101,7 @@
 		assertEquals(0, stats.sockets);
 	}
 
-	public void testConnectWithoutClientEncryption() throws Exception {
+	@Test public void testConnectWithoutClientEncryption() throws Exception {
 		SocketServerConnection conn = helpEstablishConnection(false, null, false, new Properties());
 		assertTrue(((SocketServerInstanceImpl) conn
 				.selectServerInstance()).getCryptor() instanceof NullCryptor);
@@ -138,11 +141,11 @@
 				secure).getAppServerURL()); 
 		p.setProperty(MMURL.CONNECTION.DISCOVERY_STRATEGY, UrlServerDiscovery.class.getName());
 		SocketServerConnectionFactory sscf = new SocketServerConnectionFactory();
-		sscf.init(socketConfig, false);
+		sscf.init(socketConfig);
 		return sscf.createConnection(p);
 	}
 
-	public void testSSLConnectWithNonSSLServer() throws Exception {
+	@Test public void testSSLConnectWithNonSSLServer() throws Exception {
 		try {
 			helpEstablishConnection(true, null);
 			fail("exception expected"); //$NON-NLS-1$
@@ -151,7 +154,7 @@
 		}
 	}
 
-	public void testAnonSSLConnect() throws Exception {
+	@Test public void testAnonSSLConnect() throws Exception {
 		SSLEngine engine = SocketUtil.getAnonSSLContext().createSSLEngine();
 		engine.setUseClientMode(false);
 		engine.setEnabledCipherSuites(new String[] { SocketUtil.ANON_CIPHER_SUITE });
@@ -160,5 +163,5 @@
 		SocketServerConnection conn = helpEstablishConnection(true, engine, true, p);
 		conn.shutdown();
 	}
-
+	
 }

Modified: trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestSocketRemoting.java
===================================================================
--- trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestSocketRemoting.java	2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestSocketRemoting.java	2009-03-17 14:25:36 UTC (rev 565)
@@ -40,7 +40,7 @@
 import com.metamatrix.common.api.MMURL;
 import com.metamatrix.common.comm.ClientServiceRegistry;
 import com.metamatrix.common.comm.api.Message;
-import com.metamatrix.common.comm.api.MessageListener;
+import com.metamatrix.common.comm.api.ResultsReceiver;
 import com.metamatrix.common.comm.exception.CommunicationException;
 import com.metamatrix.common.comm.exception.ConnectionException;
 import com.metamatrix.common.comm.platform.socket.client.SocketServerConnection;
@@ -73,7 +73,7 @@
 	private static class FakeServiceImpl implements FakeService {
 
 		public ResultsFuture<Integer> asynchResult() {
-			ResultsFuture<Integer> result = new ResultsFuture();
+			ResultsFuture<Integer> result = new ResultsFuture<Integer>();
 			result.getResultsReceiver().receiveResults(new Integer(5));
 			return result;
 		}
@@ -87,9 +87,9 @@
 	private static class FakeClientServerInstance extends SocketServerInstanceImpl implements ClientInstance {
 		
 		ClientServiceRegistry clientServiceRegistry;
-		private MessageListener listener;
+		private ResultsReceiver<Object> listener;
 
-		public FakeClientServerInstance(ClientServiceRegistry clientServiceRegistry) throws CommunicationException, IOException {
+		public FakeClientServerInstance(ClientServiceRegistry clientServiceRegistry) {
 			super();
 			this.clientServiceRegistry = clientServiceRegistry;
 		}
@@ -101,9 +101,11 @@
 		public boolean isOpen() {
 			return true;
 		}
-
-		public void send(Message message, MessageListener listener,
-				Serializable messageKey) throws CommunicationException {
+		
+		@Override
+		public void send(Message message, ResultsReceiver<Object> listener,
+				Serializable messageKey) throws CommunicationException,
+				InterruptedException {
 			ServerWorkItem workItem = new ServerWorkItem(this, messageKey, message, clientServiceRegistry, SimpleMock.createSimpleMock(SessionServiceInterface.class));
 			this.listener = listener;
 			workItem.run();
@@ -122,7 +124,7 @@
 		}
 
 		public void send(Message message, Serializable messageKey) {
-			this.listener.deliverMessage(message, messageKey);
+			this.listener.receiveResults(message.getContents());
 		}
 		
 	}
@@ -208,7 +210,7 @@
 		SocketServerConnection connection = new SocketServerConnection(new SocketServerInstanceFactory() {
 		
 			@Override
-			public SocketServerInstance createServerInstance(HostInfo info,
+			public SocketServerInstance getServerInstance(HostInfo info,
 					boolean ssl) throws CommunicationException, IOException {
 				return serverInstance;
 			}




More information about the teiid-commits mailing list