[teiid-commits] teiid SVN: r1732 - in trunk: client/src/main/java/org/teiid/netty/handler/codec/serialization and 13 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Jan 13 01:03:54 EST 2010


Author: shawkins
Date: 2010-01-13 01:03:53 -0500 (Wed, 13 Jan 2010)
New Revision: 1732

Added:
   trunk/client/src/test/java/org/
   trunk/client/src/test/java/org/teiid/
   trunk/client/src/test/java/org/teiid/netty/
   trunk/client/src/test/java/org/teiid/netty/handler/
   trunk/client/src/test/java/org/teiid/netty/handler/codec/
   trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/
   trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
   trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java
Modified:
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
   trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java
   trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
   trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
   trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java
Log:
TEIID-916 TEIID-925 Fix for stream corruption during a timeout.  Also further refines the duplicate-removal strategy for performance.

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -58,7 +58,6 @@
 		private final Socket socket;
 		private ObjectOutputStream outputStream;
 		private ObjectInputStream inputStream;
-		private Object readLock = new Object();
 
 		private OioObjectChannel(Socket socket) throws IOException {
 			log.fine("creating new OioObjectChannel"); //$NON-NLS-1$
@@ -119,15 +118,13 @@
 		//## JDBC4.0-end ##
 		public Object read() throws IOException, ClassNotFoundException {
 			log.finer("reading message from socket"); //$NON-NLS-1$
-			synchronized (readLock) {
-				try {
-					return inputStream.readObject();
-				} catch (SocketTimeoutException e) {
-					throw e;
-		        } catch (IOException e) {
-		            close();
-		            throw e;
-		        }
+			try {
+				return inputStream.readObject();
+			} catch (SocketTimeoutException e) {
+				throw e;
+	        } catch (IOException e) {
+	            close();
+	            throw e;
 			}
 		}
 

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -41,6 +41,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -83,6 +84,8 @@
     
     private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners = new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
     
+    private ReentrantLock readLock = new ReentrantLock();
+    
     public SocketServerInstanceImpl() {
     	
     }
@@ -173,7 +176,7 @@
         return socketChannel.isOpen();
     }
 
-    public void send(Message message, ResultsReceiver<Object> listener, Serializable messageKey)
+    protected void send(Message message, ResultsReceiver<Object> listener, Serializable messageKey)
         throws CommunicationException, InterruptedException {
 	    if (listener != null) {
 	        asynchronousListeners.put(messageKey, listener);
@@ -197,7 +200,7 @@
      * Send an exception to all clients that are currently waiting for a
      * response.
      */
-	public void exceptionOccurred(Throwable e) {
+	private void exceptionOccurred(Throwable e) {
     	if (e instanceof CommunicationException) {
 	        if (e.getCause() instanceof InvalidClassException) {
 	            log.log(Level.SEVERE, "Unknown class or incorrect class version:", e); //$NON-NLS-1$ 
@@ -222,7 +225,7 @@
 		}
     }
 
-	public void receivedMessage(Object packet) {
+	private void receivedMessage(Object packet) {
 		log.log(Level.FINE, "reading packet"); //$NON-NLS-1$ 
         if (packet instanceof Message) {
         	Message messagePacket = (Message)packet;
@@ -313,25 +316,31 @@
 					public Object get(long timeout, TimeUnit unit)
 							throws InterruptedException, ExecutionException,
 							TimeoutException {
-						int timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
-						synchronized (SocketServerInstanceImpl.this) {
-							while (!isDone()) {
-								if (timeoutMillis <= 0) {
-									throw new TimeoutException();
-								}
-								long start = System.currentTimeMillis();
-								try {
+						long timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
+						long start = System.currentTimeMillis();
+						boolean reading = false;
+						while (!isDone()) {
+							try {
+								if ((reading = readLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) == true && !isDone()) {
 									receivedMessage(socketChannel.read());
-								} catch (SocketTimeoutException e) {
-								} catch (IOException e) {
-									exceptionOccurred(e);
-								} catch (ClassNotFoundException e) {
-									exceptionOccurred(e);
 								}
-								if (!isDone()) {
-									timeoutMillis -= (System.currentTimeMillis() - start);
+							} catch (SocketTimeoutException e) {
+								System.out.println("here");
+							} catch (Exception e) {
+								exceptionOccurred(e);
+							} finally {
+								if (reading) {
+									readLock.unlock();
 								}
 							}
+							if (!isDone()) {
+								long now = System.currentTimeMillis();
+								timeoutMillis -= now - start;
+								start = now;
+								if (timeoutMillis <= 0) {
+									throw new TimeoutException();
+								}
+							}
 						}
 						return super.get(timeout, unit);
 					}

Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -22,8 +22,10 @@
  */
 package org.teiid.netty.handler.codec.serialization;
 
-import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInput;
 import java.io.ObjectInputStream;
 import java.io.StreamCorruptedException;
@@ -40,11 +42,15 @@
  */
 public class ObjectDecoderInputStream extends ObjectInputStream {
 
-    private final DataInputStream in;
+    private final InputStream in;
     private final ClassLoader classLoader;
     private final int maxObjectSize;
+    
+    private boolean foundLength;
+    private byte[] buffer;
+    private int count;
 
-    public ObjectDecoderInputStream(DataInputStream in, ClassLoader classLoader, int maxObjectSize) throws SecurityException, IOException {
+    public ObjectDecoderInputStream(InputStream in, ClassLoader classLoader, int maxObjectSize) throws SecurityException, IOException {
     	super();
     	this.in = in;
         this.classLoader = classLoader;
@@ -54,17 +60,43 @@
     @Override
     protected final Object readObjectOverride() throws IOException,
     		ClassNotFoundException {
-        int dataLen = in.readInt();
-        if (dataLen <= 0) {
-            throw new StreamCorruptedException("invalid data length: " + dataLen); //$NON-NLS-1$
+        if (!foundLength) {
+        	if (buffer == null) {
+        		buffer = new byte[4];
+        	}
+            fillBuffer();
+    		int dataLen = ((buffer[0] & 0xff << 24) + (buffer[1] & 0xff << 16) + (buffer[2] & 0xff << 8) + (buffer[3] & 0xff << 0));
+	        if (dataLen <= 0) {
+	            throw new StreamCorruptedException("invalid data length: " + dataLen); //$NON-NLS-1$
+	        }
+	        if (dataLen > maxObjectSize) {
+	            throw new StreamCorruptedException(
+	                    "data length too big: " + dataLen + " (max: " + maxObjectSize + ')'); //$NON-NLS-1$ //$NON-NLS-2$
+	        }
+        	//check if the underlying buffer can be used
+	        if (in.available() >= dataLen) { 
+	        	return new CompactObjectInputStream(in, classLoader).readObject();
+	        }
+	        buffer = new byte[dataLen];
+	        foundLength = true;
         }
-        if (dataLen > maxObjectSize) {
-            throw new StreamCorruptedException(
-                    "data length too big: " + dataLen + " (max: " + maxObjectSize + ')'); //$NON-NLS-1$ //$NON-NLS-2$
+        fillBuffer();
+        foundLength = false;
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+        buffer = null;
+        return new CompactObjectInputStream(bais, classLoader).readObject();
+    }
+
+	private void fillBuffer() throws IOException, EOFException {
+		while (count < buffer.length) {
+	        int read = in.read(buffer, count, buffer.length - count);
+	        if (read == -1) {
+	        	throw new EOFException();
+	        }
+	        count += read;
         }
-
-        return new CompactObjectInputStream(in, classLoader).readObject();
-    }
+        count = 0;
+	}
     
     @Override
     public void close() throws IOException {

Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -22,12 +22,13 @@
  */
 package org.teiid.netty.handler.codec.serialization;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 
+import com.metamatrix.core.util.AccessibleByteArrayOutputStream;
+
 /**
  * An {@link ObjectOutput} which is interoperable with {@link ObjectDecoder}
  * and {@link ObjectDecoderInputStream}.
@@ -42,7 +43,7 @@
 
     private final DataOutputStream out;
     private final int estimatedLength;
-
+    
     public ObjectEncoderOutputStream(DataOutputStream out, int estimatedLength) throws SecurityException, IOException {
     	super();
     	this.out = out;
@@ -51,14 +52,14 @@
     
     @Override
     final protected void writeObjectOverride(Object obj) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream(estimatedLength);
+        AccessibleByteArrayOutputStream baos = new AccessibleByteArrayOutputStream(estimatedLength);
         ObjectOutputStream oout = new CompactObjectOutputStream(baos);
         oout.writeObject(obj);
         oout.flush();
         oout.close();
 
-        out.writeInt(baos.size());
-        out.write(baos.toByteArray());
+        out.writeInt(baos.getCount());
+        out.write(baos.getBuffer(), 0, baos.getCount());
     }
     
     @Override

Modified: trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -22,6 +22,8 @@
 
 package com.metamatrix.common.comm.platform.socket.client;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
@@ -32,7 +34,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
-import junit.framework.TestCase;
+import org.junit.Test;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.api.HostInfo;
@@ -42,7 +44,7 @@
 import com.metamatrix.dqp.client.ResultsFuture;
 import com.metamatrix.platform.security.api.ILogon;
 
-public class TestSocketServerInstanceImpl extends TestCase {
+public class TestSocketServerInstanceImpl {
 	
 	private static class FakeObjectChannel implements ObjectChannel, ObjectChannelFactory {
 		List<Object> msgs = new ArrayList<Object>();
@@ -119,7 +121,7 @@
 		
 	}
 
-	public void testHandshakeTimeout() throws Exception {
+	@Test public void testHandshakeTimeout() throws Exception {
 		SocketTimeoutException[] exs = new SocketTimeoutException[SocketServerInstanceImpl.HANDSHAKE_RETRIES];
 		Arrays.fill(exs, new SocketTimeoutException());
 		final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(exs));
@@ -139,7 +141,7 @@
 		return ssii;
 	}
 	
-	public void testSuccessfulHandshake() throws Exception {
+	@Test public void testSuccessfulHandshake() throws Exception {
 		final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(new Handshake(), new SocketTimeoutException()));
 		
 		SocketServerInstanceImpl instance = createInstance(channel);
@@ -154,7 +156,7 @@
 		}
 	}
 	
-	public void testVersionMismatch() throws Exception {
+	@Test public void testVersionMismatch() throws Exception {
 		Handshake h = new Handshake();
 		h.setVersion("foo"); //$NON-NLS-1$
 		final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(h));

Added: trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java	                        (rev 0)
+++ trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.netty.handler.codec.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestObjectDecoderInputStream {
+
+	@Test public void testTimeoutException() throws Exception {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		ObjectEncoderOutputStream oeos = new ObjectEncoderOutputStream(new DataOutputStream(baos), 512);
+		List<Integer> obj = Arrays.asList(1, 2, 3);
+		oeos.writeObject(obj);
+		oeos.close();
+		final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+		InputStream is = new InputStream() {
+			int count;
+			@Override
+			public int read() throws IOException {
+				if (count++%2==0) { 
+					throw new SocketTimeoutException();
+				}
+				return bais.read();
+			}
+		};
+		ObjectDecoderInputStream odis = new ObjectDecoderInputStream(new DataInputStream(is), Thread.currentThread().getContextClassLoader(), 1024);
+		Object result = null;
+		do {
+			try {
+				result = odis.readObject();
+			} catch (IOException e) {
+				
+			}
+		} while (result == null);
+		assertEquals(obj, result);
+	}
+	
+}


Property changes on: trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -22,8 +22,6 @@
 
 package com.metamatrix.common.types;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.sql.Blob;
@@ -130,24 +128,6 @@
         this.reference.truncate(len);
     }
     
-    /**
-     * Utility Method to convert blob into byte array  
-     * @param blob
-     * @return byte array
-     */
-    public static byte[] getByteArray(Blob blob) throws SQLException, IOException {
-        InputStream reader = blob.getBinaryStream();
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        int c = reader.read();
-        while (c != -1) {
-            writer.write((byte)c);
-            c = reader.read();
-        }
-        reader.close();
-        byte[] data = writer.toByteArray();
-        writer.close();
-        return data;        
-    }
     //## JDBC4.0-begin ##
 	public void free() throws SQLException {
 		this.reference.free();

Added: trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java	                        (rev 0)
+++ trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.core.util;
+
+import java.io.ByteArrayOutputStream;
+
+public class AccessibleByteArrayOutputStream extends ByteArrayOutputStream {
+
+	public AccessibleByteArrayOutputStream() {
+		super();
+	}
+	
+	public AccessibleByteArrayOutputStream(int size) {
+		super(size);
+	}
+	
+	public byte[] getBuffer() {
+		return this.buf;
+	}
+	
+	public int getCount() {
+		return this.count;
+	}
+	
+}
\ No newline at end of file


Property changes on: trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -23,7 +23,6 @@
 package com.metamatrix.common.buffer;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -40,6 +39,7 @@
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.common.types.Streamable;
+import com.metamatrix.core.util.AccessibleByteArrayOutputStream;
 import com.metamatrix.core.util.Assertion;
 import com.metamatrix.dqp.DQPPlugin;
 import com.metamatrix.query.execution.QueryExecPlugin;
@@ -228,40 +228,22 @@
 			this.store = this.manager.createFileStore(this.tupleSourceID);
 			this.store.setCleanupReference(this);
 		}
-		byte[] bytes = convertToBytes(writeBatch);
-		mbatch.setLength(bytes.length);
-		mbatch.setOffset(this.store.write(bytes));
-		this.batches.put(mbatch.getBeginRow(), mbatch);
-        batchBuffer = null;
-	}
-	
-    /**
-     * Convert from an object to a byte array
-     * @param object Object to convert
-     * @return Byte array
-     */
-    private byte[] convertToBytes(TupleBatch batch) throws MetaMatrixComponentException {
-        ObjectOutputStream oos = null;
+		AccessibleByteArrayOutputStream baos = null;
         try {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            oos = new ObjectOutputStream(baos);
-
-            batch.setDataTypes(types);
-            batch.writeExternal(oos);
+            baos = new AccessibleByteArrayOutputStream(1024);
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            writeBatch.setDataTypes(types);
+            writeBatch.writeExternal(oos);
             oos.flush();
-            return baos.toByteArray();
-
+            oos.close();
         } catch(IOException e) {
         	throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStorageManager.batch_error")); //$NON-NLS-1$
-        } finally {
-            if(oos != null) {
-                try {
-                    oos.close();
-                } catch(IOException e) {
-                }
-            }
         }
-    }
+        mbatch.setLength(baos.getCount());
+		mbatch.setOffset(this.store.write(baos.getBuffer(), 0, baos.getCount()));
+		this.batches.put(mbatch.getBeginRow(), mbatch);
+        batchBuffer = null;
+	}
 	
 	public void close() throws MetaMatrixComponentException {
 		//if there is only a single batch, let it stay in memory

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -51,7 +51,7 @@
 	 * for buffermanager reserve/release of memory 
 	 * (would also help the sort utility)
 	 */
-	public static final int MAX_PARTITIONS = 8; 
+	public static final int MAX_PARTITIONS = 16; 
 	
 	private List[] endTuples;
 	private List<Boolean> overlap = new ArrayList<Boolean>();

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -23,9 +23,11 @@
 package com.metamatrix.query.processor.relational;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.TreeSet;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -69,7 +71,8 @@
 		
 		@Override
 		public int compareTo(SortedSublist o) {
-			return comparator.compare(this.tuple, o.tuple);
+			//reverse the comparison, so that removal of the lowest is a low cost operation
+			return -comparator.compare(this.tuple, o.tuple);
 		}
 		
 		@Override
@@ -92,12 +95,13 @@
     private List<TupleBuffer> activeTupleBuffers = new ArrayList<TupleBuffer>();
     private int masterSortIndex;
     
-    private int initialSortPass = 1;     //used to track the number of times through the initial sort method
+    private int dupRemoveSublists = 1;     //used to control the number of sublists needed for dup remove
 
     // Phase constants for readability
     private static final int INITIAL_SORT = 1;
     private static final int MERGE = 2;
     private static final int DONE = 3;
+	private Collection<List<?>> workingTuples;
     
     public SortUtility(TupleSource sourceID, List sortElements, List<Boolean> sortTypes, Mode mode, BufferManager bufferMgr,
                         String groupName) {
@@ -162,7 +166,14 @@
 	 */
     protected void initialSort() throws MetaMatrixComponentException, MetaMatrixProcessingException {
     	while(!doneReading) {
-            List<List<?>> workingTuples = new ArrayList<List<?>>();
+    		if (workingTuples == null) {
+	            if (mode == Mode.SORT) {
+	            	workingTuples = new ArrayList<List<?>>();
+	            } else {
+	            	workingTuples = new TreeSet<List<?>>(comparator);
+	            }
+    		}
+            
             int maxRows = bufferManager.getMaxProcessingBatches() * bufferManager.getProcessorBatchSize();
 	        while(!doneReading && workingTuples.size() < maxRows) {
 	            try {
@@ -173,9 +184,10 @@
 	            		break;
 	            	}
 	            	
-                    addTuple(workingTuples, tuple);
+                    workingTuples.add(tuple);
 	            } catch(BlockedException e) {
-	            	if (workingTuples.isEmpty() && (mode != Mode.DUP_REMOVE || activeTupleBuffers.size() < initialSortPass)) {
+	            	if ((workingTuples.size() < maxRows/2 && mode != Mode.DUP_REMOVE) 
+	            			|| (workingTuples.size() < (dupRemoveSublists/4)*bufferManager.getProcessorBatchSize() && activeTupleBuffers.size() < dupRemoveSublists)) {
             			throw e; //block if no work can be performed
 	            	}
 	            	break;
@@ -190,33 +202,22 @@
 	        activeTupleBuffers.add(sublist);
 	        if (this.mode == Mode.SORT) {
 	        	//perform a stable sort
-	    		Collections.sort(workingTuples, comparator);
+	    		Collections.sort((List<List<?>>)workingTuples, comparator);
 	        }
 	        for (List<?> list : workingTuples) {
 				sublist.addTuple(list);
 			}
+	        workingTuples = null;
 	        sublist.saveBatch();
         }
     	
     	if (this.activeTupleBuffers.isEmpty()) {
             activeTupleBuffers.add(createTupleBuffer());
         }  
-    	this.initialSortPass = Math.min(initialSortPass + 1, bufferManager.getMaxProcessingBatches() * 2);
+    	this.dupRemoveSublists = Math.min(dupRemoveSublists * 2, bufferManager.getMaxProcessingBatches() * 2);
         this.phase = MERGE;
     }
 
-	protected void addTuple(List<List<?>> workingTuples, List<?> tuple) {
-		if (this.mode == Mode.SORT) {
-			workingTuples.add(tuple);
-			return;
-		}
-		int index = Collections.binarySearch(workingTuples, tuple, comparator);
-		if (index >= 0) {
-			return; //it's already there
-		}
-		workingTuples.add(-index - 1, tuple);
-	}
-		
     protected void mergePhase() throws MetaMatrixComponentException, MetaMatrixProcessingException {
     	while(this.activeTupleBuffers.size() > 1) {    		
     		ArrayList<SortedSublist> sublists = new ArrayList<SortedSublist>(activeTupleBuffers.size());
@@ -227,7 +228,6 @@
         	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
             	LogManager.logTrace(LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
             }
-        	
         	// initialize the sublists with the min value
             for(int i = 0; i<maxSortIndex; i++) { 
              	TupleBuffer activeID = activeTupleBuffers.get(i);
@@ -242,7 +242,7 @@
             
             // iteratively process the lowest tuple
             while (sublists.size() > 0) {
-            	SortedSublist sortedSublist = sublists.remove(0);
+            	SortedSublist sortedSublist = sublists.remove(sublists.size() - 1);
         		merged.addTuple(sortedSublist.tuple);
                 if (this.output != null && sortedSublist.index > masterSortIndex) {
                 	this.output.addTuple(sortedSublist.tuple); //a new distinct row

Modified: trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java	2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java	2010-01-13 06:03:53 UTC (rev 1732)
@@ -286,6 +286,9 @@
 
 	private MetaMatrixSessionInfo getSessionInfo(MetaMatrixSessionID sessionID)
 			throws InvalidSessionException {
+		if (sessionID == null) {
+			throw new InvalidSessionException(DQPEmbeddedPlugin.Util.getString("SessionServiceImpl.invalid_session", new Object[] {null})); //$NON-NLS-1$
+		}
 		MetaMatrixSessionInfo info = this.sessionCache.get(sessionID);
 		if (info == null) {
 			throw new InvalidSessionException(DQPEmbeddedPlugin.Util.getString("SessionServiceImpl.invalid_session", sessionID)); //$NON-NLS-1$



More information about the teiid-commits mailing list