[teiid-commits] teiid SVN: r1224 - trunk/engine/src/main/java/com/metamatrix/query/processor/proc.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sun Aug 9 00:10:31 EDT 2009


Author: shawkins
Date: 2009-08-09 00:10:31 -0400 (Sun, 09 Aug 2009)
New Revision: 1224

Modified:
   trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
Log:
TEIID-762 fix for tuple source leak in ProcedurePlan

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java	2009-08-09 02:37:26 UTC (rev 1223)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java	2009-08-09 04:10:31 UTC (rev 1224)
@@ -73,6 +73,12 @@
  */
 public class ProcedurePlan extends BaseProcessorPlan {
 
+	private class CursorState {
+		TupleSourceID tsID;
+		TupleSource ts;
+		List<?> currentRow;
+	}
+	
     private Program originalProgram;
 
 	// State initialized by processor
@@ -92,11 +98,9 @@
     private Map<ElementSymbol, Expression> params;
     private Map<ElementSymbol, Reference> implicitParams;
     private QueryMetadataInterface metadata;
-    
-    private Map tupleSourceMap = new HashMap();     // rsName -> TupleSource
-    private Map tupleSourceIDMap = new HashMap();   // rsName -> TupleSourceID
-    private Map currentRowMap = new HashMap();
 
+    private Map<String, CursorState> cursorStates = new HashMap<String, CursorState>();
+
 	private static ElementSymbol ROWS_UPDATED =
 			new ElementSymbol(ProcedureReservedWords.VARIABLES+"."+ProcedureReservedWords.ROWS_UPDATED); //$NON-NLS-1$
 
@@ -147,9 +151,7 @@
         	evaluator.reset();
         }
         evaluatedParams = false;
-        tupleSourceMap.clear();
-        tupleSourceIDMap.clear();
-        currentRowMap.clear();
+        cursorStates.clear();
         createVariableContext();
         lastTupleSource = null;
         
@@ -173,30 +175,6 @@
         return this.dataMgr;
     }
 
-    /**
-     * Request for data from a node
-     * @param command Command to execute from node
-     * @return The <code>TupleSourceID</code> for the results
-     */
-    TupleSourceID registerRequest(ProcessorPlan subPlan, VariableContext currentVariableContext)
-        throws MetaMatrixComponentException {
-        
-        if(this.internalProcessor != null){
-            return this.internalResultID;
-        }
-        
-        //this may not be the first time the plan is being run
-        subPlan.reset();
-
-        // Run query processor on command
-        CommandContext subContext = (CommandContext) getContext().clone();
-        subContext.setVariableContext(currentVariableContext);
-        subContext.setTempTableStore(getTempTableStore());
-        internalProcessor = new QueryProcessor(subPlan, subContext, this.bufferMgr, this.dataMgr);
-        this.internalResultID = this.internalProcessor.getResultsID();
-        return this.internalResultID;
-    }
-
     public void open() throws MetaMatrixProcessingException, MetaMatrixComponentException {
     	if (!this.evaluatedParams) {
     		if (this.params != null) { 
@@ -319,32 +297,19 @@
         return lastTupleSource;
     }
 
-    private TupleSource getResults()
-        throws MetaMatrixComponentException, BlockedException, MetaMatrixProcessingException {
-
-        this.internalProcessor.process(Integer.MAX_VALUE); //TODO: put a better value here
-
-        // didn't throw processor blocked, so must be done
-        TupleSource results = this.bufferMgr.getTupleSource(this.internalResultID);
-
-        // clean up internal stuff
-        this.internalProcessor = null;
-
-        return results;
-    }
-
     public void close()
         throws MetaMatrixComponentException {
-        // Defect 14544 - remove the tuple batches for the internal tuple source from the buffer manager.
-        // This is the last tuple source created by the procedure plan.
-        if (internalResultID != null) {
-            try {
-                bufferMgr.removeTupleSource(internalResultID);
-                internalResultID = null;
-            } catch (Exception e) {
-                // Ignore
-            }
+        if (!this.cursorStates.isEmpty()) {
+        	List<String> cursors = new ArrayList<String>(this.cursorStates.keySet());
+        	for (String rsName : cursors) {
+        		try {
+        			removeResults(rsName);
+        		} catch (MetaMatrixComponentException e) {
+        			LogManager.logDetail(LogConstants.CTX_DQP, e, "Exception closing procedure plan cursor"); //$NON-NLS-1$
+        		}
+			}
         }
+        internalResultID = null;
         if(getTempTableStore()!=null) {
         	getTempTableStore().removeTempTables();
         }
@@ -466,10 +431,29 @@
             removeResults(ExecSqlInstruction.RS_NAME);
         }
         
-        TupleSourceID tsID = registerRequest(command, this.currentVarContext);
-        TupleSource source = getResults();
-        tupleSourceIDMap.put(rsName.toUpperCase(), tsID);
-        tupleSourceMap.put(rsName.toUpperCase(), source);
+        if(this.internalProcessor == null){
+            //this may not be the first time the plan is being run
+            command.reset();
+
+            // Run query processor on command
+            CommandContext subContext = (CommandContext) getContext().clone();
+            subContext.setVariableContext(this.currentVarContext);
+            subContext.setTempTableStore(getTempTableStore());
+            internalProcessor = new QueryProcessor(command, subContext, this.bufferMgr, this.dataMgr);
+            this.internalResultID = this.internalProcessor.getResultsID();
+        }
+        this.internalProcessor.process(Integer.MAX_VALUE); //TODO: put a better value here
+
+        // didn't throw processor blocked, so must be done
+        TupleSource source = this.bufferMgr.getTupleSource(this.internalResultID);
+
+        this.internalProcessor = null;        
+
+        CursorState cursorState = new CursorState();
+        cursorState.tsID = this.internalResultID;
+        cursorState.ts = source;
+        this.cursorStates.put(rsName.toUpperCase(), cursorState);
+        
         if(isExecSQLInstruction){
             //keep a reference to the tuple source
             //it may be the last one
@@ -550,8 +534,8 @@
         return (Set)this.tempContext.getLast();
     }
 
-    public List getCurrentRow(String rsName) {
-        return (List) currentRowMap.get(rsName.toUpperCase());
+    public List getCurrentRow(String rsName) throws MetaMatrixComponentException {
+        return getCursorState(rsName.toUpperCase()).currentRow;
     }
 
     public boolean iterateCursor(String rsName)
@@ -559,34 +543,33 @@
 
         String rsKey = rsName.toUpperCase();
 
-        TupleSource source = (TupleSource) tupleSourceMap.get(rsKey);
-        if(source == null) {
-            // TODO - throw exception?
-            return false;
-        }
-
-        List row = source.nextTuple();
-        currentRowMap.put(rsKey, row);
-        return (row != null);
+        CursorState state = getCursorState(rsKey);
+        
+        state.currentRow = state.ts.nextTuple();
+        return (state.currentRow != null);
     }
 
+	private CursorState getCursorState(String rsKey) throws MetaMatrixComponentException {
+		CursorState state = this.cursorStates.get(rsKey);
+		if (state == null) {
+			throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0037, rsKey));
+		}
+		return state;
+	}
+
     public void removeResults(String rsName) throws MetaMatrixComponentException {
         String rsKey = rsName.toUpperCase();
-        TupleSource source = (TupleSource) tupleSourceMap.get(rsKey);
-        if(source != null) {
-            source.closeSource();
-            TupleSourceID tsID = (TupleSourceID) tupleSourceIDMap.get(rsKey);
+        CursorState state = this.cursorStates.remove(rsKey);
+        if(state != null) {
+        	state.ts.closeSource();
             try {
-    			this.bufferMgr.removeTupleSource(tsID);
+    			this.bufferMgr.removeTupleSource(state.tsID);
     		} catch (TupleSourceNotFoundException e) {
                 throw new MetaMatrixComponentException(e, ErrorMessageKeys.PROCESSOR_0021, QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0021, (String)null));
     		} catch (MetaMatrixComponentException e) {
                 throw new MetaMatrixComponentException(e, ErrorMessageKeys.PROCESSOR_0022, QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0022, (String) null));
     		}
-            LogManager.logTrace(LogConstants.CTX_DQP, new Object[]{"removed tuple source", tsID, "for result set"}); //$NON-NLS-1$ //$NON-NLS-2$
-            tupleSourceMap.remove(rsKey);
-            tupleSourceIDMap.remove(rsKey);
-            currentRowMap.remove(rsKey);
+            LogManager.logTrace(LogConstants.CTX_DQP, new Object[]{"removed tuple source", state.tsID, "for result set"}); //$NON-NLS-1$ //$NON-NLS-2$
             this.tempTableStore.removeTempTableByName(rsKey);
         }
     }
@@ -603,22 +586,16 @@
 
         // get the tuple source
         String rsKey = rsName.toUpperCase();
-        TupleSource source = (TupleSource) tupleSourceMap.get(rsKey);
-        if(source == null){
-            throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0037, rsName));
-        }
+        
+        CursorState cursorState = getCursorState(rsKey);
         // get the schema from the tuple source
-        List schema = source.getSchema();
-        if(schema == null){
-            throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0038));
-        }
-
+        List schema = cursorState.ts.getSchema();
         return schema;
     }
 
     public boolean resultSetExists(String rsName) {
         String rsKey = rsName.toUpperCase();
-        boolean exists = this.tupleSourceMap.containsKey(rsKey);
+        boolean exists = this.cursorStates.containsKey(rsKey);
         return exists;
     }
 



More information about the teiid-commits mailing list