Author: shawkins
Date: 2010-03-30 23:11:58 -0400 (Tue, 30 Mar 2010)
New Revision: 2011
Added:
trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java
Removed:
trunk/runtime/src/main/java/org/teiid/transport/SocketTransport.java
trunk/runtime/src/test/java/com/metamatrix/platform/security/
Modified:
trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectInputStream.java
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectOutputStream.java
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
trunk/common-core/src/main/java/com/metamatrix/common/types/BaseLob.java
trunk/common-core/src/main/java/com/metamatrix/common/types/InputStreamFactory.java
trunk/common-core/src/main/java/com/metamatrix/core/util/ExternalizeUtil.java
trunk/common-core/src/test/java/com/metamatrix/core/util/TestExternalizeUtil.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedFinder.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
trunk/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java
trunk/runtime/src/main/java/org/teiid/transport/SocketListener.java
trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
Log:
TEIID-943 adding streaming from client to server.
Modified: trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -49,17 +49,17 @@
import java.util.TreeMap;
import javax.sql.rowset.serial.SerialBlob;
-import javax.sql.rowset.serial.SerialClob;
import org.teiid.client.RequestMessage;
import org.teiid.client.RequestMessage.ResultsMode;
import org.teiid.client.RequestMessage.StatementType;
import org.teiid.client.metadata.MetadataResult;
-
-
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
+import com.metamatrix.common.types.BlobImpl;
+import com.metamatrix.common.types.ClobImpl;
+import com.metamatrix.common.types.InputStreamFactory;
import com.metamatrix.common.types.JDBCSQLTypeInfo;
import com.metamatrix.common.util.SqlUtil;
import com.metamatrix.common.util.TimestampWithTimezone;
@@ -278,13 +278,8 @@
return metadataResults;
}
- public void setAsciiStream (int parameterIndex, java.io.InputStream in, int length)
throws SQLException {
- //create a clob from the ascii stream
- try {
- setObject(parameterIndex, new SerialClob(ObjectConverterUtil.convertToCharArray(in,
length, "ASCII"))); //$NON-NLS-1$
- } catch (IOException e) {
- throw TeiidSQLException.create(e);
- }
+ public void setAsciiStream(int parameterIndex, java.io.InputStream in, int length)
throws SQLException {
+ setAsciiStream(parameterIndex, in);
}
/**
@@ -299,7 +294,6 @@
}
public void setBinaryStream(int parameterIndex, java.io.InputStream in, int length)
throws SQLException {
- //create a blob from the ascii stream
try {
setObject(parameterIndex, new SerialBlob(ObjectConverterUtil.convertToByteArray(in,
length)));
} catch (IOException e) {
@@ -345,17 +339,11 @@
* @param bytes array to which the parameter value is to be set.
*/
public void setBytes(int parameterIndex, byte bytes[]) throws SQLException {
- //create a blob from the ascii stream
setObject(parameterIndex, new SerialBlob(bytes));
}
public void setCharacterStream (int parameterIndex, java.io.Reader reader, int
length) throws SQLException {
- //create a clob from the ascii stream
- try {
- setObject(parameterIndex, new
SerialClob(ObjectConverterUtil.convertToCharArray(reader, length)));
- } catch (IOException e) {
- throw TeiidSQLException.create(e);
- }
+ setCharacterStream(parameterIndex, reader);
}
/**
@@ -692,52 +680,68 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- public void setAsciiStream(int parameterIndex, InputStream x)
+ public void setAsciiStream(int parameterIndex, final InputStream x)
throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ this.setObject(parameterIndex, new ClobImpl(new
InputStreamFactory("US-ASCII") { //$NON-NLS-1$
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return x;
+ }
+ }, -1));
}
public void setAsciiStream(int parameterIndex, InputStream x, long length)
throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ setAsciiStream(parameterIndex, x);
}
public void setBinaryStream(int parameterIndex, InputStream x)
throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ setBlob(parameterIndex, x);
}
public void setBinaryStream(int parameterIndex, InputStream x, long length)
throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ setBinaryStream(parameterIndex, x);
}
- public void setBlob(int parameterIndex, InputStream inputStream)
+ public void setBlob(int parameterIndex, final InputStream inputStream)
throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ this.setObject(parameterIndex, new BlobImpl(new InputStreamFactory() {
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return inputStream;
+ }
+ }));
}
public void setBlob(int parameterIndex, InputStream inputStream, long length) throws
SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ setBlob(parameterIndex, inputStream);
}
public void setCharacterStream(int parameterIndex, Reader reader)
throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ setClob(parameterIndex, reader);
}
public void setCharacterStream(int parameterIndex, Reader reader,
long length) throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ setCharacterStream(parameterIndex, reader);
}
public void setClob(int parameterIndex, Reader reader) throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ this.setObject(parameterIndex, new ClobImpl(new InputStreamFactory() {
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return null;
+ }
+ }, -1));
}
public void setClob(int parameterIndex, Reader reader, long length)
throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ setClob(parameterIndex, reader);
}
public void setNCharacterStream(int parameterIndex, Reader value)
Modified: trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java
===================================================================
---
trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java 2010-03-30
21:24:33 UTC (rev 2010)
+++
trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -44,7 +44,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.teiid.client.DQP;
+import org.teiid.client.security.ILogon;
import org.teiid.client.util.ExceptionUtil;
import org.teiid.client.util.ResultsFuture;
import org.teiid.client.util.ResultsReceiver;
@@ -300,7 +300,7 @@
public RemoteInvocationHandler(Class<?> targetClass) {
this.targetClass = targetClass;
- this.secure = !DQP.class.isAssignableFrom(targetClass);
+ this.secure = ILogon.class.isAssignableFrom(targetClass);
}
//## JDBC4.0-begin ##
Modified:
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectInputStream.java
===================================================================
---
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectInputStream.java 2010-03-30
21:24:33 UTC (rev 2010)
+++
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectInputStream.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -36,7 +36,7 @@
* @version $Rev: 381 $, $Date: 2008-10-01 06:06:18 -0500 (Wed, 01 Oct 2008) $
*
*/
-class CompactObjectInputStream extends ObjectInputStream {
+public class CompactObjectInputStream extends ObjectInputStream {
private final ClassLoader classLoader;
@@ -44,7 +44,7 @@
this(in, null);
}
- CompactObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException
{
+ public CompactObjectInputStream(InputStream in, ClassLoader classLoader) throws
IOException {
super(in);
this.classLoader = classLoader;
}
Modified:
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectOutputStream.java
===================================================================
---
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectOutputStream.java 2010-03-30
21:24:33 UTC (rev 2010)
+++
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/CompactObjectOutputStream.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -22,11 +22,32 @@
*/
package org.teiid.netty.handler.codec.serialization;
+import java.io.Externalizable;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
+import java.io.Reader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.util.LinkedList;
+import java.util.List;
+import com.metamatrix.common.types.BlobImpl;
+import com.metamatrix.common.types.ClobImpl;
+import com.metamatrix.common.types.InputStreamFactory;
+import com.metamatrix.common.types.SQLXMLImpl;
+import com.metamatrix.common.types.Streamable;
+import com.metamatrix.common.types.InputStreamFactory.StreamFactoryReference;
+import com.metamatrix.common.util.ReaderInputStream;
+
/**
* @author The Netty Project (netty-dev(a)lists.jboss.org)
* @author Trustin Lee (tlee(a)redhat.com)
@@ -34,14 +55,26 @@
* @version $Rev: 6 $, $Date: 2008-08-07 20:40:10 -0500 (Thu, 07 Aug 2008) $
*
*/
-class CompactObjectOutputStream extends ObjectOutputStream {
+public class CompactObjectOutputStream extends ObjectOutputStream {
static final int TYPE_PRIMITIVE = 0;
static final int TYPE_NON_PRIMITIVE = 1;
-
- CompactObjectOutputStream(OutputStream out) throws IOException {
+
+ private List<InputStream> lobs = new LinkedList<InputStream>();
+ private List<StreamFactoryReference> references = new
LinkedList<StreamFactoryReference>();
+
+ public CompactObjectOutputStream(OutputStream out) throws IOException {
super(out);
+ enableReplaceObject(true);
}
+
+ public List<InputStream> getLobs() {
+ return lobs;
+ }
+
+ public List<StreamFactoryReference> getReferences() {
+ return references;
+ }
@Override
protected void writeStreamHeader() throws IOException {
@@ -58,4 +91,113 @@
writeUTF(desc.getName());
}
}
+
+ @Override
+ protected Object replaceObject(Object obj) throws IOException {
+ if (obj instanceof Serializable) {
+ return obj;
+ }
+ try {
+ if (obj instanceof Reader) {
+ lobs.add(new ReaderInputStream((Reader)obj,
Charset.forName(Streamable.ENCODING)));
+ StreamFactoryReference sfr = new SerializableReader();
+ references.add(sfr);
+ return sfr;
+ } else if (obj instanceof InputStream) {
+ lobs.add((InputStream)obj);
+ StreamFactoryReference sfr = new SerializableInputStream();
+ references.add(sfr);
+ return sfr;
+ } else if (obj instanceof SQLXML) {
+ lobs.add(new ReaderInputStream(((SQLXML)obj).getCharacterStream(),
Charset.forName(Streamable.ENCODING)));
+ StreamFactoryReference sfr = new SQLXMLImpl((InputStreamFactory)null);
+ references.add(sfr);
+ return sfr;
+ } else if (obj instanceof Clob) {
+ lobs.add(new ReaderInputStream(((Clob)obj).getCharacterStream(),
Charset.forName(Streamable.ENCODING)));
+ StreamFactoryReference sfr = new ClobImpl(null, -1);
+ references.add(sfr);
+ return sfr;
+ } else if (obj instanceof Blob) {
+ lobs.add(((Blob)obj).getBinaryStream());
+ StreamFactoryReference sfr = new BlobImpl(null);
+ references.add(sfr);
+ return sfr;
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ return super.replaceObject(obj);
+ }
+
+ static class SerializableInputStream extends InputStream implements Externalizable,
StreamFactoryReference {
+
+ private InputStreamFactory isf;
+ private InputStream is;
+
+ public SerializableInputStream() {
+ }
+
+ public void setStreamFactory(InputStreamFactory streamFactory) {
+ this.isf = streamFactory;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (is == null) {
+ is = isf.getInputStream();
+ }
+ return is.read();
+ }
+
+ @Override
+ public void close() throws IOException {
+ isf.free();
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ }
+ }
+
+ static class SerializableReader extends Reader implements Externalizable,
StreamFactoryReference {
+
+ private InputStreamFactory isf;
+ private Reader r;
+
+ public SerializableReader() {
+ }
+
+ public void setStreamFactory(InputStreamFactory streamFactory) {
+ this.isf = streamFactory;
+ }
+
+ @Override
+ public void close() throws IOException {
+ isf.free();
+ }
+
+ @Override
+ public int read(char[] cbuf, int off, int len) throws IOException {
+ if (r == null) {
+ r = isf.getCharacterStream();
+ }
+ return r.read(cbuf, off, len);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ }
+ }
+
}
Modified:
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
===================================================================
---
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java 2010-03-30
21:24:33 UTC (rev 2010)
+++
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -24,10 +24,12 @@
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import com.metamatrix.core.util.AccessibleByteArrayOutputStream;
+import com.metamatrix.core.util.ExternalizeUtil;
/**
* An {@link ObjectOutput} which is interoperable with {@link ObjectDecoder}
@@ -53,13 +55,25 @@
@Override
final protected void writeObjectOverride(Object obj) throws IOException {
AccessibleByteArrayOutputStream baos = new
AccessibleByteArrayOutputStream(estimatedLength);
- ObjectOutputStream oout = new CompactObjectOutputStream(baos);
+ CompactObjectOutputStream oout = new CompactObjectOutputStream(baos);
oout.writeObject(obj);
+ ExternalizeUtil.writeCollection(oout, oout.getReferences());
oout.flush();
oout.close();
-
- out.writeInt(baos.getCount());
+
+ out.writeInt(baos.getCount()); //includes the lob references
out.write(baos.getBuffer(), 0, baos.getCount());
+ byte[] chunk = new byte[1 << 16];
+ for (InputStream is : oout.getLobs()) {
+ while (true) {
+ int bytes = is.read(chunk);
+ out.writeShort(Math.max(0, bytes));
+ if (bytes < 1) {
+ break;
+ }
+ out.write(chunk, 0, bytes);
+ }
+ }
}
@Override
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/BaseLob.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/BaseLob.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/BaseLob.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -2,17 +2,24 @@
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.Reader;
+import java.io.Serializable;
import java.sql.SQLException;
-public class BaseLob {
+import com.metamatrix.common.types.InputStreamFactory.StreamFactoryReference;
+
+public class BaseLob implements Serializable, StreamFactoryReference {
+ private static final long serialVersionUID = -1586959324208959519L;
private InputStreamFactory streamFactory;
protected BaseLob(InputStreamFactory streamFactory) {
this.streamFactory = streamFactory;
}
+
+ public void setStreamFactory(InputStreamFactory streamFactory) {
+ this.streamFactory = streamFactory;
+ }
public InputStreamFactory getStreamFactory() throws SQLException {
if (this.streamFactory == null) {
@@ -36,7 +43,7 @@
public Reader getCharacterStream() throws SQLException {
try {
- return new InputStreamReader(this.getStreamFactory().getInputStream(),
this.getStreamFactory().getEncoding());
+ return this.getStreamFactory().getCharacterStream();
} catch (IOException e) {
SQLException ex = new SQLException(e.getMessage());
ex.initCause(e);
Modified:
trunk/common-core/src/main/java/com/metamatrix/common/types/InputStreamFactory.java
===================================================================
---
trunk/common-core/src/main/java/com/metamatrix/common/types/InputStreamFactory.java 2010-03-30
21:24:33 UTC (rev 2010)
+++
trunk/common-core/src/main/java/com/metamatrix/common/types/InputStreamFactory.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -24,13 +24,20 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
import java.nio.charset.Charset;
import javax.xml.transform.Source;
-//TODO: add support for Readers
public abstract class InputStreamFactory implements Source {
+ public interface StreamFactoryReference {
+
+ void setStreamFactory(InputStreamFactory inputStreamFactory);
+
+ }
+
private String encoding;
private String systemId;
private long length = -1;
@@ -74,4 +81,8 @@
public void setLength(long length) {
this.length = length;
}
+
+ public Reader getCharacterStream() throws IOException {
+ return new InputStreamReader(this.getInputStream(), this.getEncoding());
+ }
}
Modified: trunk/common-core/src/main/java/com/metamatrix/core/util/ExternalizeUtil.java
===================================================================
---
trunk/common-core/src/main/java/com/metamatrix/core/util/ExternalizeUtil.java 2010-03-30
21:24:33 UTC (rev 2010)
+++
trunk/common-core/src/main/java/com/metamatrix/core/util/ExternalizeUtil.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -71,8 +71,8 @@
out.writeInt(0);
} else {
final int size = coll.size();
+ out.writeInt(coll.size());
if (size > 0) {
- out.writeInt(coll.size());
for (Object object : coll) {
out.writeObject(object);
}
Modified:
trunk/common-core/src/test/java/com/metamatrix/core/util/TestExternalizeUtil.java
===================================================================
---
trunk/common-core/src/test/java/com/metamatrix/core/util/TestExternalizeUtil.java 2010-03-30
21:24:33 UTC (rev 2010)
+++
trunk/common-core/src/test/java/com/metamatrix/core/util/TestExternalizeUtil.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -26,6 +26,8 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.List;
import junit.framework.TestCase;
@@ -36,10 +38,6 @@
private ByteArrayOutputStream bout;
private ObjectOutputStream oout;
- private ByteArrayInputStream bin;
- private ObjectInputStream oin;
-
-
/**
* Constructor for TestExternalizeUtil.
* @param name
@@ -53,18 +51,6 @@
oout = new ObjectOutputStream(bout);
}
- public void tearDown() throws Exception {
- oout.close();
- bout.close();
-
- if (oin!=null) {
- oin.close();
- }
- if (bin!=null) {
- bin.close();
- }
- }
-
/**
* Test ExternalizeUtil writeThrowable() and readThrowable() on Throwables.
* @throws Exception
@@ -76,8 +62,8 @@
ExternalizeUtil.writeThrowable(oout, t1);
oout.flush();
- bin = new ByteArrayInputStream(bout.toByteArray());
- oin = new ObjectInputStream(bin);
+ ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+ ObjectInputStream oin = new ObjectInputStream(bin);
Throwable result1 = ExternalizeUtil.readThrowable(oin);
assertEqualThrowables(t1, result1);
@@ -102,8 +88,8 @@
ExternalizeUtil.writeThrowable(oout, t1);
oout.flush();
- bin = new ByteArrayInputStream(bout.toByteArray());
- oin = new ObjectInputStream(bin);
+ ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+ ObjectInputStream oin = new ObjectInputStream(bin);
MetaMatrixCoreException result1 = (MetaMatrixCoreException)
ExternalizeUtil.readThrowable(oin);
assertEqualThrowables(t1, result1);
@@ -129,5 +115,15 @@
assertEquals(stack1[i], stack2[i]);
}
}
+
+ public void testEmptyCollection() throws Exception {
+ ExternalizeUtil.writeCollection(oout, Arrays.asList(new Object[0]));
+ oout.flush();
+ ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+ ObjectInputStream oin = new ObjectInputStream(bin);
+
+ List<?> result = ExternalizeUtil.readList(oin);
+ assertEquals(0, result.size());
+ }
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -34,7 +34,7 @@
* how to store data. The buffer manager should also be aware of memory
* management issues.
*/
-public interface BufferManager {
+public interface BufferManager extends StorageManager {
public enum TupleSourceType {
/**
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -218,6 +218,25 @@
};
}
+ public OutputStream createOutputStream() {
+ return new OutputStream() {
+
+ @Override
+ public void write(int b) throws IOException {
+ throw new UnsupportedOperationException("buffered reading must be used");
//$NON-NLS-1$
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ try {
+ FileStore.this.write(b, off, len);
+ } catch (MetaMatrixComponentException e) {
+ throw new IOException(e);
+ }
+ }
+ };
+ }
+
public FileStoreOutputStream createOutputStream(int maxMemorySize) {
return new FileStoreOutputStream(maxMemorySize);
}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2010-03-30
21:24:33 UTC (rev 2010)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -23,9 +23,11 @@
package com.metamatrix.common.buffer.impl;
import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.OutputStream;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.Iterator;
@@ -213,12 +215,11 @@
LogManager.logTrace(com.metamatrix.common.log.LogConstants.CTX_BUFFER_MGR,
"Writing batch to disk", writeCount.incrementAndGet()); //$NON-NLS-1$
synchronized (store) {
offset = store.getLength();
- FileStoreOutputStream fsos = store.createOutputStream(IO_BUFFER_SIZE);
+ OutputStream fsos = new BufferedOutputStream(store.createOutputStream(),
IO_BUFFER_SIZE);
ObjectOutputStream oos = new ObjectOutputStream(fsos);
batch.writeExternal(oos);
oos.flush();
oos.close();
- fsos.flushBuffer();
}
}
this.batchReference = new WeakReference<TupleBatch>(batch);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedFinder.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedFinder.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedFinder.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -33,6 +33,7 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.core.CoreConstants;
+import com.metamatrix.core.MetaMatrixRuntimeException;
import com.metamatrix.dqp.DQPPlugin;
import com.metamatrix.query.optimizer.capabilities.BasicSourceCapabilities;
import com.metamatrix.query.optimizer.capabilities.CapabilitiesFinder;
@@ -73,7 +74,7 @@
try {
ConnectorManager mgr =
this.connectorRepo.getConnectorManager(model.getSourceJndiName(sourceName));
if (mgr == null) {
- throw new
ConnectorException(DQPPlugin.Util.getString("CachedFinder.no_connector_found",
model.getSourceJndiName(sourceName), modelName, sourceName));
+ throw new
ConnectorException(DQPPlugin.Util.getString("CachedFinder.no_connector_found",
model.getSourceJndiName(sourceName), modelName, sourceName)); //$NON-NLS-1$
}
caps = mgr.getCapabilities();
break;
@@ -88,6 +89,10 @@
throw new MetaMatrixComponentException(exception);
}
+ if (caps == null) {
+ throw new MetaMatrixRuntimeException("No sources were given for the model
" + modelName); //$NON-NLS-1$
+ }
+
userCache.put(modelName, caps);
return caps;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -578,7 +578,7 @@
this.dataTierMgr = dataTierMgr;
}
- BufferManager getBufferManager() {
+ public BufferManager getBufferManager() {
return bufferManager;
}
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-03-30
21:24:33 UTC (rev 2010)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -66,7 +66,7 @@
import org.teiid.transport.ClientServiceRegistryImpl;
import org.teiid.transport.LogonImpl;
import org.teiid.transport.SocketConfiguration;
-import org.teiid.transport.SocketTransport;
+import org.teiid.transport.SocketListener;
import com.metamatrix.api.exception.ComponentNotFoundException;
import com.metamatrix.api.exception.MetaMatrixComponentException;
@@ -85,8 +85,8 @@
private transient SocketConfiguration jdbcSocketConfiguration;
private transient SocketConfiguration adminSocketConfiguration;
- private transient SocketTransport jdbcSocket;
- private transient SocketTransport adminSocket;
+ private transient SocketListener jdbcSocket;
+ private transient SocketListener adminSocket;
private transient TransactionServerImpl transactionServerImpl = new
TransactionServerImpl();
private transient DQPCore dqpCore = new DQPCore();
@@ -123,16 +123,14 @@
this.csr.registerClientService(Admin.class, proxyService(Admin.class, admin),
LogConstants.CTX_ADMIN_API);
if (this.jdbcSocketConfiguration.isEnabled()) {
- this.jdbcSocket = new SocketTransport(this.jdbcSocketConfiguration, csr);
- this.jdbcSocket.start();
+ this.jdbcSocket = new SocketListener(this.jdbcSocketConfiguration, csr,
this.dqpCore.getBufferManager());
LogManager.logInfo(LogConstants.CTX_RUNTIME,
IntegrationPlugin.Util.getString("socket_enabled","Teiid JDBC =
",(this.jdbcSocketConfiguration.getSSLConfiguration().isSslEnabled()?"mms://":"mm://")+this.jdbcSocketConfiguration.getHostAddress().getHostName()+":"+this.jdbcSocketConfiguration.getPortNumber()));
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
} else {
LogManager.logInfo(LogConstants.CTX_RUNTIME,
IntegrationPlugin.Util.getString("socket_not_enabled", "jdbc
connections")); //$NON-NLS-1$ //$NON-NLS-2$
}
if (this.adminSocketConfiguration.isEnabled()) {
- this.adminSocket = new SocketTransport(this.adminSocketConfiguration, csr);
- this.adminSocket.start();
+ this.adminSocket = new SocketListener(this.adminSocketConfiguration, csr,
this.dqpCore.getBufferManager());
LogManager.logInfo(LogConstants.CTX_RUNTIME,
IntegrationPlugin.Util.getString("socket_enabled","Teiid Admin",
(this.adminSocketConfiguration.getSSLConfiguration().isSslEnabled()?"mms://":"mm://")+this.adminSocketConfiguration.getHostAddress().getHostName()+":"+this.adminSocketConfiguration.getPortNumber()));
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
} else {
LogManager.logInfo(LogConstants.CTX_RUNTIME,
IntegrationPlugin.Util.getString("socket_not_enabled", "admin
connections")); //$NON-NLS-1$ //$NON-NLS-2$
Added: trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java
(rev 0)
+++ trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java 2010-03-31 03:11:58
UTC (rev 2011)
@@ -0,0 +1,167 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.teiid.transport;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.StreamCorruptedException;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.jboss.netty.handler.codec.serialization.CompatibleObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.CompatibleObjectEncoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.teiid.netty.handler.codec.serialization.CompactObjectInputStream;
+import org.teiid.netty.handler.codec.serialization.ObjectEncoderOutputStream;
+
+import com.metamatrix.common.buffer.FileStore;
+import com.metamatrix.common.buffer.StorageManager;
+import com.metamatrix.common.types.InputStreamFactory;
+import com.metamatrix.common.types.Streamable;
+import com.metamatrix.common.types.InputStreamFactory.StreamFactoryReference;
+import com.metamatrix.core.util.ExternalizeUtil;
+
+/**
+ * A decoder which deserializes the received {@link ChannelBuffer}s into Java
+ * objects.
+ * <p>
+ * Please note that the serialized form this decoder expects is not
+ * compatible with the standard {@link ObjectOutputStream}. Please use
+ * {@link ObjectEncoder} or {@link ObjectEncoderOutputStream} to ensure the
+ * interoperability with this decoder.
+ * <p>
+ * Unless there's a requirement for the interoperability with the standard
+ * object streams, it is recommended to use {@link ObjectEncoder} and
+ * {@link ObjectDecoder} rather than {@link CompatibleObjectEncoder} and
+ * {@link CompatibleObjectDecoder}.
+ *
+ * @author The Netty Project (netty-dev(a)lists.jboss.org)
+ * @author Trustin Lee (tlee(a)redhat.com)
+ *
+ * @version $Rev: 381 $, $Date: 2008-10-01 20:06:18 +0900 (Wed, 01 Oct 2008) $
+ *
+ * @apiviz.landmark
+ */
+public class ObjectDecoder extends FrameDecoder {
+
+ private final int maxObjectSize;
+ private final ClassLoader classLoader;
+
+ private Object result;
+ private int streamIndex;
+ private OutputStream stream;
+ private List<StreamFactoryReference> streams;
+ private StorageManager storageManager;
+
+ /**
+ * Creates a new decoder with the specified maximum object size.
+ *
+ * @param maxObjectSize the maximum byte length of the serialized object.
+ * if the length of the received object is greater
+ * than this value, {@link StreamCorruptedException}
+ * will be raised.
+ * @param classLoader the {@link ClassLoader} which will load the class
+ * of the serialized object
+ */
+ public ObjectDecoder(int maxObjectSize, ClassLoader classLoader, StorageManager
storageManager) {
+ if (maxObjectSize <= 0) {
+ throw new IllegalArgumentException("maxObjectSize: " +
maxObjectSize);
+ }
+
+ this.maxObjectSize = maxObjectSize;
+ this.classLoader = classLoader;
+ this.storageManager = storageManager;
+ }
+
+ @Override
+ protected Object decode(
+ ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws
Exception {
+ if (result == null) {
+ if (buffer.readableBytes() < 4) {
+ return null;
+ }
+
+ int dataLen = buffer.getInt(buffer.readerIndex());
+ if (dataLen <= 0) {
+ throw new StreamCorruptedException("invalid data length: " +
dataLen);
+ }
+ if (dataLen > maxObjectSize) {
+ throw new StreamCorruptedException(
+ "data length too big: " + dataLen + " (max: " +
maxObjectSize + ')');
+ }
+
+ if (buffer.readableBytes() < dataLen + 4) {
+ return null;
+ }
+
+ buffer.skipBytes(4);
+ CompactObjectInputStream cois = new CompactObjectInputStream(
+ new ChannelBufferInputStream(buffer, dataLen), classLoader);
+ result = cois.readObject();
+ streams = ExternalizeUtil.readList(cois, StreamFactoryReference.class);
+ }
+ while (streamIndex < streams.size()) {
+ if (buffer.readableBytes() < 2) {
+ return null;
+ }
+ int dataLen = buffer.getShort(buffer.readerIndex()) & 0xff;
+ if (dataLen == 0) {
+ stream.close();
+ stream = null;
+ streamIndex++;
+ buffer.skipBytes(2);
+ continue;
+ }
+ if (buffer.readableBytes() < dataLen + 2) {
+ return null;
+ }
+ buffer.skipBytes(2);
+
+ if (stream == null) {
+ final FileStore store =
storageManager.createFileStore("temp-stream"); //$NON-NLS-1$
+ StreamFactoryReference sfr = streams.get(streamIndex);
+ store.setCleanupReference(sfr);
+ sfr.setStreamFactory(new InputStreamFactory(Streamable.ENCODING) {
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return new BufferedInputStream(store.createInputStream(0));
+ }
+ });
+ this.stream = new BufferedOutputStream(store.createOutputStream());
+ }
+ buffer.readBytes(this.stream, dataLen);
+ }
+ Object toReturn = result;
+ result = null;
+ return toReturn;
+ }
+}
Property changes on: trunk/runtime/src/main/java/org/teiid/transport/ObjectDecoder.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -27,9 +27,8 @@
import java.io.IOException;
import java.net.SocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -50,12 +49,12 @@
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.teiid.net.NetPlugin;
import org.teiid.net.socket.ObjectChannel;
+import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.log.LogConstants;
import com.metamatrix.common.log.LogManager;
@@ -140,7 +139,8 @@
private final ChannelListener.ChannelListenerFactory listenerFactory;
private final SSLConfiguration config;
private final ClassLoader classLoader;
- private Map<Channel, ChannelListener> listeners = Collections.synchronizedMap(new
HashMap<Channel, ChannelListener>());
+ private final StorageManager storageManager;
+ private Map<Channel, ChannelListener> listeners = new
ConcurrentHashMap<Channel, ChannelListener>();
private AtomicLong objectsRead = new AtomicLong(0);
private AtomicLong objectsWritten = new AtomicLong(0);
private volatile int maxChannels;
@@ -158,20 +158,19 @@
};
public SSLAwareChannelHandler(ChannelListener.ChannelListenerFactory listenerFactory,
- SSLConfiguration config, ClassLoader classloader) {
+ SSLConfiguration config, ClassLoader classloader, StorageManager storageManager) {
this.listenerFactory = listenerFactory;
this.config = config;
this.classLoader = classloader;
+ this.storageManager = storageManager;
}
@Override
public void channelConnected(ChannelHandlerContext ctx,
final ChannelStateEvent e) throws Exception {
ChannelListener listener = this.listenerFactory.createChannelListener(new
ObjectChannelImpl(e.getChannel()));
- synchronized (this.listeners) {
- this.listeners.put(e.getChannel(), listener);
- maxChannels = Math.max(maxChannels, this.listeners.size());
- }
+ this.listeners.put(e.getChannel(), listener);
+ maxChannels = Math.max(maxChannels, this.listeners.size());
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
if (sslHandler != null) {
sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener() {
@@ -228,7 +227,7 @@
if (engine != null) {
pipeline.addLast("ssl", new SslHandler(engine)); //$NON-NLS-1$
}
- pipeline.addLast("decoder", new ObjectDecoder(1 << 24,
classLoader)); //$NON-NLS-1$
+ pipeline.addLast("decoder", new ObjectDecoder(1 << 20, classLoader,
storageManager)); //$NON-NLS-1$
pipeline.addLast("encoder", new ObjectEncoder()); //$NON-NLS-1$
pipeline.addLast("handler", this); //$NON-NLS-1$
return pipeline;
Modified: trunk/runtime/src/main/java/org/teiid/transport/SocketListener.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -31,8 +31,10 @@
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.teiid.net.socket.ObjectChannel;
+import org.teiid.runtime.RuntimePlugin;
import org.teiid.transport.ChannelListener.ChannelListenerFactory;
+import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.log.LogConstants;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.ApplicationInfo;
@@ -49,6 +51,12 @@
private ExecutorService nettyPool;
private ClientServiceRegistryImpl csr;
+ public SocketListener(SocketConfiguration config, ClientServiceRegistryImpl csr,
StorageManager storageManager) {
+ this(config.getPortNumber(), config.getHostAddress().getHostAddress(),
config.getInputBufferSize(), config.getOutputBufferSize(), config.getMaxSocketThreads(),
config.getSSLConfiguration(), csr, storageManager);
+
+ LogManager.logDetail(LogConstants.CTX_TRANSPORT,
RuntimePlugin.Util.getString("SocketTransport.1", new Object[]
{config.getHostAddress().getHostAddress(), String.valueOf(config.getPortNumber())}));
//$NON-NLS-1$
+ }
+
/**
*
* @param port
@@ -59,7 +67,7 @@
* @param server
*/
public SocketListener(int port, String bindAddress, int inputBufferSize,
- int outputBufferSize, int maxWorkers, SSLConfiguration config,
ClientServiceRegistryImpl csr) {
+ int outputBufferSize, int maxWorkers, SSLConfiguration config,
ClientServiceRegistryImpl csr, StorageManager storageManager) {
this.isClientEncryptionEnabled = config.isClientEncryptionEnabled();
this.csr = csr;
if (port < 0 || port > 0xFFFF) {
@@ -74,7 +82,7 @@
ChannelFactory factory = new NioServerSocketChannelFactory(this.nettyPool,
this.nettyPool, Math.min(Runtime.getRuntime().availableProcessors(), maxWorkers));
ServerBootstrap bootstrap = new ServerBootstrap(factory);
- this.channelHandler = new SSLAwareChannelHandler(this, config,
Thread.currentThread().getContextClassLoader());
+ this.channelHandler = new SSLAwareChannelHandler(this, config,
Thread.currentThread().getContextClassLoader(), storageManager);
bootstrap.setPipelineFactory(channelHandler);
if (inputBufferSize != 0) {
bootstrap.setOption("receiveBufferSize", new
Integer(inputBufferSize)); //$NON-NLS-1$
Deleted: trunk/runtime/src/main/java/org/teiid/transport/SocketTransport.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/SocketTransport.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/main/java/org/teiid/transport/SocketTransport.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -1,59 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-package org.teiid.transport;
-
-import org.teiid.runtime.RuntimePlugin;
-
-import com.metamatrix.common.log.LogConstants;
-import com.metamatrix.common.log.LogManager;
-
-/**
- * This class starts a Socket for DQP connections and listens on the port and hands out
the connections to the
- * users
- */
-public class SocketTransport {
-
- private SocketListener listener;
- private SocketConfiguration config;
- private ClientServiceRegistryImpl csr;
-
- public SocketTransport(SocketConfiguration config, ClientServiceRegistryImpl csr) {
- this.config = config;
- this.csr = csr;
- }
-
- public void start() {
- String bindAddress = this.config.getHostAddress().getHostAddress();
-
- LogManager.logDetail(LogConstants.CTX_TRANSPORT,
RuntimePlugin.Util.getString("SocketTransport.1", new Object[] {bindAddress,
String.valueOf(this.config.getPortNumber())})); //$NON-NLS-1$
- this.listener = new SocketListener(this.config.getPortNumber(), bindAddress,
this.config.getInputBufferSize(), this.config.getOutputBufferSize(),
this.config.getMaxSocketThreads(), this.config.getSSLConfiguration(), csr);
- }
-
- public void stop() {
- this.listener.stop();
- }
-
- public SocketListenerStats getStats() {
- return this.listener.getStats();
- }
-
-}
Modified: trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -25,6 +25,8 @@
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+import java.io.ByteArrayInputStream;
+import java.io.StringReader;
import java.net.InetSocketAddress;
import java.util.Properties;
@@ -42,8 +44,10 @@
import org.teiid.net.socket.SocketServerConnectionFactory;
import org.teiid.net.socket.SocketUtil;
import org.teiid.net.socket.UrlServerDiscovery;
+import org.teiid.transport.TestSocketRemoting.FakeService;
import com.metamatrix.api.exception.ComponentNotFoundException;
+import com.metamatrix.common.buffer.BufferManagerFactory;
import com.metamatrix.common.util.crypto.NullCryptor;
import com.metamatrix.dqp.service.SessionService;
@@ -65,7 +69,7 @@
@Test public void testFailedConnect() throws Exception {
SSLConfiguration config = new SSLConfiguration();
- listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),1024,
1024, 1, config, null);
+ listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),1024,
1024, 1, config, null, BufferManagerFactory.getStandaloneBufferManager());
try {
Properties p = new Properties();
@@ -116,6 +120,11 @@
assertEquals(1, stats.maxSockets);
}
+ @Test public void testLobs() throws Exception {
+ SocketServerConnection conn = helpEstablishConnection(false);
+ FakeService fs = conn.getService(FakeService.class);
+ assertEquals(150, fs.lobMethod(new ByteArrayInputStream(new byte[100]), new
StringReader(new String(new char[50]))));
+ }
@Test public void testConnectWithoutClientEncryption() throws Exception {
SSLConfiguration config = new SSLConfiguration();
@@ -141,7 +150,8 @@
}
}, null);
- listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
1024, 1024, 1, config, server);
+ server.registerClientService(FakeService.class, new
TestSocketRemoting.FakeServiceImpl(), null);
+ listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
1024, 1024, 1, config, server, BufferManagerFactory.getStandaloneBufferManager());
SocketListenerStats stats = listener.getStats();
assertEquals(0, stats.maxSockets);
Modified: trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-03-30
21:24:33 UTC (rev 2010)
+++ trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-03-31
03:11:58 UTC (rev 2011)
@@ -25,6 +25,8 @@
import static org.junit.Assert.*;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@@ -56,6 +58,7 @@
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.util.crypto.Cryptor;
import com.metamatrix.common.util.crypto.NullCryptor;
+import com.metamatrix.core.util.ObjectConverterUtil;
public class TestSocketRemoting {
@@ -65,9 +68,11 @@
String exceptionMethod() throws MetaMatrixProcessingException;
+ int lobMethod(InputStream is, Reader r) throws IOException;
+
}
- private static class FakeServiceImpl implements FakeService {
+ static class FakeServiceImpl implements FakeService {
public ResultsFuture<Integer> asynchResult() {
ResultsFuture<Integer> result = new ResultsFuture<Integer>();
@@ -79,6 +84,11 @@
throw new MetaMatrixProcessingException();
}
+ @Override
+ public int lobMethod(InputStream is, Reader r) throws IOException {
+ return ObjectConverterUtil.convertToByteArray(is).length +
ObjectConverterUtil.convertToString(r).length();
+ }
+
}
private static class FakeClientServerInstance extends SocketServerInstanceImpl
implements ClientInstance {