[teiid-commits] teiid SVN: r1732 - in trunk: client/src/main/java/org/teiid/netty/handler/codec/serialization and 13 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Wed Jan 13 01:03:54 EST 2010
Author: shawkins
Date: 2010-01-13 01:03:53 -0500 (Wed, 13 Jan 2010)
New Revision: 1732
Added:
trunk/client/src/test/java/org/
trunk/client/src/test/java/org/teiid/
trunk/client/src/test/java/org/teiid/netty/
trunk/client/src/test/java/org/teiid/netty/handler/
trunk/client/src/test/java/org/teiid/netty/handler/codec/
trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/
trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java
Modified:
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java
trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java
Log:
TEIID-916 TEIID-925 fix for stream corruption during a timeout. Also further refining the dup remove strategy for performance.
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -58,7 +58,6 @@
private final Socket socket;
private ObjectOutputStream outputStream;
private ObjectInputStream inputStream;
- private Object readLock = new Object();
private OioObjectChannel(Socket socket) throws IOException {
log.fine("creating new OioObjectChannel"); //$NON-NLS-1$
@@ -119,15 +118,13 @@
//## JDBC4.0-end ##
public Object read() throws IOException, ClassNotFoundException {
log.finer("reading message from socket"); //$NON-NLS-1$
- synchronized (readLock) {
- try {
- return inputStream.readObject();
- } catch (SocketTimeoutException e) {
- throw e;
- } catch (IOException e) {
- close();
- throw e;
- }
+ try {
+ return inputStream.readObject();
+ } catch (SocketTimeoutException e) {
+ throw e;
+ } catch (IOException e) {
+ close();
+ throw e;
}
}
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -41,6 +41,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -83,6 +84,8 @@
private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners = new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
+ private ReentrantLock readLock = new ReentrantLock();
+
public SocketServerInstanceImpl() {
}
@@ -173,7 +176,7 @@
return socketChannel.isOpen();
}
- public void send(Message message, ResultsReceiver<Object> listener, Serializable messageKey)
+ protected void send(Message message, ResultsReceiver<Object> listener, Serializable messageKey)
throws CommunicationException, InterruptedException {
if (listener != null) {
asynchronousListeners.put(messageKey, listener);
@@ -197,7 +200,7 @@
* Send an exception to all clients that are currently waiting for a
* response.
*/
- public void exceptionOccurred(Throwable e) {
+ private void exceptionOccurred(Throwable e) {
if (e instanceof CommunicationException) {
if (e.getCause() instanceof InvalidClassException) {
log.log(Level.SEVERE, "Unknown class or incorrect class version:", e); //$NON-NLS-1$
@@ -222,7 +225,7 @@
}
}
- public void receivedMessage(Object packet) {
+ private void receivedMessage(Object packet) {
log.log(Level.FINE, "reading packet"); //$NON-NLS-1$
if (packet instanceof Message) {
Message messagePacket = (Message)packet;
@@ -313,25 +316,31 @@
public Object get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
- int timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
- synchronized (SocketServerInstanceImpl.this) {
- while (!isDone()) {
- if (timeoutMillis <= 0) {
- throw new TimeoutException();
- }
- long start = System.currentTimeMillis();
- try {
+ long timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
+ long start = System.currentTimeMillis();
+ boolean reading = false;
+ while (!isDone()) {
+ try {
+ if ((reading = readLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) == true && !isDone()) {
receivedMessage(socketChannel.read());
- } catch (SocketTimeoutException e) {
- } catch (IOException e) {
- exceptionOccurred(e);
- } catch (ClassNotFoundException e) {
- exceptionOccurred(e);
}
- if (!isDone()) {
- timeoutMillis -= (System.currentTimeMillis() - start);
+ } catch (SocketTimeoutException e) {
+ System.out.println("here");
+ } catch (Exception e) {
+ exceptionOccurred(e);
+ } finally {
+ if (reading) {
+ readLock.unlock();
}
}
+ if (!isDone()) {
+ long now = System.currentTimeMillis();
+ timeoutMillis -= now - start;
+ start = now;
+ if (timeoutMillis <= 0) {
+ throw new TimeoutException();
+ }
+ }
}
return super.get(timeout, unit);
}
Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -22,8 +22,10 @@
*/
package org.teiid.netty.handler.codec.serialization;
-import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
@@ -40,11 +42,15 @@
*/
public class ObjectDecoderInputStream extends ObjectInputStream {
- private final DataInputStream in;
+ private final InputStream in;
private final ClassLoader classLoader;
private final int maxObjectSize;
+
+ private boolean foundLength;
+ private byte[] buffer;
+ private int count;
- public ObjectDecoderInputStream(DataInputStream in, ClassLoader classLoader, int maxObjectSize) throws SecurityException, IOException {
+ public ObjectDecoderInputStream(InputStream in, ClassLoader classLoader, int maxObjectSize) throws SecurityException, IOException {
super();
this.in = in;
this.classLoader = classLoader;
@@ -54,17 +60,43 @@
@Override
protected final Object readObjectOverride() throws IOException,
ClassNotFoundException {
- int dataLen = in.readInt();
- if (dataLen <= 0) {
- throw new StreamCorruptedException("invalid data length: " + dataLen); //$NON-NLS-1$
+ if (!foundLength) {
+ if (buffer == null) {
+ buffer = new byte[4];
+ }
+ fillBuffer();
+ int dataLen = ((buffer[0] & 0xff << 24) + (buffer[1] & 0xff << 16) + (buffer[2] & 0xff << 8) + (buffer[3] & 0xff << 0));
+ if (dataLen <= 0) {
+ throw new StreamCorruptedException("invalid data length: " + dataLen); //$NON-NLS-1$
+ }
+ if (dataLen > maxObjectSize) {
+ throw new StreamCorruptedException(
+ "data length too big: " + dataLen + " (max: " + maxObjectSize + ')'); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ //check if the underlying buffer can be used
+ if (in.available() >= dataLen) {
+ return new CompactObjectInputStream(in, classLoader).readObject();
+ }
+ buffer = new byte[dataLen];
+ foundLength = true;
}
- if (dataLen > maxObjectSize) {
- throw new StreamCorruptedException(
- "data length too big: " + dataLen + " (max: " + maxObjectSize + ')'); //$NON-NLS-1$ //$NON-NLS-2$
+ fillBuffer();
+ foundLength = false;
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+ buffer = null;
+ return new CompactObjectInputStream(bais, classLoader).readObject();
+ }
+
+ private void fillBuffer() throws IOException, EOFException {
+ while (count < buffer.length) {
+ int read = in.read(buffer, count, buffer.length - count);
+ if (read == -1) {
+ throw new EOFException();
+ }
+ count += read;
}
-
- return new CompactObjectInputStream(in, classLoader).readObject();
- }
+ count = 0;
+ }
@Override
public void close() throws IOException {
Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -22,12 +22,13 @@
*/
package org.teiid.netty.handler.codec.serialization;
-import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
+import com.metamatrix.core.util.AccessibleByteArrayOutputStream;
+
/**
* An {@link ObjectOutput} which is interoperable with {@link ObjectDecoder}
* and {@link ObjectDecoderInputStream}.
@@ -42,7 +43,7 @@
private final DataOutputStream out;
private final int estimatedLength;
-
+
public ObjectEncoderOutputStream(DataOutputStream out, int estimatedLength) throws SecurityException, IOException {
super();
this.out = out;
@@ -51,14 +52,14 @@
@Override
final protected void writeObjectOverride(Object obj) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(estimatedLength);
+ AccessibleByteArrayOutputStream baos = new AccessibleByteArrayOutputStream(estimatedLength);
ObjectOutputStream oout = new CompactObjectOutputStream(baos);
oout.writeObject(obj);
oout.flush();
oout.close();
- out.writeInt(baos.size());
- out.write(baos.toByteArray());
+ out.writeInt(baos.getCount());
+ out.write(baos.getBuffer(), 0, baos.getCount());
}
@Override
Modified: trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -22,6 +22,8 @@
package com.metamatrix.common.comm.platform.socket.client;
+import static org.junit.Assert.*;
+
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
@@ -32,7 +34,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
-import junit.framework.TestCase;
+import org.junit.Test;
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.api.HostInfo;
@@ -42,7 +44,7 @@
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.platform.security.api.ILogon;
-public class TestSocketServerInstanceImpl extends TestCase {
+public class TestSocketServerInstanceImpl {
private static class FakeObjectChannel implements ObjectChannel, ObjectChannelFactory {
List<Object> msgs = new ArrayList<Object>();
@@ -119,7 +121,7 @@
}
- public void testHandshakeTimeout() throws Exception {
+ @Test public void testHandshakeTimeout() throws Exception {
SocketTimeoutException[] exs = new SocketTimeoutException[SocketServerInstanceImpl.HANDSHAKE_RETRIES];
Arrays.fill(exs, new SocketTimeoutException());
final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(exs));
@@ -139,7 +141,7 @@
return ssii;
}
- public void testSuccessfulHandshake() throws Exception {
+ @Test public void testSuccessfulHandshake() throws Exception {
final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(new Handshake(), new SocketTimeoutException()));
SocketServerInstanceImpl instance = createInstance(channel);
@@ -154,7 +156,7 @@
}
}
- public void testVersionMismatch() throws Exception {
+ @Test public void testVersionMismatch() throws Exception {
Handshake h = new Handshake();
h.setVersion("foo"); //$NON-NLS-1$
final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(h));
Added: trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java (rev 0)
+++ trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.netty.handler.codec.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestObjectDecoderInputStream {
+
+ @Test public void testTimeoutException() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectEncoderOutputStream oeos = new ObjectEncoderOutputStream(new DataOutputStream(baos), 512);
+ List<Integer> obj = Arrays.asList(1, 2, 3);
+ oeos.writeObject(obj);
+ oeos.close();
+ final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ InputStream is = new InputStream() {
+ int count;
+ @Override
+ public int read() throws IOException {
+ if (count++%2==0) {
+ throw new SocketTimeoutException();
+ }
+ return bais.read();
+ }
+ };
+ ObjectDecoderInputStream odis = new ObjectDecoderInputStream(new DataInputStream(is), Thread.currentThread().getContextClassLoader(), 1024);
+ Object result = null;
+ do {
+ try {
+ result = odis.readObject();
+ } catch (IOException e) {
+
+ }
+ } while (result == null);
+ assertEquals(obj, result);
+ }
+
+}
Property changes on: trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -22,8 +22,6 @@
package com.metamatrix.common.types;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
@@ -130,24 +128,6 @@
this.reference.truncate(len);
}
- /**
- * Utility Method to convert blob into byte array
- * @param blob
- * @return byte array
- */
- public static byte[] getByteArray(Blob blob) throws SQLException, IOException {
- InputStream reader = blob.getBinaryStream();
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- int c = reader.read();
- while (c != -1) {
- writer.write((byte)c);
- c = reader.read();
- }
- reader.close();
- byte[] data = writer.toByteArray();
- writer.close();
- return data;
- }
//## JDBC4.0-begin ##
public void free() throws SQLException {
this.reference.free();
Added: trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java (rev 0)
+++ trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.core.util;
+
+import java.io.ByteArrayOutputStream;
+
+public class AccessibleByteArrayOutputStream extends ByteArrayOutputStream {
+
+ public AccessibleByteArrayOutputStream() {
+ super();
+ }
+
+ public AccessibleByteArrayOutputStream(int size) {
+ super(size);
+ }
+
+ public byte[] getBuffer() {
+ return this.buf;
+ }
+
+ public int getCount() {
+ return this.count;
+ }
+
+}
\ No newline at end of file
Property changes on: trunk/common-core/src/main/java/com/metamatrix/core/util/AccessibleByteArrayOutputStream.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -23,7 +23,6 @@
package com.metamatrix.common.buffer;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -40,6 +39,7 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.Streamable;
+import com.metamatrix.core.util.AccessibleByteArrayOutputStream;
import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.DQPPlugin;
import com.metamatrix.query.execution.QueryExecPlugin;
@@ -228,40 +228,22 @@
this.store = this.manager.createFileStore(this.tupleSourceID);
this.store.setCleanupReference(this);
}
- byte[] bytes = convertToBytes(writeBatch);
- mbatch.setLength(bytes.length);
- mbatch.setOffset(this.store.write(bytes));
- this.batches.put(mbatch.getBeginRow(), mbatch);
- batchBuffer = null;
- }
-
- /**
- * Convert from an object to a byte array
- * @param object Object to convert
- * @return Byte array
- */
- private byte[] convertToBytes(TupleBatch batch) throws MetaMatrixComponentException {
- ObjectOutputStream oos = null;
+ AccessibleByteArrayOutputStream baos = null;
try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- oos = new ObjectOutputStream(baos);
-
- batch.setDataTypes(types);
- batch.writeExternal(oos);
+ baos = new AccessibleByteArrayOutputStream(1024);
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ writeBatch.setDataTypes(types);
+ writeBatch.writeExternal(oos);
oos.flush();
- return baos.toByteArray();
-
+ oos.close();
} catch(IOException e) {
throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStorageManager.batch_error")); //$NON-NLS-1$
- } finally {
- if(oos != null) {
- try {
- oos.close();
- } catch(IOException e) {
- }
- }
}
- }
+ mbatch.setLength(baos.getCount());
+ mbatch.setOffset(this.store.write(baos.getBuffer(), 0, baos.getCount()));
+ this.batches.put(mbatch.getBeginRow(), mbatch);
+ batchBuffer = null;
+ }
public void close() throws MetaMatrixComponentException {
//if there is only a single batch, let it stay in memory
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -51,7 +51,7 @@
* for buffermanager reserve/release of memory
* (would also help the sort utility)
*/
- public static final int MAX_PARTITIONS = 8;
+ public static final int MAX_PARTITIONS = 16;
private List[] endTuples;
private List<Boolean> overlap = new ArrayList<Boolean>();
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -23,9 +23,11 @@
package com.metamatrix.query.processor.relational;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
+import java.util.TreeSet;
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -69,7 +71,8 @@
@Override
public int compareTo(SortedSublist o) {
- return comparator.compare(this.tuple, o.tuple);
+ //reverse the comparison, so that removal of the lowest is a low cost operation
+ return -comparator.compare(this.tuple, o.tuple);
}
@Override
@@ -92,12 +95,13 @@
private List<TupleBuffer> activeTupleBuffers = new ArrayList<TupleBuffer>();
private int masterSortIndex;
- private int initialSortPass = 1; //used to track the number of times through the initial sort method
+ private int dupRemoveSublists = 1; //used to control the number of sublists needed for dup remove
// Phase constants for readability
private static final int INITIAL_SORT = 1;
private static final int MERGE = 2;
private static final int DONE = 3;
+ private Collection<List<?>> workingTuples;
public SortUtility(TupleSource sourceID, List sortElements, List<Boolean> sortTypes, Mode mode, BufferManager bufferMgr,
String groupName) {
@@ -162,7 +166,14 @@
*/
protected void initialSort() throws MetaMatrixComponentException, MetaMatrixProcessingException {
while(!doneReading) {
- List<List<?>> workingTuples = new ArrayList<List<?>>();
+ if (workingTuples == null) {
+ if (mode == Mode.SORT) {
+ workingTuples = new ArrayList<List<?>>();
+ } else {
+ workingTuples = new TreeSet<List<?>>(comparator);
+ }
+ }
+
int maxRows = bufferManager.getMaxProcessingBatches() * bufferManager.getProcessorBatchSize();
while(!doneReading && workingTuples.size() < maxRows) {
try {
@@ -173,9 +184,10 @@
break;
}
- addTuple(workingTuples, tuple);
+ workingTuples.add(tuple);
} catch(BlockedException e) {
- if (workingTuples.isEmpty() && (mode != Mode.DUP_REMOVE || activeTupleBuffers.size() < initialSortPass)) {
+ if ((workingTuples.size() < maxRows/2 && mode != Mode.DUP_REMOVE)
+ || (workingTuples.size() < (dupRemoveSublists/4)*bufferManager.getProcessorBatchSize() && activeTupleBuffers.size() < dupRemoveSublists)) {
throw e; //block if no work can be performed
}
break;
@@ -190,33 +202,22 @@
activeTupleBuffers.add(sublist);
if (this.mode == Mode.SORT) {
//perform a stable sort
- Collections.sort(workingTuples, comparator);
+ Collections.sort((List<List<?>>)workingTuples, comparator);
}
for (List<?> list : workingTuples) {
sublist.addTuple(list);
}
+ workingTuples = null;
sublist.saveBatch();
}
if (this.activeTupleBuffers.isEmpty()) {
activeTupleBuffers.add(createTupleBuffer());
}
- this.initialSortPass = Math.min(initialSortPass + 1, bufferManager.getMaxProcessingBatches() * 2);
+ this.dupRemoveSublists = Math.min(dupRemoveSublists * 2, bufferManager.getMaxProcessingBatches() * 2);
this.phase = MERGE;
}
- protected void addTuple(List<List<?>> workingTuples, List<?> tuple) {
- if (this.mode == Mode.SORT) {
- workingTuples.add(tuple);
- return;
- }
- int index = Collections.binarySearch(workingTuples, tuple, comparator);
- if (index >= 0) {
- return; //it's already there
- }
- workingTuples.add(-index - 1, tuple);
- }
-
protected void mergePhase() throws MetaMatrixComponentException, MetaMatrixProcessingException {
while(this.activeTupleBuffers.size() > 1) {
ArrayList<SortedSublist> sublists = new ArrayList<SortedSublist>(activeTupleBuffers.size());
@@ -227,7 +228,6 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
}
-
// initialize the sublists with the min value
for(int i = 0; i<maxSortIndex; i++) {
TupleBuffer activeID = activeTupleBuffers.get(i);
@@ -242,7 +242,7 @@
// iteratively process the lowest tuple
while (sublists.size() > 0) {
- SortedSublist sortedSublist = sublists.remove(0);
+ SortedSublist sortedSublist = sublists.remove(sublists.size() - 1);
merged.addTuple(sortedSublist.tuple);
if (this.output != null && sortedSublist.index > masterSortIndex) {
this.output.addTuple(sortedSublist.tuple); //a new distinct row
Modified: trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java 2010-01-12 21:02:10 UTC (rev 1731)
+++ trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java 2010-01-13 06:03:53 UTC (rev 1732)
@@ -286,6 +286,9 @@
private MetaMatrixSessionInfo getSessionInfo(MetaMatrixSessionID sessionID)
throws InvalidSessionException {
+ if (sessionID == null) {
+ throw new InvalidSessionException(DQPEmbeddedPlugin.Util.getString("SessionServiceImpl.invalid_session", new Object[] {null})); //$NON-NLS-1$
+ }
MetaMatrixSessionInfo info = this.sessionCache.get(sessionID);
if (info == null) {
throw new InvalidSessionException(DQPEmbeddedPlugin.Util.getString("SessionServiceImpl.invalid_session", sessionID)); //$NON-NLS-1$
More information about the teiid-commits
mailing list