[teiid-commits] teiid SVN: r3457 - in branches/7.4.x: engine/src/main/java/org/teiid/dqp/message and 2 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Wed Sep 7 22:02:26 EDT 2011
Author: shawkins
Date: 2011-09-07 22:02:25 -0400 (Wed, 07 Sep 2011)
New Revision: 3457
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/message/RequestID.java
branches/7.4.x/engine/src/test/java/org/teiid/dqp/message/TestRequestID.java
branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
Log:
TEIID-1745 fix for plans that are blocked on a full output buffer holding active plan slots while other plans wait
Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-09-08 02:02:25 UTC (rev 3457)
@@ -25,6 +25,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -209,6 +211,7 @@
private int currentlyActivePlans;
private int userRequestSourceConcurrency;
private LinkedList<RequestWorkItem> waitingPlans = new LinkedList<RequestWorkItem>();
+ private LinkedHashSet<RequestWorkItem> bufferFullPlans = new LinkedHashSet<RequestWorkItem>();
private CacheFactory cacheFactory;
private SessionAwareCache<CachedResults> matTables;
@@ -352,12 +355,21 @@
addRequest(requestID, workItem, state);
boolean runInThread = DQPWorkContext.getWorkContext().useCallingThread() || requestMsg.isSync();
synchronized (waitingPlans) {
- if (runInThread || currentlyActivePlans < maxActivePlans) {
+ if (runInThread || currentlyActivePlans <= maxActivePlans) {
startActivePlan(workItem, !runInThread);
} else {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Queuing plan, since max plans has been reached."); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_DQP, workItem.requestID, "Queuing plan, since max plans has been reached."); //$NON-NLS-1$
}
+ if (!bufferFullPlans.isEmpty()) {
+ Iterator<RequestWorkItem> id = bufferFullPlans.iterator();
+ RequestWorkItem bufferFull = id.next();
+ id.remove();
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, bufferFull.requestID, "Restarting plan with full buffer, since there is a pending active plan."); //$NON-NLS-1$
+ }
+ bufferFull.moreWork();
+ }
waitingPlans.add(workItem);
}
}
@@ -400,12 +412,23 @@
}
workItem.active = false;
currentlyActivePlans--;
+ bufferFullPlans.remove(workItem.requestID);
if (!waitingPlans.isEmpty()) {
startActivePlan(waitingPlans.remove(), true);
}
}
}
+ public boolean hasWaitingPlans(RequestWorkItem item) {
+ synchronized (waitingPlans) {
+ if (!waitingPlans.isEmpty()) {
+ return true;
+ }
+ this.bufferFullPlans.add(item);
+ }
+ return false;
+ }
+
void removeRequest(final RequestWorkItem workItem) {
finishProcessing(workItem);
this.requests.remove(workItem.requestID);
Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-09-08 02:02:25 UTC (rev 3457)
@@ -77,6 +77,9 @@
public class RequestWorkItem extends AbstractWorkItem implements PrioritizedRunnable {
+ //TODO: this could be configurable
+ private static final int OUTPUT_BUFFER_MAX_BATCHES = 20;
+
private final class WorkWrapper<T> implements
DQPCore.CompletionListener<T> {
@@ -275,7 +278,9 @@
resume();
if (this.state == ProcessingState.PROCESSING) {
- processMore();
+ if (!this.closeRequested) {
+ processMore();
+ }
if (this.closeRequested) {
this.state = ProcessingState.CLOSE;
}
@@ -478,10 +483,8 @@
collector = new BatchCollector(processor, processor.getBufferManager(), this.request.context, isForwardOnly()) {
protected void flushBatchDirect(TupleBatch batch, boolean add) throws TeiidComponentException,TeiidProcessingException {
resultsBuffer = getTupleBuffer();
- boolean added = false;
if (cid != null) {
super.flushBatchDirect(batch, add);
- added = true;
}
if (batch.getTerminationFlag()) {
doneProducingBatches();
@@ -489,14 +492,20 @@
addToCache();
synchronized (lobStreams) {
add = sendResultsIfNeeded(batch);
- if (!added) {
- super.flushBatchDirect(batch, add);
- //restrict the buffer size for forward only results
- if (add && !processor.hasFinalBuffer()
- && !batch.getTerminationFlag()
- && this.getTupleBuffer().getManagedRowCount() >= 20 * this.getTupleBuffer().getBatchSize()) {
+ if (cid != null) {
+ return;
+ }
+ super.flushBatchDirect(batch, add);
+ //restrict the buffer size for forward only results
+ if (add && !processor.hasFinalBuffer()
+ && !batch.getTerminationFlag()
+ && this.getTupleBuffer().getManagedRowCount() >= OUTPUT_BUFFER_MAX_BATCHES * this.getTupleBuffer().getBatchSize()) {
+ if (!dqpCore.hasWaitingPlans(RequestWorkItem.this)) {
//requestMore will trigger more processing
throw BlockedException.block(requestID, "Blocking due to full results buffer."); //$NON-NLS-1$
+ }
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Exceeding buffer limit since there are pending active plans."); //$NON-NLS-1$
}
}
}
@@ -782,10 +791,10 @@
return;
}
}
- this.closeRequested = true;
if (!this.doneProducingBatches) {
this.requestCancel(); //pending work should be canceled for fastest clean up
}
+ this.closeRequested = true;
this.doMoreWork();
}
Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/message/RequestID.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/message/RequestID.java 2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/message/RequestID.java 2011-09-08 02:02:25 UTC (rev 3457)
@@ -27,6 +27,9 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.teiid.core.util.EquivalenceUtil;
+import org.teiid.core.util.HashCodeUtil;
+
/**
* <p>This class represents an identifier for a request. However, there are some
* differences in what constitutes "uniqueness" for a given RequestID that
@@ -58,7 +61,6 @@
// Derived state
private String combinedID;
- private int hash;
/**
* Necessary for implementing Externalizable
@@ -74,17 +76,11 @@
public RequestID(String connectionID, long executionID) {
this.connectionID = connectionID;
this.executionID = executionID;
-
- createCombinedID();
- computeHashCode();
}
public RequestID(long connectionID, long executionID) {
this.connectionID = String.valueOf(connectionID);
this.executionID = executionID;
-
- createCombinedID();
- computeHashCode();
}
/**
@@ -129,28 +125,28 @@
this.combinedID = combinedStr.toString();
}
- private void computeHashCode() {
- this.hash = combinedID.hashCode();
- }
-
public int hashCode() {
- return this.hash;
+ return HashCodeUtil.hashCode(connectionID==null?0:connectionID.hashCode(), executionID);
}
public boolean equals(Object obj) {
if(obj == this) {
return true;
- } else if(obj == null || !(obj instanceof RequestID) || obj.hashCode() != this.hashCode()) {
+ } else if(obj == null || !(obj instanceof RequestID)) {
return false;
- } else {
- return this.toString().equals(obj.toString());
- }
+ }
+ RequestID other = (RequestID)obj;
+ return this.executionID == other.executionID
+ && EquivalenceUtil.areEqual(this.connectionID, other.connectionID);
}
/**
* Return a combined string for the ID.
*/
public String toString() {
+ if (combinedID == null) {
+ createCombinedID();
+ }
return this.combinedID;
}
@@ -160,9 +156,6 @@
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
connectionID = (String)in.readObject();
executionID = in.readLong();
-
- createCombinedID();
- computeHashCode();
}
/**
Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/message/TestRequestID.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/message/TestRequestID.java 2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/message/TestRequestID.java 2011-09-08 02:02:25 UTC (rev 3457)
@@ -97,7 +97,7 @@
}
public void testSerialize2() throws Exception {
- RequestID copy = UnitTestUtil.helpSerialize(new RequestID(100)); //$NON-NLS-1$
+ RequestID copy = UnitTestUtil.helpSerialize(new RequestID(100));
assertEquals(null, copy.getConnectionID());
assertEquals(100, copy.getExecutionID());
Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java 2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java 2011-09-08 02:02:25 UTC (rev 3457)
@@ -36,6 +36,7 @@
import org.junit.Test;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.core.util.UnitTestUtil;
+import org.teiid.dqp.internal.process.DQPConfiguration;
import org.teiid.jdbc.FakeServer;
import org.teiid.jdbc.TeiidDriver;
import org.teiid.jdbc.TestMMDatabaseMetaData;
@@ -53,6 +54,8 @@
config.setBindAddress(addr.getHostName());
config.setPortNumber(0);
+ DQPConfiguration dqpConfig = new DQPConfiguration();
+ dqpConfig.setMaxActivePlans(2);
FakeServer server = new FakeServer();
server.setUseCallingThread(false);
server.deployVDB("parts", UnitTestUtil.getTestDataPath() + "/PartsSupplier.vdb");
@@ -81,13 +84,23 @@
}
}
- /**
- * Under the covers this still executes a prepared statement due to the driver handling
- */
@Test public void testSelect() throws Exception {
Statement s = conn.createStatement();
assertTrue(s.execute("select * from tables order by name"));
TestMMDatabaseMetaData.compareResultSet(s.getResultSet());
}
+ /**
+ * Ensures if you start more than the maxActivePlans
+ * where all the plans take up more than output buffer limit
+ * that processing still proceeds
+ * @throws Exception
+ */
+ @Test public void testSimultaneousLargeSelects() throws Exception {
+ for (int j = 0; j < 3; j++) {
+ Statement s = conn.createStatement();
+ assertTrue(s.execute("select * from columns c1, columns c2"));
+ }
+ }
+
}
More information about the teiid-commits
mailing list