[teiid-commits] teiid SVN: r1224 - trunk/engine/src/main/java/com/metamatrix/query/processor/proc.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Sun Aug 9 00:10:31 EDT 2009
Author: shawkins
Date: 2009-08-09 00:10:31 -0400 (Sun, 09 Aug 2009)
New Revision: 1224
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
Log:
TEIID-762 fix for tuplesourceleak in procedureplan
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java 2009-08-09 02:37:26 UTC (rev 1223)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java 2009-08-09 04:10:31 UTC (rev 1224)
@@ -73,6 +73,12 @@
*/
public class ProcedurePlan extends BaseProcessorPlan {
+ private class CursorState {
+ TupleSourceID tsID;
+ TupleSource ts;
+ List<?> currentRow;
+ }
+
private Program originalProgram;
// State initialized by processor
@@ -92,11 +98,9 @@
private Map<ElementSymbol, Expression> params;
private Map<ElementSymbol, Reference> implicitParams;
private QueryMetadataInterface metadata;
-
- private Map tupleSourceMap = new HashMap(); // rsName -> TupleSource
- private Map tupleSourceIDMap = new HashMap(); // rsName -> TupleSourceID
- private Map currentRowMap = new HashMap();
+ private Map<String, CursorState> cursorStates = new HashMap<String, CursorState>();
+
private static ElementSymbol ROWS_UPDATED =
new ElementSymbol(ProcedureReservedWords.VARIABLES+"."+ProcedureReservedWords.ROWS_UPDATED); //$NON-NLS-1$
@@ -147,9 +151,7 @@
evaluator.reset();
}
evaluatedParams = false;
- tupleSourceMap.clear();
- tupleSourceIDMap.clear();
- currentRowMap.clear();
+ cursorStates.clear();
createVariableContext();
lastTupleSource = null;
@@ -173,30 +175,6 @@
return this.dataMgr;
}
- /**
- * Request for data from a node
- * @param command Command to execute from node
- * @return The <code>TupleSourceID</code> for the results
- */
- TupleSourceID registerRequest(ProcessorPlan subPlan, VariableContext currentVariableContext)
- throws MetaMatrixComponentException {
-
- if(this.internalProcessor != null){
- return this.internalResultID;
- }
-
- //this may not be the first time the plan is being run
- subPlan.reset();
-
- // Run query processor on command
- CommandContext subContext = (CommandContext) getContext().clone();
- subContext.setVariableContext(currentVariableContext);
- subContext.setTempTableStore(getTempTableStore());
- internalProcessor = new QueryProcessor(subPlan, subContext, this.bufferMgr, this.dataMgr);
- this.internalResultID = this.internalProcessor.getResultsID();
- return this.internalResultID;
- }
-
public void open() throws MetaMatrixProcessingException, MetaMatrixComponentException {
if (!this.evaluatedParams) {
if (this.params != null) {
@@ -319,32 +297,19 @@
return lastTupleSource;
}
- private TupleSource getResults()
- throws MetaMatrixComponentException, BlockedException, MetaMatrixProcessingException {
-
- this.internalProcessor.process(Integer.MAX_VALUE); //TODO: put a better value here
-
- // didn't throw processor blocked, so must be done
- TupleSource results = this.bufferMgr.getTupleSource(this.internalResultID);
-
- // clean up internal stuff
- this.internalProcessor = null;
-
- return results;
- }
-
public void close()
throws MetaMatrixComponentException {
- // Defect 14544 - remove the tuple batches for the internal tuple source from the buffer manager.
- // This is the last tuple source created by the procedure plan.
- if (internalResultID != null) {
- try {
- bufferMgr.removeTupleSource(internalResultID);
- internalResultID = null;
- } catch (Exception e) {
- // Ignore
- }
+ if (!this.cursorStates.isEmpty()) {
+ List<String> cursors = new ArrayList<String>(this.cursorStates.keySet());
+ for (String rsName : cursors) {
+ try {
+ removeResults(rsName);
+ } catch (MetaMatrixComponentException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e, "Exception closing procedure plan cursor"); //$NON-NLS-1$
+ }
+ }
}
+ internalResultID = null;
if(getTempTableStore()!=null) {
getTempTableStore().removeTempTables();
}
@@ -466,10 +431,29 @@
removeResults(ExecSqlInstruction.RS_NAME);
}
- TupleSourceID tsID = registerRequest(command, this.currentVarContext);
- TupleSource source = getResults();
- tupleSourceIDMap.put(rsName.toUpperCase(), tsID);
- tupleSourceMap.put(rsName.toUpperCase(), source);
+ if(this.internalProcessor == null){
+ //this may not be the first time the plan is being run
+ command.reset();
+
+ // Run query processor on command
+ CommandContext subContext = (CommandContext) getContext().clone();
+ subContext.setVariableContext(this.currentVarContext);
+ subContext.setTempTableStore(getTempTableStore());
+ internalProcessor = new QueryProcessor(command, subContext, this.bufferMgr, this.dataMgr);
+ this.internalResultID = this.internalProcessor.getResultsID();
+ }
+ this.internalProcessor.process(Integer.MAX_VALUE); //TODO: put a better value here
+
+ // didn't throw processor blocked, so must be done
+ TupleSource source = this.bufferMgr.getTupleSource(this.internalResultID);
+
+ this.internalProcessor = null;
+
+ CursorState cursorState = new CursorState();
+ cursorState.tsID = this.internalResultID;
+ cursorState.ts = source;
+ this.cursorStates.put(rsName.toUpperCase(), cursorState);
+
if(isExecSQLInstruction){
//keep a reference to the tuple source
//it may be the last one
@@ -550,8 +534,8 @@
return (Set)this.tempContext.getLast();
}
- public List getCurrentRow(String rsName) {
- return (List) currentRowMap.get(rsName.toUpperCase());
+ public List getCurrentRow(String rsName) throws MetaMatrixComponentException {
+ return getCursorState(rsName.toUpperCase()).currentRow;
}
public boolean iterateCursor(String rsName)
@@ -559,34 +543,33 @@
String rsKey = rsName.toUpperCase();
- TupleSource source = (TupleSource) tupleSourceMap.get(rsKey);
- if(source == null) {
- // TODO - throw exception?
- return false;
- }
-
- List row = source.nextTuple();
- currentRowMap.put(rsKey, row);
- return (row != null);
+ CursorState state = getCursorState(rsKey);
+
+ state.currentRow = state.ts.nextTuple();
+ return (state.currentRow != null);
}
+ private CursorState getCursorState(String rsKey) throws MetaMatrixComponentException {
+ CursorState state = this.cursorStates.get(rsKey);
+ if (state == null) {
+ throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0037, rsKey));
+ }
+ return state;
+ }
+
public void removeResults(String rsName) throws MetaMatrixComponentException {
String rsKey = rsName.toUpperCase();
- TupleSource source = (TupleSource) tupleSourceMap.get(rsKey);
- if(source != null) {
- source.closeSource();
- TupleSourceID tsID = (TupleSourceID) tupleSourceIDMap.get(rsKey);
+ CursorState state = this.cursorStates.remove(rsKey);
+ if(state != null) {
+ state.ts.closeSource();
try {
- this.bufferMgr.removeTupleSource(tsID);
+ this.bufferMgr.removeTupleSource(state.tsID);
} catch (TupleSourceNotFoundException e) {
throw new MetaMatrixComponentException(e, ErrorMessageKeys.PROCESSOR_0021, QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0021, (String)null));
} catch (MetaMatrixComponentException e) {
throw new MetaMatrixComponentException(e, ErrorMessageKeys.PROCESSOR_0022, QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0022, (String) null));
}
- LogManager.logTrace(LogConstants.CTX_DQP, new Object[]{"removed tuple source", tsID, "for result set"}); //$NON-NLS-1$ //$NON-NLS-2$
- tupleSourceMap.remove(rsKey);
- tupleSourceIDMap.remove(rsKey);
- currentRowMap.remove(rsKey);
+ LogManager.logTrace(LogConstants.CTX_DQP, new Object[]{"removed tuple source", state.tsID, "for result set"}); //$NON-NLS-1$ //$NON-NLS-2$
this.tempTableStore.removeTempTableByName(rsKey);
}
}
@@ -603,22 +586,16 @@
// get the tuple source
String rsKey = rsName.toUpperCase();
- TupleSource source = (TupleSource) tupleSourceMap.get(rsKey);
- if(source == null){
- throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0037, rsName));
- }
+
+ CursorState cursorState = getCursorState(rsKey);
// get the schema from the tuple source
- List schema = source.getSchema();
- if(schema == null){
- throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0038));
- }
-
+ List schema = cursorState.ts.getSchema();
return schema;
}
public boolean resultSetExists(String rsName) {
String rsKey = rsName.toUpperCase();
- boolean exists = this.tupleSourceMap.containsKey(rsKey);
+ boolean exists = this.cursorStates.containsKey(rsKey);
return exists;
}
More information about the teiid-commits
mailing list