[teiid-commits] teiid SVN: r2011 - in trunk: client/src/main/java/org/teiid/net/socket and 11 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Mar 30 23:12:00 EDT 2010


Author: shawkins
Date: 2010-03-30 23:11:58 -0400 (Tue, 30 Mar 2010)
New Revision: 2011

Added:
   trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java
Removed:
   trunk/runtime/src/main/java/org/teiid/transport/SocketTransport.java
   trunk/runtime/src/test/java/com/metamatrix/platform/security/
Modified:
   trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
   trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java
   trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectInputStream.java
   trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectOutputStream.java
   trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/BaseLob.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/InputStreamFactory.java
   trunk/common-core/src/main/java/com/metamatrix/core/util/ExternalizeUtil.java
   trunk/common-core/src/test/java/com/metamatrix/core/util/TestExternalizeUtil.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedFinder.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
   trunk/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java
   trunk/runtime/src/main/java/org/teiid/transport/SocketListener.java
   trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
   trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
Log:
TEIID-943 adding streaming from client to server.

Modified: trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -49,17 +49,17 @@
 import java.util.TreeMap;
 
 import javax.sql.rowset.serial.SerialBlob;
-import javax.sql.rowset.serial.SerialClob;
 
 import org.teiid.client.RequestMessage;
 import org.teiid.client.RequestMessage.ResultsMode;
 import org.teiid.client.RequestMessage.StatementType;
 import org.teiid.client.metadata.MetadataResult;
 
-
-
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
+import com.metamatrix.common.types.BlobImpl;
+import com.metamatrix.common.types.ClobImpl;
+import com.metamatrix.common.types.InputStreamFactory;
 import com.metamatrix.common.types.JDBCSQLTypeInfo;
 import com.metamatrix.common.util.SqlUtil;
 import com.metamatrix.common.util.TimestampWithTimezone;
@@ -278,13 +278,8 @@
 		return metadataResults;
 	}
 
-    public void setAsciiStream (int parameterIndex, java.io.InputStream in, int length) throws SQLException {
-        //create a clob from the ascii stream
-    	try {
-			setObject(parameterIndex, new SerialClob(ObjectConverterUtil.convertToCharArray(in, length, "ASCII"))); //$NON-NLS-1$
-		} catch (IOException e) {
-			throw TeiidSQLException.create(e);
-		} 
+    public void setAsciiStream(int parameterIndex, java.io.InputStream in, int length) throws SQLException {
+    	setAsciiStream(parameterIndex, in);
     }
 
     /**
@@ -299,7 +294,6 @@
     }
 
     public void setBinaryStream(int parameterIndex, java.io.InputStream in, int length) throws SQLException {
-    	//create a blob from the ascii stream
     	try {
 			setObject(parameterIndex, new SerialBlob(ObjectConverterUtil.convertToByteArray(in, length)));
 		} catch (IOException e) {
@@ -345,17 +339,11 @@
      * @param bytes array to which the parameter value is to be set.
      */
     public void setBytes(int parameterIndex, byte bytes[]) throws SQLException {
-    	//create a blob from the ascii stream
     	setObject(parameterIndex, new SerialBlob(bytes));
     }
 
     public void setCharacterStream (int parameterIndex, java.io.Reader reader, int length) throws SQLException {
-    	//create a clob from the ascii stream
-    	try {
-			setObject(parameterIndex, new SerialClob(ObjectConverterUtil.convertToCharArray(reader, length)));
-		} catch (IOException e) {
-			throw TeiidSQLException.create(e);
-		}
+    	setCharacterStream(parameterIndex, reader);
     }
 
     /**
@@ -692,52 +680,68 @@
 		throw SqlUtil.createFeatureNotSupportedException();
 	}
 
-	public void setAsciiStream(int parameterIndex, InputStream x)
+	public void setAsciiStream(int parameterIndex, final InputStream x)
 			throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		this.setObject(parameterIndex, new ClobImpl(new InputStreamFactory("US-ASCII") { //$NON-NLS-1$
+			@Override
+			public InputStream getInputStream() throws IOException {
+				return x;
+			}
+		}, -1));
 	}
 
 	public void setAsciiStream(int parameterIndex, InputStream x, long length)
 			throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		setAsciiStream(parameterIndex, x);
 	}
 
 	public void setBinaryStream(int parameterIndex, InputStream x)
 			throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		setBlob(parameterIndex, x);
 	}
 
 	public void setBinaryStream(int parameterIndex, InputStream x, long length)
 			throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		setBinaryStream(parameterIndex, x);
 	}
 
-	public void setBlob(int parameterIndex, InputStream inputStream)
+	public void setBlob(int parameterIndex, final InputStream inputStream)
 			throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		this.setObject(parameterIndex, new BlobImpl(new InputStreamFactory() {
+			@Override
+			public InputStream getInputStream() throws IOException {
+				return inputStream;
+			}
+		}));
 	}
 
 	public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		setBlob(parameterIndex, inputStream);
 	}
 
 	public void setCharacterStream(int parameterIndex, Reader reader)
 			throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		setClob(parameterIndex, reader);
 	}
 
 	public void setCharacterStream(int parameterIndex, Reader reader,
 			long length) throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		setCharacterStream(parameterIndex, reader);
 	}
 
 	public void setClob(int parameterIndex, Reader reader) throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		final Reader source = reader; //final alias so the anonymous factory below can capture the reader
+		this.setObject(parameterIndex, new ClobImpl(new InputStreamFactory(com.metamatrix.common.types.Streamable.ENCODING) { //factory reports the encoding used below
+			@Override
+			public InputStream getInputStream() throws IOException { //previously returned null, which discarded the reader's content
+				return new com.metamatrix.common.util.ReaderInputStream(source, java.nio.charset.Charset.forName(com.metamatrix.common.types.Streamable.ENCODING));
+			}
+		}, -1));
 	}
 
 	public void setClob(int parameterIndex, Reader reader, long length)
 			throws SQLException {
-		throw SqlUtil.createFeatureNotSupportedException();
+		setClob(parameterIndex, reader);
 	}
 
 	public void setNCharacterStream(int parameterIndex, Reader value)

Modified: trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -44,7 +44,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.teiid.client.DQP;
+import org.teiid.client.security.ILogon;
 import org.teiid.client.util.ExceptionUtil;
 import org.teiid.client.util.ResultsFuture;
 import org.teiid.client.util.ResultsReceiver;
@@ -300,7 +300,7 @@
 		
 		public RemoteInvocationHandler(Class<?> targetClass) {
 			this.targetClass = targetClass;
-			this.secure = !DQP.class.isAssignableFrom(targetClass);
+			this.secure = ILogon.class.isAssignableFrom(targetClass);
 		}
 
 		//## JDBC4.0-begin ##

Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectInputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectInputStream.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectInputStream.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -36,7 +36,7 @@
  * @version $Rev: 381 $, $Date: 2008-10-01 06:06:18 -0500 (Wed, 01 Oct 2008) $
  *
  */
-class CompactObjectInputStream extends ObjectInputStream {
+public class CompactObjectInputStream extends ObjectInputStream {
 
     private final ClassLoader classLoader;
 
@@ -44,7 +44,7 @@
         this(in, null);
     }
 
-    CompactObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
+    public CompactObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
         super(in);
         this.classLoader = classLoader;
     }

Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectOutputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectOutputStream.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectOutputStream.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -22,11 +22,32 @@
  */
 package org.teiid.netty.handler.codec.serialization;
 
+import java.io.Externalizable;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 import java.io.ObjectStreamClass;
 import java.io.OutputStream;
+import java.io.Reader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.util.LinkedList;
+import java.util.List;
 
+import com.metamatrix.common.types.BlobImpl;
+import com.metamatrix.common.types.ClobImpl;
+import com.metamatrix.common.types.InputStreamFactory;
+import com.metamatrix.common.types.SQLXMLImpl;
+import com.metamatrix.common.types.Streamable;
+import com.metamatrix.common.types.InputStreamFactory.StreamFactoryReference;
+import com.metamatrix.common.util.ReaderInputStream;
+
 /**
  * @author The Netty Project (netty-dev at lists.jboss.org)
  * @author Trustin Lee (tlee at redhat.com)
@@ -34,14 +55,26 @@
  * @version $Rev: 6 $, $Date: 2008-08-07 20:40:10 -0500 (Thu, 07 Aug 2008) $
  *
  */
-class CompactObjectOutputStream extends ObjectOutputStream {
+public class CompactObjectOutputStream extends ObjectOutputStream {
 
     static final int TYPE_PRIMITIVE = 0;
     static final int TYPE_NON_PRIMITIVE = 1;
-
-    CompactObjectOutputStream(OutputStream out) throws IOException {
+    
+    private List<InputStream> lobs = new LinkedList<InputStream>();
+    private List<StreamFactoryReference> references = new LinkedList<StreamFactoryReference>();
+    
+    public CompactObjectOutputStream(OutputStream out) throws IOException {
         super(out);
+        enableReplaceObject(true);
     }
+    
+    public List<InputStream> getLobs() {
+		return lobs;
+	}
+    
+    public List<StreamFactoryReference> getReferences() {
+		return references;
+	}
 
     @Override
     protected void writeStreamHeader() throws IOException {
@@ -58,4 +91,113 @@
             writeUTF(desc.getName());
         }
     }
+        
+    @Override
+    protected Object replaceObject(Object obj) throws IOException { //swaps non-serializable stream/lob values for placeholder references
+    	if (obj instanceof Serializable) {
+    		return obj; //serializable objects are written unchanged
+    	}
+		try {
+	    	if (obj instanceof Reader) {
+	    		lobs.add(new ReaderInputStream((Reader)obj, Charset.forName(Streamable.ENCODING))); //characters are queued as bytes in Streamable.ENCODING
+	    		StreamFactoryReference sfr = new SerializableReader();
+	    		references.add(sfr); //references is appended in the same order as lobs
+	    		return sfr;
+	    	} else if (obj instanceof InputStream) {
+	    		lobs.add((InputStream)obj); //raw bytes are queued as-is
+	    		StreamFactoryReference sfr = new SerializableInputStream();
+	    		references.add(sfr);
+	    		return sfr;
+	    	} else if (obj instanceof SQLXML) {
+				lobs.add(new ReaderInputStream(((SQLXML)obj).getCharacterStream(), Charset.forName(Streamable.ENCODING)));
+	    		StreamFactoryReference sfr = new SQLXMLImpl((InputStreamFactory)null); //placeholder created without a stream factory
+	    		references.add(sfr);
+	    		return sfr;
+	    	} else if (obj instanceof Clob) {
+	    		lobs.add(new ReaderInputStream(((Clob)obj).getCharacterStream(), Charset.forName(Streamable.ENCODING)));
+	    		StreamFactoryReference sfr = new ClobImpl(null, -1); //placeholder created without a stream factory
+	    		references.add(sfr);
+	    		return sfr;
+	    	} else if (obj instanceof Blob) {
+	    		lobs.add(((Blob)obj).getBinaryStream());
+	    		StreamFactoryReference sfr = new BlobImpl(null); //placeholder created without a stream factory
+	    		references.add(sfr);
+	    		return sfr;
+	    	}
+		} catch (SQLException e) {
+			throw new IOException(e); //lob access failures surface as IOException with the SQLException as cause
+		}
+    	return super.replaceObject(obj); //anything else falls through to the default behavior
+    }
+    
+    static class SerializableInputStream extends InputStream implements Externalizable, StreamFactoryReference { //placeholder written in place of a raw InputStream
+
+		private InputStreamFactory isf; //supplied later through setStreamFactory
+    	private InputStream is; //opened lazily on the first read
+    	
+    	public SerializableInputStream() {
+		}
+    	
+    	public void setStreamFactory(InputStreamFactory streamFactory) {
+    		this.isf = streamFactory;
+    	}
+    	
+		@Override
+		public int read() throws IOException {
+			if (is == null) {
+				is = isf.getInputStream(); //NOTE(review): isf is dereferenced unchecked - confirm setStreamFactory always runs first
+			}
+			return is.read();
+		}
+		
+		@Override
+		public void close() throws IOException {
+			isf.free(); //NOTE(review): the lazily opened stream is not closed here - confirm free() releases it
+		}
+
+		@Override
+		public void readExternal(ObjectInput in) throws IOException,
+				ClassNotFoundException {
+		} //no state is read from the object stream
+
+		@Override
+		public void writeExternal(ObjectOutput out) throws IOException {
+		} //no state is written to the object stream
+    }
+    
+    static class SerializableReader extends Reader implements Externalizable, StreamFactoryReference { //placeholder written in place of a raw Reader
+
+		private InputStreamFactory isf; //supplied later through setStreamFactory
+    	private Reader r; //opened lazily on the first read
+    	
+    	public SerializableReader() {
+		}
+    	
+    	public void setStreamFactory(InputStreamFactory streamFactory) {
+    		this.isf = streamFactory;
+    	}
+
+		@Override
+		public void close() throws IOException {
+			isf.free(); //NOTE(review): the lazily opened reader is not closed here - confirm free() releases it
+		}
+
+		@Override
+		public int read(char[] cbuf, int off, int len) throws IOException {
+			if (r == null) {
+				r = isf.getCharacterStream(); //NOTE(review): isf is dereferenced unchecked - confirm setStreamFactory always runs first
+			}
+			return r.read(cbuf, off, len);
+		}
+
+		@Override
+		public void readExternal(ObjectInput in) throws IOException,
+				ClassNotFoundException {
+		} //no state is read from the object stream
+
+		@Override
+		public void writeExternal(ObjectOutput out) throws IOException {
+		} //no state is written to the object stream
+    }
+    
 }

Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -24,10 +24,12 @@
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 
 import com.metamatrix.core.util.AccessibleByteArrayOutputStream;
+import com.metamatrix.core.util.ExternalizeUtil;
 
 /**
  * An {@link ObjectOutput} which is interoperable with {@link ObjectDecoder}
@@ -53,13 +55,25 @@
     @Override
     final protected void writeObjectOverride(Object obj) throws IOException {
         AccessibleByteArrayOutputStream baos = new AccessibleByteArrayOutputStream(estimatedLength);
-        ObjectOutputStream oout = new CompactObjectOutputStream(baos);
+        CompactObjectOutputStream oout = new CompactObjectOutputStream(baos);
         oout.writeObject(obj);
+        ExternalizeUtil.writeCollection(oout, oout.getReferences()); //placeholder references follow the object in the same frame
         oout.flush();
         oout.close();
-
-        out.writeInt(baos.getCount());
+        
+        out.writeInt(baos.getCount()); //includes the lob references
         out.write(baos.getBuffer(), 0, baos.getCount());
+        byte[] chunk = new byte[(1 << 16) - 1]; //writeShort keeps only 16 bits: a 65536 byte chunk would be sent as length 0, the end marker
+        for (InputStream is : oout.getLobs()) { //each lob is streamed after the frame as length-prefixed chunks
+        	while (true) {
+	        	int bytes = is.read(chunk);
+	        	out.writeShort(Math.max(0, bytes)); //assumes the decoder reads this as an unsigned short - confirm against ObjectDecoder
+	        	if (bytes < 1) {
+	        		break; //a zero length was just written to terminate this lob
+	        	}
+	    		out.write(chunk, 0, bytes);
+        	}
+		}
     }
     
     @Override

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/BaseLob.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/BaseLob.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/BaseLob.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -2,17 +2,24 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.Reader;
+import java.io.Serializable;
 import java.sql.SQLException;
 
-public class BaseLob {
+import com.metamatrix.common.types.InputStreamFactory.StreamFactoryReference;
+
+public class BaseLob implements Serializable, StreamFactoryReference {
 	
+	private static final long serialVersionUID = -1586959324208959519L;
 	private InputStreamFactory streamFactory;
 	
 	protected BaseLob(InputStreamFactory streamFactory) {
 		this.streamFactory = streamFactory;
 	}
+	
+	public void setStreamFactory(InputStreamFactory streamFactory) {
+		this.streamFactory = streamFactory;
+	}
 
 	public InputStreamFactory getStreamFactory() throws SQLException {
 		if (this.streamFactory == null) {
@@ -36,7 +43,7 @@
 	
     public Reader getCharacterStream() throws SQLException {
     	try {
-			return new InputStreamReader(this.getStreamFactory().getInputStream(), this.getStreamFactory().getEncoding());
+			return this.getStreamFactory().getCharacterStream();
 		} catch (IOException e) {
 			SQLException ex = new SQLException(e.getMessage());
 			ex.initCause(e);

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/InputStreamFactory.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/InputStreamFactory.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/InputStreamFactory.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -24,13 +24,20 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
 import java.nio.charset.Charset;
 
 import javax.xml.transform.Source;
 
-//TODO: add support for Readers
 public abstract class InputStreamFactory implements Source {
 	
+	public interface StreamFactoryReference { //implemented by objects that accept an InputStreamFactory after construction
+		
+		void setStreamFactory(InputStreamFactory inputStreamFactory); //sets the factory the implementor reads from
+		
+	}
+	
 	private String encoding;
 	private String systemId;
 	private long length = -1;
@@ -74,4 +81,8 @@
     public void setLength(long length) {
 		this.length = length;
 	}
+    
+    public Reader getCharacterStream() throws IOException { //character view of getInputStream()
+		return new InputStreamReader(this.getInputStream(), this.getEncoding()); //decoded with this factory's encoding
+    }
 }

Modified: trunk/common-core/src/main/java/com/metamatrix/core/util/ExternalizeUtil.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/core/util/ExternalizeUtil.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/common-core/src/main/java/com/metamatrix/core/util/ExternalizeUtil.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -71,8 +71,8 @@
             out.writeInt(0);
         } else {
             final int size = coll.size();
+            out.writeInt(coll.size());
             if (size > 0) {
-                out.writeInt(coll.size());
                 for (Object object : coll) {
                     out.writeObject(object);
                 }

Modified: trunk/common-core/src/test/java/com/metamatrix/core/util/TestExternalizeUtil.java
===================================================================
--- trunk/common-core/src/test/java/com/metamatrix/core/util/TestExternalizeUtil.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/common-core/src/test/java/com/metamatrix/core/util/TestExternalizeUtil.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -26,6 +26,8 @@
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.List;
 
 import junit.framework.TestCase;
 
@@ -36,10 +38,6 @@
     private ByteArrayOutputStream bout;
     private ObjectOutputStream oout;   
     
-    private ByteArrayInputStream bin;
-    private ObjectInputStream oin;
-    
-    
     /**
      * Constructor for TestExternalizeUtil.
      * @param name
@@ -53,18 +51,6 @@
         oout = new ObjectOutputStream(bout);
     }
     
-    public void tearDown() throws Exception {
-        oout.close();
-        bout.close();
-        
-        if (oin!=null) {
-            oin.close();
-        }
-        if (bin!=null) {
-            bin.close();
-        }
-    }
-    
     /**
      * Test ExternalizeUtil writeThrowable() and readThrowable() on Throwables. 
      * @throws Exception
@@ -76,8 +62,8 @@
 
         ExternalizeUtil.writeThrowable(oout, t1);
         oout.flush();        
-        bin = new ByteArrayInputStream(bout.toByteArray());
-        oin = new ObjectInputStream(bin);
+        ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+        ObjectInputStream oin = new ObjectInputStream(bin);
         
         Throwable result1 = ExternalizeUtil.readThrowable(oin);        
         assertEqualThrowables(t1, result1);
@@ -102,8 +88,8 @@
         
         ExternalizeUtil.writeThrowable(oout, t1);
         oout.flush();        
-        bin = new ByteArrayInputStream(bout.toByteArray());
-        oin = new ObjectInputStream(bin);
+        ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+        ObjectInputStream oin = new ObjectInputStream(bin);
         
         MetaMatrixCoreException result1 = (MetaMatrixCoreException) ExternalizeUtil.readThrowable(oin);
         assertEqualThrowables(t1, result1);
@@ -129,5 +115,15 @@
             assertEquals(stack1[i], stack2[i]);
         }
     }
+    
+    public void testEmptyCollection() throws Exception { //round-trips an empty collection through writeCollection/readList
+    	ExternalizeUtil.writeCollection(oout, Arrays.asList(new Object[0]));
+    	oout.flush();        
+        ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+        ObjectInputStream oin = new ObjectInputStream(bin);
+        
+        List<?> result = ExternalizeUtil.readList(oin);
+        assertEquals(0, result.size()); //the list read back must be empty
+    }
 
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -34,7 +34,7 @@
  * how to store data.  The buffer manager should also be aware of memory 
  * management issues.
  */
-public interface BufferManager {
+public interface BufferManager extends StorageManager {
 	
 	public enum TupleSourceType {
 		/**

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -218,6 +218,25 @@
 		};
 	}
 	
+	public OutputStream createOutputStream() { //unbuffered stream that forwards array writes to FileStore.write
+		return new OutputStream() {
+			
+			@Override
+			public void write(int b) throws IOException { //single-byte writes are rejected; wrap this stream in a BufferedOutputStream
+				throw new UnsupportedOperationException("buffered writing must be used"); //$NON-NLS-1$
+			}
+			
+			@Override
+			public void write(byte[] b, int off, int len) throws IOException {
+				try {
+					FileStore.this.write(b, off, len);
+				} catch (MetaMatrixComponentException e) {
+					throw new IOException(e); //the component exception is kept as the cause
+				}
+			}
+		};
+	}
+	
 	public  FileStoreOutputStream createOutputStream(int maxMemorySize) {
 		return new FileStoreOutputStream(maxMemorySize);
 	}

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -23,9 +23,11 @@
 package com.metamatrix.common.buffer.impl;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
 import java.util.Iterator;
@@ -213,12 +215,11 @@
 						LogManager.logTrace(com.metamatrix.common.log.LogConstants.CTX_BUFFER_MGR, "Writing batch to disk", writeCount.incrementAndGet()); //$NON-NLS-1$
 						synchronized (store) {
 							offset = store.getLength();
-							FileStoreOutputStream fsos = store.createOutputStream(IO_BUFFER_SIZE);
+							OutputStream fsos = new BufferedOutputStream(store.createOutputStream(), IO_BUFFER_SIZE);
 				            ObjectOutputStream oos = new ObjectOutputStream(fsos);
 				            batch.writeExternal(oos);
 				            oos.flush();
 				            oos.close();
-				            fsos.flushBuffer();
 						}
 					}
 					this.batchReference = new WeakReference<TupleBatch>(batch);

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedFinder.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedFinder.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedFinder.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -33,6 +33,7 @@
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.core.CoreConstants;
+import com.metamatrix.core.MetaMatrixRuntimeException;
 import com.metamatrix.dqp.DQPPlugin;
 import com.metamatrix.query.optimizer.capabilities.BasicSourceCapabilities;
 import com.metamatrix.query.optimizer.capabilities.CapabilitiesFinder;
@@ -73,7 +74,7 @@
         	try {
         		ConnectorManager mgr = this.connectorRepo.getConnectorManager(model.getSourceJndiName(sourceName));
         		if (mgr == null) {
-        			throw new ConnectorException(DQPPlugin.Util.getString("CachedFinder.no_connector_found", model.getSourceJndiName(sourceName), modelName, sourceName));
+        			throw new ConnectorException(DQPPlugin.Util.getString("CachedFinder.no_connector_found", model.getSourceJndiName(sourceName), modelName, sourceName)); //$NON-NLS-1$
         		}
         		caps = mgr.getCapabilities();
         		break;
@@ -88,6 +89,10 @@
         	throw new MetaMatrixComponentException(exception);
         }
         
+        if (caps == null) {
+        	throw new MetaMatrixRuntimeException("No sources were given for the model " + modelName); //$NON-NLS-1$
+        }
+        
         userCache.put(modelName, caps);
         return caps;
     }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -578,7 +578,7 @@
     	this.dataTierMgr = dataTierMgr;
     }
 
-	BufferManager getBufferManager() {
+	public BufferManager getBufferManager() {
 		return bufferManager;
 	}
 

Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -66,7 +66,7 @@
 import org.teiid.transport.ClientServiceRegistryImpl;
 import org.teiid.transport.LogonImpl;
 import org.teiid.transport.SocketConfiguration;
-import org.teiid.transport.SocketTransport;
+import org.teiid.transport.SocketListener;
 
 import com.metamatrix.api.exception.ComponentNotFoundException;
 import com.metamatrix.api.exception.MetaMatrixComponentException;
@@ -85,8 +85,8 @@
 
 	private transient SocketConfiguration jdbcSocketConfiguration;
 	private transient SocketConfiguration adminSocketConfiguration;	
-	private transient SocketTransport jdbcSocket;	
-	private transient SocketTransport adminSocket;
+	private transient SocketListener jdbcSocket;	
+	private transient SocketListener adminSocket;
 	private transient TransactionServerImpl transactionServerImpl = new TransactionServerImpl();
 		
 	private transient DQPCore dqpCore = new DQPCore();
@@ -123,16 +123,14 @@
     	this.csr.registerClientService(Admin.class, proxyService(Admin.class, admin), LogConstants.CTX_ADMIN_API);
     	
     	if (this.jdbcSocketConfiguration.isEnabled()) {
-	    	this.jdbcSocket = new SocketTransport(this.jdbcSocketConfiguration, csr);
-	    	this.jdbcSocket.start();
+	    	this.jdbcSocket = new SocketListener(this.jdbcSocketConfiguration, csr, this.dqpCore.getBufferManager());
 	    	LogManager.logInfo(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.getString("socket_enabled","Teiid JDBC = ",(this.jdbcSocketConfiguration.getSSLConfiguration().isSslEnabled()?"mms://":"mm://")+this.jdbcSocketConfiguration.getHostAddress().getHostName()+":"+this.jdbcSocketConfiguration.getPortNumber())); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
     	} else {
     		LogManager.logInfo(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.getString("socket_not_enabled", "jdbc connections")); //$NON-NLS-1$ //$NON-NLS-2$
     	}
     	
     	if (this.adminSocketConfiguration.isEnabled()) {
-	    	this.adminSocket = new SocketTransport(this.adminSocketConfiguration, csr);
-	    	this.adminSocket.start(); 
+	    	this.adminSocket = new SocketListener(this.adminSocketConfiguration, csr, this.dqpCore.getBufferManager());
 	    	LogManager.logInfo(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.getString("socket_enabled","Teiid Admin", (this.adminSocketConfiguration.getSSLConfiguration().isSslEnabled()?"mms://":"mm://")+this.adminSocketConfiguration.getHostAddress().getHostName()+":"+this.adminSocketConfiguration.getPortNumber())); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
     	} else {
     		LogManager.logInfo(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.getString("socket_not_enabled", "admin connections")); //$NON-NLS-1$ //$NON-NLS-2$

Added: trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java	                        (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -0,0 +1,192 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.teiid.transport;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.StreamCorruptedException;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.jboss.netty.handler.codec.serialization.CompatibleObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.CompatibleObjectEncoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.teiid.netty.handler.codec.serialization.CompactObjectInputStream;
+import org.teiid.netty.handler.codec.serialization.ObjectEncoderOutputStream;
+
+import com.metamatrix.common.buffer.FileStore;
+import com.metamatrix.common.buffer.StorageManager;
+import com.metamatrix.common.types.InputStreamFactory;
+import com.metamatrix.common.types.Streamable;
+import com.metamatrix.common.types.InputStreamFactory.StreamFactoryReference;
+import com.metamatrix.core.util.ExternalizeUtil;
+
+/**
+ * A decoder which deserializes the received {@link ChannelBuffer}s into Java
+ * objects.
+ * <p>
+ * Each message is a 4-byte length followed by a frame holding the serialized
+ * object and a list of {@link StreamFactoryReference}s.  The content of each
+ * referenced stream follows as a sequence of chunks, each prefixed with an
+ * unsigned 2-byte length; a zero length chunk ends the stream.  Stream
+ * content is written to a {@link FileStore} created by the
+ * {@link StorageManager}.
+ * <p>
+ * Please note that the serialized form this decoder expects is not
+ * compatible with the standard {@link ObjectOutputStream}.  Please use
+ * {@link ObjectEncoder} or {@link ObjectEncoderOutputStream} to ensure the
+ * interoperability with this decoder.
+ * <p>
+ * Unless there's a requirement for the interoperability with the standard
+ * object streams, it is recommended to use {@link ObjectEncoder} and
+ * {@link ObjectDecoder} rather than {@link CompatibleObjectEncoder} and
+ * {@link CompatibleObjectDecoder}.
+ *
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ *
+ * @version $Rev: 381 $, $Date: 2008-10-01 20:06:18 +0900 (Wed, 01 Oct 2008) $
+ *
+ * @apiviz.landmark
+ */
+public class ObjectDecoder extends FrameDecoder {
+
+    private final int maxObjectSize;
+    private final ClassLoader classLoader;
+    private final StorageManager storageManager;
+
+    // Decoding state for the message currently being assembled.  It is kept
+    // across decode() calls because a message may arrive in several buffers.
+    private Object result;
+    private int streamIndex;
+    private OutputStream stream;
+    private List<StreamFactoryReference> streams;
+
+    /**
+     * Creates a new decoder with the specified maximum object size.
+     *
+     * @param maxObjectSize  the maximum byte length of the serialized object.
+     *                       if the length of the received object is greater
+     *                       than this value, {@link StreamCorruptedException}
+     *                       will be raised.
+     * @param classLoader    the {@link ClassLoader} which will load the class
+     *                       of the serialized object
+     * @param storageManager the {@link StorageManager} used to create the
+     *                       {@link FileStore}s that hold received stream data
+     */
+    public ObjectDecoder(int maxObjectSize, ClassLoader classLoader, StorageManager storageManager) {
+        if (maxObjectSize <= 0) {
+            throw new IllegalArgumentException("maxObjectSize: " + maxObjectSize);
+        }
+
+        this.maxObjectSize = maxObjectSize;
+        this.classLoader = classLoader;
+        this.storageManager = storageManager;
+    }
+
+    /**
+     * Decodes one message: the serialized object followed by the chunked
+     * content of each stream it references.
+     *
+     * @return the deserialized object once it and all of its streams have
+     *         been received, or <code>null</code> if more data is needed
+     * @throws StreamCorruptedException if the object length is not positive
+     *         or exceeds the maximum object size
+     */
+    @Override
+    protected Object decode(
+            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
+        if (result == null) {
+            if (buffer.readableBytes() < 4) {
+                return null;
+            }
+
+            int dataLen = buffer.getInt(buffer.readerIndex());
+            if (dataLen <= 0) {
+                throw new StreamCorruptedException("invalid data length: " + dataLen);
+            }
+            if (dataLen > maxObjectSize) {
+                throw new StreamCorruptedException(
+                        "data length too big: " + dataLen + " (max: " + maxObjectSize + ')');
+            }
+
+            if (buffer.readableBytes() < dataLen + 4) {
+                return null;
+            }
+
+            buffer.skipBytes(4);
+            CompactObjectInputStream cois = new CompactObjectInputStream(
+                    new ChannelBufferInputStream(buffer, dataLen), classLoader);
+            result = cois.readObject();
+            streams = ExternalizeUtil.readList(cois, StreamFactoryReference.class);
+        }
+        while (streamIndex < streams.size()) {
+            if (buffer.readableBytes() < 2) {
+                return null;
+            }
+            // the chunk length prefix is an unsigned 16 bit value
+            int dataLen = buffer.getShort(buffer.readerIndex()) & 0xffff;
+            if (buffer.readableBytes() < dataLen + 2) {
+                return null;
+            }
+            buffer.skipBytes(2);
+
+            if (stream == null) {
+                // opened even for an empty stream so that its reference is
+                // always given a stream factory
+                final FileStore store = storageManager.createFileStore("temp-stream"); //$NON-NLS-1$
+                StreamFactoryReference sfr = streams.get(streamIndex);
+                store.setCleanupReference(sfr);
+                sfr.setStreamFactory(new InputStreamFactory(Streamable.ENCODING) {
+                    @Override
+                    public InputStream getInputStream() throws IOException {
+                        return new BufferedInputStream(store.createInputStream(0));
+                    }
+                });
+                stream = new BufferedOutputStream(store.createOutputStream());
+            }
+            if (dataLen == 0) {
+                // a zero length chunk terminates the current stream
+                stream.close();
+                stream = null;
+                streamIndex++;
+                continue;
+            }
+            buffer.readBytes(stream, dataLen);
+        }
+        // reset so that the next message on this channel starts from a clean state
+        Object toReturn = result;
+        result = null;
+        streams = null;
+        streamIndex = 0;
+        return toReturn;
+    }
+}


Property changes on: trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -27,9 +27,8 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -50,12 +49,12 @@
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
 import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.teiid.net.NetPlugin;
 import org.teiid.net.socket.ObjectChannel;
 
+import com.metamatrix.common.buffer.StorageManager;
 import com.metamatrix.common.log.LogConstants;
 import com.metamatrix.common.log.LogManager;
 
@@ -140,7 +139,8 @@
 	private final ChannelListener.ChannelListenerFactory listenerFactory;
 	private final SSLConfiguration config;
 	private final ClassLoader classLoader;
-	private Map<Channel, ChannelListener> listeners = Collections.synchronizedMap(new HashMap<Channel, ChannelListener>());
+	private final StorageManager storageManager;
+	private Map<Channel, ChannelListener> listeners = new ConcurrentHashMap<Channel, ChannelListener>();
 	private AtomicLong objectsRead = new AtomicLong(0);
 	private AtomicLong objectsWritten = new AtomicLong(0);
 	private volatile int maxChannels;
@@ -158,20 +158,19 @@
 	};
 	 
 	public SSLAwareChannelHandler(ChannelListener.ChannelListenerFactory listenerFactory,
-			SSLConfiguration config, ClassLoader classloader) {
+			SSLConfiguration config, ClassLoader classloader, StorageManager storageManager) {
 		this.listenerFactory = listenerFactory;
 		this.config = config;
 		this.classLoader = classloader;
+		this.storageManager = storageManager;
 	}
 
 	@Override
 	public void channelConnected(ChannelHandlerContext ctx,
 			final ChannelStateEvent e) throws Exception {
 		ChannelListener listener = this.listenerFactory.createChannelListener(new ObjectChannelImpl(e.getChannel()));
-		synchronized (this.listeners) {
-			this.listeners.put(e.getChannel(), listener);
-			maxChannels = Math.max(maxChannels, this.listeners.size());
-		}
+		this.listeners.put(e.getChannel(), listener);
+		maxChannels = Math.max(maxChannels, this.listeners.size());
 		SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
 		if (sslHandler != null) {
 	        sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener() {
@@ -228,7 +227,7 @@
 	    if (engine != null) {
 	        pipeline.addLast("ssl", new SslHandler(engine)); //$NON-NLS-1$
 	    }
-	    pipeline.addLast("decoder", new ObjectDecoder(1 << 24, classLoader)); //$NON-NLS-1$
+	    pipeline.addLast("decoder", new ObjectDecoder(1 << 20, classLoader, storageManager)); //$NON-NLS-1$
 	    pipeline.addLast("encoder", new ObjectEncoder()); //$NON-NLS-1$
 	    pipeline.addLast("handler", this); //$NON-NLS-1$
 	    return pipeline;

Modified: trunk/runtime/src/main/java/org/teiid/transport/SocketListener.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/SocketListener.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/main/java/org/teiid/transport/SocketListener.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -31,8 +31,10 @@
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.teiid.net.socket.ObjectChannel;
+import org.teiid.runtime.RuntimePlugin;
 import org.teiid.transport.ChannelListener.ChannelListenerFactory;
 
+import com.metamatrix.common.buffer.StorageManager;
 import com.metamatrix.common.log.LogConstants;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.util.ApplicationInfo;
@@ -49,6 +51,12 @@
     private ExecutorService nettyPool;
     private ClientServiceRegistryImpl csr;
     
+    public SocketListener(SocketConfiguration config, ClientServiceRegistryImpl csr, StorageManager storageManager) {
+		this(config.getPortNumber(), config.getHostAddress().getHostAddress(), config.getInputBufferSize(), config.getOutputBufferSize(), config.getMaxSocketThreads(), config.getSSLConfiguration(), csr, storageManager);
+        
+		LogManager.logDetail(LogConstants.CTX_TRANSPORT, RuntimePlugin.Util.getString("SocketTransport.1", new Object[] {config.getHostAddress().getHostAddress(), String.valueOf(config.getPortNumber())})); //$NON-NLS-1$
+    }
+    
     /**
      * 
      * @param port
@@ -59,7 +67,7 @@
      * @param server
      */
     public SocketListener(int port, String bindAddress, int inputBufferSize,
-			int outputBufferSize, int maxWorkers, SSLConfiguration config, ClientServiceRegistryImpl csr) {
+			int outputBufferSize, int maxWorkers, SSLConfiguration config, ClientServiceRegistryImpl csr, StorageManager storageManager) {
     	this.isClientEncryptionEnabled = config.isClientEncryptionEnabled();
     	this.csr = csr;
     	if (port < 0 || port > 0xFFFF) {
@@ -74,7 +82,7 @@
         ChannelFactory factory = new NioServerSocketChannelFactory(this.nettyPool, this.nettyPool, Math.min(Runtime.getRuntime().availableProcessors(), maxWorkers));
         
         ServerBootstrap bootstrap = new ServerBootstrap(factory);
-        this.channelHandler = new SSLAwareChannelHandler(this, config, Thread.currentThread().getContextClassLoader());
+        this.channelHandler = new SSLAwareChannelHandler(this, config, Thread.currentThread().getContextClassLoader(), storageManager);
         bootstrap.setPipelineFactory(channelHandler);
         if (inputBufferSize != 0) {
         	bootstrap.setOption("receiveBufferSize", new Integer(inputBufferSize)); //$NON-NLS-1$

Deleted: trunk/runtime/src/main/java/org/teiid/transport/SocketTransport.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/SocketTransport.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/main/java/org/teiid/transport/SocketTransport.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -1,59 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-package org.teiid.transport;
-
-import org.teiid.runtime.RuntimePlugin;
-
-import com.metamatrix.common.log.LogConstants;
-import com.metamatrix.common.log.LogManager;
-
-/**
- * This class starts a Socket for DQP connections and listens on the port and hands out the connections to the 
- * users 
- */
-public class SocketTransport {
-        
-	private SocketListener listener;
-	private SocketConfiguration config;
-	private ClientServiceRegistryImpl csr;
-	
-	public SocketTransport(SocketConfiguration config, ClientServiceRegistryImpl csr) {
-		this.config = config;
-		this.csr = csr;
-	}
-	
-    public void start() {
-        String bindAddress = this.config.getHostAddress().getHostAddress();
-        
-		LogManager.logDetail(LogConstants.CTX_TRANSPORT, RuntimePlugin.Util.getString("SocketTransport.1", new Object[] {bindAddress, String.valueOf(this.config.getPortNumber())})); //$NON-NLS-1$
-		this.listener = new SocketListener(this.config.getPortNumber(), bindAddress, this.config.getInputBufferSize(), this.config.getOutputBufferSize(), this.config.getMaxSocketThreads(), this.config.getSSLConfiguration(), csr);
-    }
-    
-    public void stop() {
-    	this.listener.stop();
-    }
-    
-    public SocketListenerStats getStats() {
-    	return this.listener.getStats();
-    }    
- 	
-}

Modified: trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -25,6 +25,8 @@
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import java.io.ByteArrayInputStream;
+import java.io.StringReader;
 import java.net.InetSocketAddress;
 import java.util.Properties;
 
@@ -42,8 +44,10 @@
 import org.teiid.net.socket.SocketServerConnectionFactory;
 import org.teiid.net.socket.SocketUtil;
 import org.teiid.net.socket.UrlServerDiscovery;
+import org.teiid.transport.TestSocketRemoting.FakeService;
 
 import com.metamatrix.api.exception.ComponentNotFoundException;
+import com.metamatrix.common.buffer.BufferManagerFactory;
 import com.metamatrix.common.util.crypto.NullCryptor;
 import com.metamatrix.dqp.service.SessionService;
 
@@ -65,7 +69,7 @@
 
 	@Test public void testFailedConnect() throws Exception {
 		SSLConfiguration config = new SSLConfiguration();
-		listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),1024, 1024, 1, config, null);
+		listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),1024, 1024, 1, config, null, BufferManagerFactory.getStandaloneBufferManager());
 
 		try {
 			Properties p = new Properties();
@@ -116,6 +120,11 @@
 		assertEquals(1, stats.maxSockets);
 	}
 
+	@Test public void testLobs() throws Exception {
+		SocketServerConnection conn = helpEstablishConnection(false);
+		FakeService fs = conn.getService(FakeService.class);
+		assertEquals(150, fs.lobMethod(new ByteArrayInputStream(new byte[100]), new StringReader(new String(new char[50]))));
+	}
 
 	@Test public void testConnectWithoutClientEncryption() throws Exception {
 		SSLConfiguration config = new SSLConfiguration();
@@ -141,7 +150,8 @@
 				}
 
 			}, null); 
-			listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(), 1024, 1024, 1, config, server);
+			server.registerClientService(FakeService.class, new TestSocketRemoting.FakeServiceImpl(), null);
+			listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(), 1024, 1024, 1, config, server, BufferManagerFactory.getStandaloneBufferManager());
 			
 			SocketListenerStats stats = listener.getStats();
 			assertEquals(0, stats.maxSockets);

Modified: trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java	2010-03-30 21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java	2010-03-31 03:11:58 UTC (rev 2011)
@@ -25,6 +25,8 @@
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
 import java.io.Serializable;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -56,6 +58,7 @@
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.common.util.crypto.Cryptor;
 import com.metamatrix.common.util.crypto.NullCryptor;
+import com.metamatrix.core.util.ObjectConverterUtil;
 
 public class TestSocketRemoting {
 	
@@ -65,9 +68,11 @@
 		
 		String exceptionMethod() throws MetaMatrixProcessingException;
 		
+		int lobMethod(InputStream is, Reader r) throws IOException;
+		
 	}
 	
-	private static class FakeServiceImpl implements FakeService {
+	static class FakeServiceImpl implements FakeService {
 
 		public ResultsFuture<Integer> asynchResult() {
 			ResultsFuture<Integer> result = new ResultsFuture<Integer>();
@@ -79,6 +84,11 @@
 			throw new MetaMatrixProcessingException();
 		}
 		
+		@Override
+		public int lobMethod(InputStream is, Reader r) throws IOException {
+			return ObjectConverterUtil.convertToByteArray(is).length + ObjectConverterUtil.convertToString(r).length();
+		}
+		
 	}
 	
 	private static class FakeClientServerInstance extends SocketServerInstanceImpl implements ClientInstance {



More information about the teiid-commits mailing list