[teiid-commits] teiid SVN: r4276 - in trunk: engine/src/main/java/org/teiid/dqp/internal/datamgr and 6 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Jul 30 12:04:36 EDT 2012


Author: shawkins
Date: 2012-07-30 12:04:34 -0400 (Mon, 30 Jul 2012)
New Revision: 4276

Added:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
Modified:
   trunk/api/src/main/java/org/teiid/translator/CacheDirective.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
   trunk/engine/src/main/java/org/teiid/dqp/message/AtomicResultsMessage.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
   trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
Log:
TEIID-1598 adding code support for the translator caching feature

Modified: trunk/api/src/main/java/org/teiid/translator/CacheDirective.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/CacheDirective.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/api/src/main/java/org/teiid/translator/CacheDirective.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -40,6 +40,7 @@
 	
 	private Boolean prefersMemory;
 	private Boolean updatable;
+	private Boolean readAll;
 	private Long ttl;
 	private Scope scope;
 	
@@ -75,6 +76,10 @@
 		this.ttl = ttl;
 	}
 	
+	/**
+	 * Get whether the result is updatable and therefore sensitive to data changes.
+	 * @return
+	 */
 	public Boolean getUpdatable() {
 		return updatable;
 	}
@@ -91,6 +96,18 @@
 		this.scope = scope;
 	}
 	
+	/**
+	 * Whether the engine should read and cache the entire results.
+	 * @return
+	 */
+	public Boolean getReadAll() {
+		return readAll;
+	}
+	
+	public void setReadAll(Boolean readAll) {
+		this.readAll = readAll;
+	}
+	
 	@Override
 	public boolean equals(Object obj) {
 		if (obj == this) {
@@ -100,7 +117,8 @@
 			return false;
 		}
 		CacheDirective other = (CacheDirective)obj;
-		return EquivalenceUtil.areEqual(this.prefersMemory, other.prefersMemory) 
+		return EquivalenceUtil.areEqual(this.prefersMemory, other.prefersMemory)
+		&& EquivalenceUtil.areEqual(this.readAll, other.readAll) 
 		&& EquivalenceUtil.areEqual(this.ttl, other.ttl) 
 		&& EquivalenceUtil.areEqual(this.updatable, other.updatable)
 		&& EquivalenceUtil.areEqual(this.scope, other.scope);

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -49,5 +49,7 @@
 	boolean copyLobs();
 
 	CacheDirective getCacheDirective() throws TranslatorException;
+
+	boolean areLobsUsableAfterClose();
 	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -359,15 +359,21 @@
 		
 		// if we need to keep the execution alive, then we can not support implicit close.
 		response.setSupportsImplicitClose(!this.securityContext.keepExecutionAlive());
-		response.setTransactional(this.securityContext.isTransactional());
 		response.setWarnings(this.securityContext.getWarnings());
-		response.setSupportsCloseWithLobs(this.connector.areLobsUsableAfterClose());
+		if (this.securityContext.getCacheDirective() != null) {
+			response.setScope(this.securityContext.getCacheDirective().getScope());
+		}
 
 		if ( lastBatch ) {
 		    response.setFinalRow(rowCount);
 		} 
 		return response;
 	}
+    
+    @Override
+    public boolean areLobsUsableAfterClose() {
+    	return this.connector.areLobsUsableAfterClose();
+    }
 
     public static AtomicResultsMessage createResultsMessage(List<?>[] batch) {
         return new AtomicResultsMessage(batch);

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -91,6 +91,13 @@
 		return externalNames;
 	}
 	
+	public void addAccessedObject(Object id) {
+		if (this.objectsAccessed == null) {
+			this.objectsAccessed = new HashSet<Object>();
+		}
+		this.objectsAccessed.add(id);
+	}
+	
 	public Set<Object> getObjectsAccessed() {
 		return objectsAccessed;
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -40,7 +40,6 @@
 import org.teiid.query.parser.QueryParser;
 import org.teiid.query.processor.ProcessorPlan;
 import org.teiid.query.resolver.QueryResolver;
-import org.teiid.query.sql.lang.CacheHint;
 import org.teiid.query.sql.lang.Command;
 
 
@@ -50,7 +49,6 @@
 	private transient Command command;
 	private transient TupleBuffer results;
 
-	private CacheHint hint;
 	private String uuid;
 	private boolean hasLobs;
 	private int rowLimit;
@@ -69,22 +67,15 @@
 		this.results = results;
 		this.uuid = results.getId();
 		this.hasLobs = results.isLobs();
-		this.accessInfo.populate(plan.getContext(), true);
+		if (plan != null) {
+			this.accessInfo.populate(plan.getContext(), true);
+		}
 	}
 	
 	public void setCommand(Command command) {
 		this.command = command;
-		this.hint = command.getCacheHint();
 	}
 	
-	public void setHint(CacheHint hint) {
-		this.hint = hint;
-	}
-	
-	public CacheHint getHint() {
-		return hint;
-	}
-	
 	public synchronized Command getCommand(String sql, QueryMetadataInterface metadata, ParseInfo info) throws QueryParserException, QueryResolverException, TeiidComponentException {
 		if (command == null) {
 			command = QueryParser.getQueryParser().parseCommand(sql, info);

Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -0,0 +1,163 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.common.buffer.TupleSource;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
+import org.teiid.core.TeiidProcessingException;
+import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.metadata.FunctionMethod.Determinism;
+import org.teiid.query.processor.RegisterRequestParameter;
+import org.teiid.query.sql.symbol.GroupSymbol;
+import org.teiid.translator.CacheDirective;
+import org.teiid.translator.CacheDirective.Scope;
+
+/**
+ * A proxy {@link TupleSource} that caches a {@link DataTierTupleSource}
+ */
+final class CachingTupleSource extends
+		TupleSourceCache.BufferedTupleSource {
+	private final DataTierManagerImpl dataTierManagerImpl;
+	private final CacheID cid;
+	private final RegisterRequestParameter parameterObject;
+	private final CacheDirective cd;
+	private final Collection<GroupSymbol> accessedGroups;
+	private boolean cached = false;
+	private DataTierTupleSource dtts;
+
+	CachingTupleSource(DataTierManagerImpl dataTierManagerImpl, TupleBuffer tb, DataTierTupleSource ts, CacheID cid,
+			RegisterRequestParameter parameterObject, CacheDirective cd,
+			Collection<GroupSymbol> accessedGroups) {
+		super(tb, ts);
+		this.dataTierManagerImpl = dataTierManagerImpl;
+		this.dtts = ts;
+		this.cid = cid;
+		this.parameterObject = parameterObject;
+		this.cd = cd;
+		this.accessedGroups = accessedGroups;
+	}
+
+	@Override
+	public List<?> nextTuple() throws TeiidComponentException,
+			TeiidProcessingException {
+		if (dtts.scope == Scope.NONE || tb == null) {
+			removeTupleBuffer();
+			return ts.nextTuple();
+		}
+		//TODO: the cache directive object needs to be synchronized for consistency
+		List<?> tuple = super.nextTuple();
+		if (tuple == null && !cached && !dtts.errored) {
+			synchronized (cd) {
+				if (dtts.scope == Scope.NONE) {
+					removeTupleBuffer();
+					return tuple;
+				}
+				cached = true;
+				CachedResults cr = new CachedResults();
+		        cr.setResults(tb, null);
+		        if (!Boolean.FALSE.equals(cd.getUpdatable())) {
+		        	if (accessedGroups != null) {
+			        	for (GroupSymbol gs : accessedGroups) {
+			        		cr.getAccessInfo().addAccessedObject(gs.getMetadataID());
+			        	}
+		        	}
+		        } else {
+		        	cr.getAccessInfo().setSensitiveToMetadataChanges(false);
+		        }
+		        if (parameterObject.limit > 0 && parameterObject.limit == rowNumber) {
+		        	cr.setRowLimit(rowNumber);
+		        }
+		        tb.setPrefersMemory(Boolean.TRUE.equals(cd.getPrefersMemory()));
+		    	Determinism determinismLevel = Determinism.SESSION_DETERMINISTIC;
+		    	if (dtts.scope != null) {
+		    		switch (dtts.scope) {
+		    		case VDB:
+		    			determinismLevel = Determinism.VDB_DETERMINISTIC;
+		    		case SESSION:
+		    			determinismLevel = Determinism.SESSION_DETERMINISTIC;
+		    		case USER:
+		    			determinismLevel = Determinism.USER_DETERMINISTIC;
+		    		}
+		    	}
+		        this.dataTierManagerImpl.requestMgr.getRsCache().put(cid, determinismLevel, cr, cd.getTtl()); 
+			}
+		}
+		return tuple;
+	}
+
+	private void removeTupleBuffer() {
+		if (tb != null) {
+			tb.remove();
+			tb = null;
+		}
+	}
+
+	@Override
+	public void closeSource() {
+		try {
+			if (tb != null && !cached && !dtts.errored) {
+				boolean readAll = true;
+				synchronized (cd) {
+					readAll = !Boolean.FALSE.equals(cd.getReadAll()); 
+				}
+				if (readAll) {
+					//TODO: this is blocking, so it could be made faster in non-transactional scenarios
+					//we should also shut off any warnings, since the plan isn't consuming these tuples
+					//the approach would probably be to do more read-ahead
+					dtts.getAtomicRequestMessage().setSerial(true);
+					while (dtts.scope != Scope.NONE) { 
+						try {
+							List<?> tuple = nextTuple();
+							if (tuple == null) {
+								break;
+							}
+						} catch (BlockedException e) {
+							//this is possible if we were already waiting for an asynch result
+							try {
+								Thread.sleep(50); //TODO: we could synch/notify in the DataTierTupleSource
+							} catch (InterruptedException e1) {
+								break;
+							} 
+						} catch (TeiidException e) {
+							LogManager.logDetail(LogConstants.CTX_DQP, e, dtts.getAtomicRequestMessage().getAtomicRequestID(), "Not using full results due to error."); //$NON-NLS-1$
+							break;
+						}
+					}
+				}
+			}
+		} finally {
+			if (!cached) {
+				removeTupleBuffer();
+			}
+			ts.closeSource();
+		}
+	}
+}
\ No newline at end of file


Property changes on: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -41,7 +41,9 @@
 import org.teiid.client.RequestMessage;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.TupleSource;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.CoreConstants;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
@@ -149,7 +151,7 @@
 	}
 	
 	// Resources
-	private DQPCore requestMgr;
+	DQPCore requestMgr;
     private BufferManager bufferManager;
     private EventDistributor eventDistributor;
     private boolean detectChangeEvents;
@@ -172,7 +174,7 @@
 		return eventDistributor;
 	}
     
-	public TupleSource registerRequest(CommandContext context, Command command, String modelName, RegisterRequestParameter parameterObject) throws TeiidComponentException, TeiidProcessingException {
+	public TupleSource registerRequest(CommandContext context, Command command, String modelName, final RegisterRequestParameter parameterObject) throws TeiidComponentException, TeiidProcessingException {
 		RequestWorkItem workItem = requestMgr.getRequestWorkItem((RequestID)context.getProcessorID());
 		
 		if(CoreConstants.SYSTEM_MODEL.equals(modelName) || CoreConstants.SYSTEM_ADMIN_MODEL.equals(modelName)) {
@@ -215,11 +217,11 @@
 						parameterObject.doNotCache = true;
 					} else {
 						String cmdString = command.toString();
-						if (cmdString.length() < 200000) { //TODO: this check won't be needed if keys aren't exclusively held in memory
+						if (cmdString.length() < 100000) { //TODO: this check won't be needed if keys aren't exclusively held in memory
 							cid = new CacheID(workItem.getDqpWorkContext(), ParseInfo.DEFAULT_INSTANCE, cmdString);
 							cid.setParameters(cv.parameters);
 							CachedResults cr = workItem.getRsCache().get(cid);
-							if (cr != null) {
+							if (cr != null && (cr.getRowLimit() == 0 || (parameterObject.limit > 0 && cr.getRowLimit() >= parameterObject.limit))) {
 								parameterObject.doNotCache = true;
 								LogManager.logDetail(LogConstants.CTX_DQP, "Using cache entry for", cid); //$NON-NLS-1$
 								work.close();
@@ -235,7 +237,12 @@
 			}
 		}
 		work.setRequestWorkItem(workItem);
-        return new DataTierTupleSource(aqr, workItem, work, this, parameterObject.limit);
+		DataTierTupleSource dtts = new DataTierTupleSource(aqr, workItem, work, this, parameterObject.limit);
+        if (cid != null) {
+        	TupleBuffer tb = getBufferManager().createTupleBuffer(aqr.getCommand().getProjectedSymbols(), aqr.getCommandContext().getConnectionId(), TupleSourceType.PROCESSOR);
+        	return new CachingTupleSource(this, tb, dtts, cid, parameterObject, cd, accessedGroups);
+        }
+		return dtts;
 	}
 
 	/**

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -68,6 +68,7 @@
 import org.teiid.query.sql.symbol.GroupSymbol;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.TranslatorException;
+import org.teiid.translator.CacheDirective.Scope;
 
 
 /**
@@ -110,6 +111,9 @@
     private volatile FutureWork<AtomicResultsMessage> futureResult;
     private volatile boolean running;
     
+    boolean errored;
+	Scope scope; //this is to avoid synchronization
+    
     public DataTierTupleSource(AtomicRequestMessage aqr, RequestWorkItem workItem, ConnectorWork cwi, DataTierManagerImpl dtm, int limit) {
         this.aqr = aqr;
         this.workItem = workItem;
@@ -158,7 +162,7 @@
 				if (value == result && !DataTypeManager.DefaultDataClasses.OBJECT.equals(this.schema[i])) {
 					convertToRuntimeType[i] = false;
 				} else {
-					if (isLob[i] && !cwi.copyLobs() && !arm.supportsCloseWithLobs() && DataTypeManager.isLOB(value.getClass())) {
+					if (isLob[i] && !cwi.copyLobs() && !cwi.areLobsUsableAfterClose() && DataTypeManager.isLOB(value.getClass())) {
 						explicitClose = true;
 					}				
 					row.set(i, result);
@@ -224,8 +228,13 @@
     public List<?> nextTuple() throws TeiidComponentException, TeiidProcessingException {
     	while (true) {
     		if (arm == null) {
+    			if (isDone()) {
+    				//sanity check
+    				return null; //TODO: could throw an illegal state exception
+    			}
     			boolean partial = false;
     			AtomicResultsMessage results = null;
+    			boolean dna = false;
     			try {
 	    			if (futureResult != null || !aqr.isSerial()) {
 	    				results = asynchGet();
@@ -248,9 +257,11 @@
 	    				}
 	    			}
     			} catch (TranslatorException e) {
-    				results = exceptionOccurred(e, true);
+    				errored = true;
+    				results = exceptionOccurred(e);
     				partial = true;
     			} catch (DataNotAvailableException e) {
+    				dna = true;
     				if (e.getRetryDelay() >= 0) {
 	    				workItem.scheduleWork(new Runnable() {
 	    					@Override
@@ -262,7 +273,11 @@
     					continue; 
     				}
     				throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on DataNotAvailableException", aqr.getAtomicRequestID()); //$NON-NLS-1$
-    			} 
+    			} finally {
+    				if (!dna && results == null) {
+    					errored = true;
+    				}
+    			}
     			receiveResults(results, partial);
     		}
 	    	if (index < arm.getResults().length) {
@@ -412,16 +427,14 @@
     	}
     }
 
-    AtomicResultsMessage exceptionOccurred(TranslatorException exception, boolean removeState) throws TeiidComponentException, TeiidProcessingException {
-    	if (removeState) {
-			fullyCloseSource();
-		}
+    AtomicResultsMessage exceptionOccurred(TranslatorException exception) throws TeiidComponentException, TeiidProcessingException {
     	if(workItem.requestMsg.supportsPartialResults()) {
 			AtomicResultsMessage emptyResults = new AtomicResultsMessage(new List[0]);
 			emptyResults.setWarnings(Arrays.asList((Exception)exception));
 			emptyResults.setFinalRow(this.rowsProcessed);
 			return emptyResults;
 		} 
+		fullyCloseSource();
 		if (exception.getCause() instanceof TeiidComponentException) {
 			throw (TeiidComponentException)exception.getCause();
 		}
@@ -433,6 +446,7 @@
 
 	void receiveResults(AtomicResultsMessage response, boolean partial) {
 		this.arm = response;
+		this.scope = response.getScope();
 		explicitClose |= !arm.supportsImplicitClose();
         rowsProcessed += response.getResults().length;
         index = 0;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -566,7 +566,7 @@
 						if (!dqpCore.hasWaitingPlans(RequestWorkItem.this)) {
 							//requestMore will trigger more processing
 							throw BlockedException.block(requestID, "Blocking due to full results TupleBuffer", //$NON-NLS-1$
-									this.getTupleBuffer().getId(), "rows", this.getTupleBuffer().getManagedRowCount(), "batch size", this.getTupleBuffer().getBatchSize()); //$NON-NLS-1$ //$NON-NLS-2$ 
+									this.getTupleBuffer(), "rows", this.getTupleBuffer().getManagedRowCount(), "batch size", this.getTupleBuffer().getBatchSize()); //$NON-NLS-1$ //$NON-NLS-2$ 
 						} 
 						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 							LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Exceeding buffer limit since there are pending active plans."); //$NON-NLS-1$

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -86,9 +86,9 @@
     }
 	
 	public abstract static class BufferedTupleSource implements TupleSource {
-		private int rowNumber = 1;
-		private TupleBuffer tb;
-		private TupleSource ts;
+		int rowNumber = 1;
+		TupleBuffer tb;
+		TupleSource ts;
 		
 		protected BufferedTupleSource(TupleBuffer tb, TupleSource ts) {
 			this.tb = tb;

Modified: trunk/engine/src/main/java/org/teiid/dqp/message/AtomicResultsMessage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/message/AtomicResultsMessage.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/message/AtomicResultsMessage.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -24,10 +24,12 @@
 
 import java.util.List;
 
+import org.teiid.translator.CacheDirective.Scope;
 
+
 public class AtomicResultsMessage {
 
-	private List[] results;
+	private List<?>[] results;
 
     // Final row index in complete result set, if known
     private int finalRow = -1;
@@ -35,17 +37,15 @@
     // by default we support implicit close.
     private boolean supportsImplicitClose = true;
     
-    private boolean supportsCloseWithLobs;
-
-    private boolean isTransactional;
-    
     private List<Exception> warnings;
+    
+    private Scope scope;
 
     // to honor the externalizable contract
 	public AtomicResultsMessage() {
 	}
 	
-	public AtomicResultsMessage(List[] results) {
+	public AtomicResultsMessage(List<?>[] results) {
         this.results = results;
 	}
 	
@@ -53,14 +53,6 @@
         return this.supportsImplicitClose;
     }
     
-    public boolean supportsCloseWithLobs() {
-		return supportsCloseWithLobs;
-	}
-    
-    public void setSupportsCloseWithLobs(boolean supportsCloseWithLobs) {
-		this.supportsCloseWithLobs = supportsCloseWithLobs;
-	}
-    
     public void setSupportsImplicitClose(boolean supportsImplicitClose) {
         this.supportsImplicitClose = supportsImplicitClose;
     }    
@@ -77,14 +69,6 @@
 		return results;
 	}
 
-	public boolean isTransactional() {
-		return isTransactional;
-	}
-
-	public void setTransactional(boolean isTransactional) {
-		this.isTransactional = isTransactional;
-	}   
-	
 	public void setWarnings(List<Exception> warnings) {
 		this.warnings = warnings;
 	}
@@ -92,4 +76,12 @@
 	public List<Exception> getWarnings() {
 		return warnings;
 	}
+	
+	public void setScope(Scope scope) {
+		this.scope = scope;
+	}
+	
+	public Scope getScope() {
+		return scope;
+	}
 }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -144,8 +144,6 @@
     
     /**
      * Create an index of the smaller size
-     *  
-     * TODO: reuse existing temp table indexes
      */
     public void createIndex(SourceState state, boolean sorted) throws TeiidComponentException, TeiidProcessingException {
     	int[] expressionIndexes = state.getExpressionIndexes();

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -224,8 +224,8 @@
                         outerState = this.leftSource;
                         innerState = this.rightSource;
                         outerMatched = false;
-                        this.leftSource.getIterator().setPosition(this.leftSource.getMaxProbeMatch());
-                        this.rightSource.getIterator().setPosition(this.rightSource.getMaxProbeMatch());
+                        this.leftSource.setMaxProbePosition();
+                        this.rightSource.setMaxProbePosition();
                         this.mergeState = MergeState.SCAN;
                         this.matchState = MatchState.MATCH_LEFT;
                         break;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -210,7 +210,7 @@
 	private TupleBuffer createTupleBuffer() throws TeiidComponentException {
 		TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);
 		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-			LogManager.logDetail(LogConstants.CTX_DQP, "Created intermediate sort buffer ", tb.getId()); //$NON-NLS-1$
+			LogManager.logDetail(LogConstants.CTX_DQP, "Created intermediate sort buffer ", tb); //$NON-NLS-1$
 		}
 		tb.setForwardOnly(true);
 		return tb;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -238,5 +238,10 @@
 		this.currentTuple = null;
 		this.maxProbeMatch = 1;
 	}
+	
+	public void setMaxProbePosition() throws TeiidComponentException {
+		this.getIterator().setPosition(this.getMaxProbeMatch());
+		this.currentTuple = null;
+	}
     
 }

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -269,7 +269,6 @@
 		TupleBuffer tb = bc.collectTuples();
 		CachedResults cr = new CachedResults();
 		cr.setResults(tb, qp.getProcessorPlan());
-		cr.setHint(hint);
 		if (hint != null && hint.getDeterminism() != null) {
 			LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the query determinism from ",determinismLevel, " to ", hint.getDeterminism() }); //$NON-NLS-1$ //$NON-NLS-2$
 			determinismLevel = hint.getDeterminism();

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -29,11 +29,15 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.teiid.adminapi.impl.VDBMetaData;
 import org.teiid.cache.CacheConfiguration;
 import org.teiid.cache.DefaultCacheFactory;
 import org.teiid.client.RequestMessage;
 import org.teiid.client.SourceWarning;
 import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.TupleSource;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.ClobType;
 import org.teiid.core.types.InputStreamFactory;
 import org.teiid.core.types.InputStreamFactory.StorageMode;
@@ -46,20 +50,20 @@
 import org.teiid.query.metadata.QueryMetadataInterface;
 import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder;
 import org.teiid.query.parser.QueryParser;
+import org.teiid.query.processor.RegisterRequestParameter;
 import org.teiid.query.resolver.QueryResolver;
 import org.teiid.query.sql.lang.Command;
 import org.teiid.query.unittest.RealMetadataFactory;
 import org.teiid.query.util.CommandContext;
+import org.teiid.translator.CacheDirective;
 
 @SuppressWarnings("nls")
 public class TestDataTierManager {
     
+	private VDBMetaData vdb = RealMetadataFactory.exampleBQTVDB();
     private DQPCore rm;
     private DataTierManagerImpl dtm;
     private CommandContext context;
-    private AtomicRequestMessage request;
-    private Command command;
-    private DataTierTupleSource info;
     private AutoGenDataService connectorManager = new AutoGenDataService();
     private RequestWorkItem workItem;
     private int limit = -1;
@@ -75,50 +79,60 @@
         return command;
     }
     
-    private void helpSetup(int nodeId) throws Exception {
-        helpSetup("SELECT * FROM BQT1.SmallA", nodeId); //$NON-NLS-1$
+    private DataTierTupleSource helpSetup(int nodeId) throws Exception {
+        return helpSetup("SELECT * FROM BQT1.SmallA", nodeId); //$NON-NLS-1$
     }
     
-    private void helpSetup(String sql, int nodeId) throws Exception {
-        QueryMetadataInterface metadata = RealMetadataFactory.exampleBQTCached();
-        DQPWorkContext workContext = RealMetadataFactory.buildWorkContext(metadata, RealMetadataFactory.exampleBQTVDB());
-        
-        rm = new DQPCore();
-        rm.setTransactionService(new FakeTransactionService());
-        rm.setBufferManager(new FakeBufferService().getBufferManager());
-        rm.setResultsetCache(new SessionAwareCache<CachedResults>(new DefaultCacheFactory(), SessionAwareCache.Type.RESULTSET, new CacheConfiguration()));
-        rm.setPreparedPlanCache(new SessionAwareCache<PreparedPlan>(new DefaultCacheFactory(), SessionAwareCache.Type.PREPAREDPLAN, new CacheConfiguration()));
-        rm.start(new DQPConfiguration());
-        FakeBufferService bs = new FakeBufferService();
+    private DataTierTupleSource helpSetup(String sql, int nodeId) throws Exception {
+        helpSetupDataTierManager();
+        AtomicRequestMessage request = helpSetupRequest(sql, nodeId);
+        return new DataTierTupleSource(request, workItem, connectorManager.registerRequest(request), dtm, limit);
+    }
 
-        ConnectorManagerRepository repo = Mockito.mock(ConnectorManagerRepository.class);
-        Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(connectorManager);
+	private AtomicRequestMessage helpSetupRequest(String sql, int nodeId) throws Exception {
+		QueryMetadataInterface metadata = RealMetadataFactory.exampleBQTCached();
+        DQPWorkContext workContext = RealMetadataFactory.buildWorkContext(metadata, vdb);
         
+        Command command = helpGetCommand(sql, metadata);
         
-        dtm = new DataTierManagerImpl(rm,bs.getBufferManager(), true);
-        command = helpGetCommand(sql, metadata);
-        
         RequestMessage original = new RequestMessage();
         original.setExecutionId(1);
         original.setPartialResults(true);
         RequestID requestID = workContext.getRequestID(original.getExecutionId());
         
         context = new CommandContext();
+        context.setSession(workContext.getSession());
         context.setProcessorID(requestID);
         context.setVdbName("test"); //$NON-NLS-1$
         context.setVdbVersion(1);
-        context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(bs.getBufferManager(), dtm, new DefaultCapabilitiesFinder(), null, metadata));
+        context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(dtm.getBufferManager(), dtm, new DefaultCapabilitiesFinder(), null, metadata));
         workItem = TestDQPCoreRequestHandling.addRequest(rm, original, requestID, null, workContext);
         
-        request = new AtomicRequestMessage(original, workContext, nodeId);
+        AtomicRequestMessage request = new AtomicRequestMessage(original, workContext, nodeId);
         request.setCommand(command);
         request.setConnectorName("FakeConnectorID"); //$NON-NLS-1$
-        info = new DataTierTupleSource(request, workItem, connectorManager.registerRequest(request), dtm, limit);
-    }
+        return request;
+	}
+
+	private void helpSetupDataTierManager() {
+        FakeBufferService bs = new FakeBufferService();
+		rm = new DQPCore();
+        rm.setTransactionService(new FakeTransactionService());
+        rm.setBufferManager(bs.getBufferManager());
+        rm.setResultsetCache(new SessionAwareCache<CachedResults>(new DefaultCacheFactory(), SessionAwareCache.Type.RESULTSET, new CacheConfiguration()));
+        rm.setPreparedPlanCache(new SessionAwareCache<PreparedPlan>(new DefaultCacheFactory(), SessionAwareCache.Type.PREPAREDPLAN, new CacheConfiguration()));
+        rm.start(new DQPConfiguration());
+
+        ConnectorManagerRepository repo = Mockito.mock(ConnectorManagerRepository.class);
+        Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(connectorManager);
+        vdb.addAttchment(ConnectorManagerRepository.class, repo);
+        
+        dtm = new DataTierManagerImpl(rm,bs.getBufferManager(), true);
+	}
     
     @Test public void testCopyLobs() throws Exception {
     	connectorManager.copyLobs = true;
-    	helpSetup("SELECT cast(stringkey as clob) from bqt1.smalla", 1);
+    	DataTierTupleSource info = helpSetup("SELECT cast(stringkey as clob) from bqt1.smalla", 1);
     	for (int i = 0; i < 10;) {
 	    	try {
 	    		List<?> tuple = info.nextTuple();
@@ -130,7 +144,7 @@
 	    	}
     	}
     	connectorManager.copyLobs = false;
-    	helpSetup("SELECT cast(stringkey as clob) from bqt1.smalla", 1);
+    	info = helpSetup("SELECT cast(stringkey as clob) from bqt1.smalla", 1);
     	for (int i = 0; i < 10;) {
 	    	try {
 	    		List<?> tuple = info.nextTuple();
@@ -144,60 +158,58 @@
     }
     
     @Test public void testDataTierTupleSource() throws Exception {
-    	helpSetup(1);
-    	for (int i = 0; i < 10;) {
-	    	try {
-	    		info.nextTuple();
-	    		i++;
-	    	} catch (BlockedException e) {
-	    		Thread.sleep(50);
-	    	}
-    	}
-        assertNotNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
+    	DataTierTupleSource info = helpSetup(1);
+    	assertEquals(10, pullTuples(info, 10));
+        assertNotNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
         assertNull(info.nextTuple());
         info.closeSource();
-        assertNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
+        assertNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
     }
     
     @Test public void testDataTierTupleSourceWarnings() throws Exception {
-    	helpSetup(1);
+    	DataTierTupleSource info = helpSetup(1);
     	connectorManager.addWarning = true;
-    	for (int i = 0; i < 10;) {
-	    	try {
-	    		info.nextTuple();
-	    		i++;
-	    	} catch (BlockedException e) {
-	    		Thread.sleep(50);
-	    	}
-    	}
-        assertNotNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
+    	assertEquals(10, pullTuples(info, 10));
+        assertNotNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
         assertNull(info.nextTuple());
         assertEquals(1, workItem.getWarnings().size());
         SourceWarning warning = (SourceWarning) workItem.getWarnings().get(0);
 		assertFalse(warning.isPartialResultsError());
         info.closeSource();
-        assertNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
+        assertNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
     }
     
     @Test public void testDataTierTupleSourceLimit() throws Exception {
     	limit = 1;
-    	helpSetup(1);
-    	for (int i = 0; i < 1;) {
+    	DataTierTupleSource info = helpSetup(1);
+    	assertEquals(1, pullTuples(info, 1));
+        assertNotNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
+        assertNull(info.nextTuple());
+        info.closeSource();
+        assertNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
+    }
+
+	private int pullTuples(TupleSource info, int limit)
+			throws TeiidComponentException, TeiidProcessingException,
+			InterruptedException {
+		int i = 0;
+		while (true) {
 	    	try {
-	    		info.nextTuple();
-	    		i++;
+	    		if (info.nextTuple() == null) {
+	    			break;
+	    		}
+	    		if (++i == limit) {
+	    			break;
+	    		}
 	    	} catch (BlockedException e) {
 	    		Thread.sleep(50);
 	    	}
     	}
-        assertNotNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
-        assertNull(info.nextTuple());
-        info.closeSource();
-        assertNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
-    }
+		return i;
+	}
     
     @Test public void testPartialResults() throws Exception {
-    	helpSetup(1);
+    	DataTierTupleSource info = helpSetup(1);
     	connectorManager.throwExceptionOnExecute = true;
     	for (int i = 0; i < 10; i++) {
 	    	try {
@@ -214,7 +226,7 @@
     
     @Test public void testNoRowsException() throws Exception {
     	this.connectorManager.setRows(0);
-    	helpSetup(3);
+    	DataTierTupleSource info = helpSetup(3);
     	while (true) {
 	    	try {
 	        	assertNull(info.nextTuple());
@@ -228,7 +240,7 @@
     @Test public void testAsynch() throws Exception {
     	this.connectorManager.dataNotAvailable = 10;
     	this.connectorManager.setRows(0);
-    	helpSetup(3);
+    	DataTierTupleSource info = helpSetup(3);
     	boolean blocked = false;
     	while (true) {
 	    	try {
@@ -242,4 +254,41 @@
     	assertTrue(blocked);
     }
     
+    @Test public void testCaching() throws Exception {
+    	assertEquals(0, connectorManager.getExecuteCount().get());
+
+    	CacheDirective cd = new CacheDirective();
+    	this.connectorManager.cacheDirective = cd;
+    	helpSetupDataTierManager();
+    	Command command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1).getCommand();
+    	RegisterRequestParameter rrp = new RegisterRequestParameter();
+    	rrp.connectorBindingId = "x";
+    	TupleSource ts = dtm.registerRequest(context, command, "foo", rrp);
+    	assertTrue(ts instanceof CachingTupleSource);
+    	assertEquals(10, pullTuples(ts, -1));
+    	assertEquals(1, connectorManager.getExecuteCount().get());
+    	assertFalse(rrp.doNotCache);
+    	
+    	//same session, should be cached
+    	command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1).getCommand();
+    	rrp = new RegisterRequestParameter();
+    	rrp.connectorBindingId = "x";
+    	ts = dtm.registerRequest(context, command, "foo", rrp);
+    	assertFalse(ts instanceof CachingTupleSource);
+    	assertEquals(10, pullTuples(ts, -1));
+    	assertEquals(1, connectorManager.getExecuteCount().get());
+    	assertTrue(rrp.doNotCache);
+    	
+    	//switch sessions
+    	command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1).getCommand();
+    	this.context.getSession().setSessionId("different");
+    	rrp = new RegisterRequestParameter();
+    	rrp.connectorBindingId = "x";
+    	ts = dtm.registerRequest(context, command, "foo", rrp);
+    	assertTrue(ts instanceof CachingTupleSource);
+    	assertEquals(10, pullTuples(ts, -1));
+    	assertEquals(2, connectorManager.getExecuteCount().get());
+    	assertFalse(rrp.doNotCache);
+    }
+    
 }

Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2012-07-30 13:07:44 UTC (rev 4275)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2012-07-30 16:04:34 UTC (rev 4276)
@@ -72,6 +72,7 @@
     private boolean useIntCounter;
 	public boolean addWarning;
 	public boolean copyLobs;
+	public CacheDirective cacheDirective;
 
     public AutoGenDataService() {
     	super("FakeConnector","FakeConnector"); //$NON-NLS-1$ //$NON-NLS-2$
@@ -120,6 +121,11 @@
         	}
         	
         	@Override
+        	public boolean areLobsUsableAfterClose() {
+        		return false;
+        	}
+        	
+        	@Override
         	public void setRequestWorkItem(RequestWorkItem item) {
         		this.item = item;
         	}
@@ -183,7 +189,7 @@
 
 			@Override
 			public CacheDirective getCacheDirective() {
-				return null;
+				return cacheDirective;
 			}
 			
 		};



More information about the teiid-commits mailing list