[teiid-commits] teiid SVN: r3457 - in branches/7.4.x: engine/src/main/java/org/teiid/dqp/message and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Sep 7 22:02:26 EDT 2011


Author: shawkins
Date: 2011-09-07 22:02:25 -0400 (Wed, 07 Sep 2011)
New Revision: 3457

Modified:
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/message/RequestID.java
   branches/7.4.x/engine/src/test/java/org/teiid/dqp/message/TestRequestID.java
   branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
Log:
TEIID-1745 fix for buffer full plans holding active plans

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-09-08 02:02:25 UTC (rev 3457)
@@ -25,6 +25,8 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -209,6 +211,7 @@
     private int currentlyActivePlans;
     private int userRequestSourceConcurrency;
     private LinkedList<RequestWorkItem> waitingPlans = new LinkedList<RequestWorkItem>();
+    private LinkedHashSet<RequestWorkItem> bufferFullPlans = new LinkedHashSet<RequestWorkItem>();
     private CacheFactory cacheFactory;
 
 	private SessionAwareCache<CachedResults> matTables;
@@ -352,12 +355,21 @@
         addRequest(requestID, workItem, state);
         boolean runInThread = DQPWorkContext.getWorkContext().useCallingThread() || requestMsg.isSync();
         synchronized (waitingPlans) {
-			if (runInThread || currentlyActivePlans < maxActivePlans) {
+			if (runInThread || currentlyActivePlans <= maxActivePlans) {
 				startActivePlan(workItem, !runInThread);
 			} else {
 				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-		            LogManager.logDetail(LogConstants.CTX_DQP, "Queuing plan, since max plans has been reached.");  //$NON-NLS-1$
+		            LogManager.logDetail(LogConstants.CTX_DQP, workItem.requestID, "Queuing plan, since max plans has been reached.");  //$NON-NLS-1$
 		        }  
+				if (!bufferFullPlans.isEmpty()) {
+	        		Iterator<RequestWorkItem> id = bufferFullPlans.iterator();
+	        		RequestWorkItem bufferFull = id.next();
+	        		id.remove();
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+			            LogManager.logDetail(LogConstants.CTX_DQP, bufferFull.requestID, "Restarting plan with full buffer, since there is a pending active plan.");  //$NON-NLS-1$
+			        }  
+	        		bufferFull.moreWork();
+	        	}
 				waitingPlans.add(workItem);
 			}
 		}
@@ -400,12 +412,23 @@
         	}
         	workItem.active = false;
     		currentlyActivePlans--;
+    		bufferFullPlans.remove(workItem.requestID);
 			if (!waitingPlans.isEmpty()) {
 				startActivePlan(waitingPlans.remove(), true);
 			}
 		}
     }
     
+    public boolean hasWaitingPlans(RequestWorkItem item) {
+    	synchronized (waitingPlans) {
+    		if (!waitingPlans.isEmpty()) {
+    			return true;
+    		}
+    		this.bufferFullPlans.add(item);
+		}
+    	return false;
+    }
+    
     void removeRequest(final RequestWorkItem workItem) {
     	finishProcessing(workItem);
     	this.requests.remove(workItem.requestID);

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-09-08 02:02:25 UTC (rev 3457)
@@ -77,6 +77,9 @@
 
 public class RequestWorkItem extends AbstractWorkItem implements PrioritizedRunnable {
 	
+	//TODO: this could be configurable
+	private static final int OUTPUT_BUFFER_MAX_BATCHES = 20;
+
 	private final class WorkWrapper<T> implements
 			DQPCore.CompletionListener<T> {
 		
@@ -275,7 +278,9 @@
             resume();
         	
             if (this.state == ProcessingState.PROCESSING) {
-            	processMore();
+            	if (!this.closeRequested) {
+            		processMore();
+            	}
             	if (this.closeRequested) {
             		this.state = ProcessingState.CLOSE;
             	}
@@ -478,10 +483,8 @@
 		collector = new BatchCollector(processor, processor.getBufferManager(), this.request.context, isForwardOnly()) {
 			protected void flushBatchDirect(TupleBatch batch, boolean add) throws TeiidComponentException,TeiidProcessingException {
 				resultsBuffer = getTupleBuffer();
-				boolean added = false;
 				if (cid != null) {
 					super.flushBatchDirect(batch, add);
-					added = true;
 				}
 				if (batch.getTerminationFlag()) {
 					doneProducingBatches();
@@ -489,14 +492,20 @@
 				addToCache();
 				synchronized (lobStreams) {
 					add = sendResultsIfNeeded(batch);
-					if (!added) {
-						super.flushBatchDirect(batch, add);
-						//restrict the buffer size for forward only results
-						if (add && !processor.hasFinalBuffer()
-								&& !batch.getTerminationFlag() 
-								&& this.getTupleBuffer().getManagedRowCount() >= 20 * this.getTupleBuffer().getBatchSize()) {
+					if (cid != null) {
+						return;
+					}
+					super.flushBatchDirect(batch, add);
+					//restrict the buffer size for forward only results
+					if (add && !processor.hasFinalBuffer()
+							&& !batch.getTerminationFlag() 
+							&& this.getTupleBuffer().getManagedRowCount() >= OUTPUT_BUFFER_MAX_BATCHES * this.getTupleBuffer().getBatchSize()) {
+						if (!dqpCore.hasWaitingPlans(RequestWorkItem.this)) {
 							//requestMore will trigger more processing
 							throw BlockedException.block(requestID, "Blocking due to full results buffer."); //$NON-NLS-1$
+						} 
+						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+							LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Exceeding buffer limit since there are pending active plans."); //$NON-NLS-1$
 						}
 					}
 				}
@@ -782,10 +791,10 @@
         		return;
         	}
 		}
-    	this.closeRequested = true;
     	if (!this.doneProducingBatches) {
     		this.requestCancel(); //pending work should be canceled for fastest clean up
     	}
+    	this.closeRequested = true;
     	this.doMoreWork();
     }
     

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/message/RequestID.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/message/RequestID.java	2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/message/RequestID.java	2011-09-08 02:02:25 UTC (rev 3457)
@@ -27,6 +27,9 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 
+import org.teiid.core.util.EquivalenceUtil;
+import org.teiid.core.util.HashCodeUtil;
+
 /**
  * <p>This class represents an identifier for a request.  However, there are some
  * differences in what constitutes "uniqueness" for a given RequestID that 
@@ -58,7 +61,6 @@
     
     // Derived state
     private String combinedID;
-    private int hash;
 
     /**
      * Necessary for implementing Externalizable 
@@ -74,17 +76,11 @@
     public RequestID(String connectionID, long executionID) {
         this.connectionID = connectionID;
         this.executionID = executionID;
-        
-        createCombinedID();
-        computeHashCode();
     }
     
     public RequestID(long connectionID, long executionID) {
         this.connectionID = String.valueOf(connectionID);
         this.executionID = executionID;
-        
-        createCombinedID();
-        computeHashCode();
     }    
     
     /**
@@ -129,28 +125,28 @@
         this.combinedID = combinedStr.toString();
     }
     
-    private void computeHashCode() {
-        this.hash = combinedID.hashCode();
-    }
-    
     public int hashCode() {
-        return this.hash;
+        return HashCodeUtil.hashCode(connectionID==null?0:connectionID.hashCode(), executionID);
     }
     
     public boolean equals(Object obj) {
         if(obj == this) {
             return true;
-        } else if(obj == null || !(obj instanceof RequestID) || obj.hashCode() != this.hashCode()) {
+        } else if(obj == null || !(obj instanceof RequestID)) {
             return false;
-        } else {
-            return this.toString().equals(obj.toString());
-        }
+        } 
+        RequestID other = (RequestID)obj;
+        return this.executionID == other.executionID 
+        	&& EquivalenceUtil.areEqual(this.connectionID, other.connectionID);
     }
     
     /**
      * Return a combined string for the ID.
      */
     public String toString() {
+    	if (combinedID == null) {
+    		createCombinedID();
+    	}
         return this.combinedID;
     }
 
@@ -160,9 +156,6 @@
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         connectionID = (String)in.readObject();
         executionID = in.readLong();
-
-        createCombinedID();
-        computeHashCode();
     }
 
     /**

Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/message/TestRequestID.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/message/TestRequestID.java	2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/message/TestRequestID.java	2011-09-08 02:02:25 UTC (rev 3457)
@@ -97,7 +97,7 @@
     }
 
     public void testSerialize2() throws Exception {
-        RequestID copy = UnitTestUtil.helpSerialize(new RequestID(100)); //$NON-NLS-1$
+        RequestID copy = UnitTestUtil.helpSerialize(new RequestID(100));
 
         assertEquals(null, copy.getConnectionID());
         assertEquals(100, copy.getExecutionID());

Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java	2011-09-07 20:04:39 UTC (rev 3456)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java	2011-09-08 02:02:25 UTC (rev 3457)
@@ -36,6 +36,7 @@
 import org.junit.Test;
 import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.core.util.UnitTestUtil;
+import org.teiid.dqp.internal.process.DQPConfiguration;
 import org.teiid.jdbc.FakeServer;
 import org.teiid.jdbc.TeiidDriver;
 import org.teiid.jdbc.TestMMDatabaseMetaData;
@@ -53,6 +54,8 @@
 		config.setBindAddress(addr.getHostName());
 		config.setPortNumber(0);
 		
+		DQPConfiguration dqpConfig = new DQPConfiguration();
+		dqpConfig.setMaxActivePlans(2);
 		FakeServer server = new FakeServer();
 		server.setUseCallingThread(false);
 		server.deployVDB("parts", UnitTestUtil.getTestDataPath() + "/PartsSupplier.vdb");
@@ -81,13 +84,23 @@
 		}
 	}
 	
-	/**
-	 * Under the covers this still executes a prepared statement due to the driver handling
-	 */
 	@Test public void testSelect() throws Exception {
 		Statement s = conn.createStatement();
 		assertTrue(s.execute("select * from tables order by name"));
 		TestMMDatabaseMetaData.compareResultSet(s.getResultSet());
 	}
 	
+	/**
+	 * Ensures if you start more than the maxActivePlans
+	 * where all the plans take up more than output buffer limit
+	 * that processing still proceeds
+	 * @throws Exception
+	 */
+	@Test public void testSimultaneousLargeSelects() throws Exception {
+		for (int j = 0; j < 3; j++) {
+			Statement s = conn.createStatement();
+			assertTrue(s.execute("select * from columns c1, columns c2"));
+		}
+	}
+	
 }



More information about the teiid-commits mailing list