[teiid-commits] teiid SVN: r4296 - in trunk/engine/src: test/java/org/teiid/dqp/internal/process and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Aug 2 21:24:28 EDT 2012


Author: shawkins
Date: 2012-08-02 21:24:25 -0400 (Thu, 02 Aug 2012)
New Revision: 4296

Modified:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
Log:
TEIID-1598 fixing blocked exceptions preventing the cache entry from being created

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java	2012-08-02 23:10:40 UTC (rev 4295)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java	2012-08-03 01:24:25 UTC (rev 4296)
@@ -50,8 +50,7 @@
 	private final RegisterRequestParameter parameterObject;
 	private final CacheDirective cd;
 	private final Collection<GroupSymbol> accessedGroups;
-	private boolean cached = false;
-	private DataTierTupleSource dtts;
+	DataTierTupleSource dtts;
 
 	CachingTupleSource(DataTierManagerImpl dataTierManagerImpl, TupleBuffer tb, DataTierTupleSource ts, CacheID cid,
 			RegisterRequestParameter parameterObject, CacheDirective cd,
@@ -74,13 +73,12 @@
 		}
 		//TODO: the cache directive object needs synchronized for consistency
 		List<?> tuple = super.nextTuple();
-		if (tuple == null && !cached && !dtts.errored) {
+		if (tuple == null && !dtts.errored) {
 			synchronized (cd) {
 				if (dtts.scope == Scope.NONE) {
 					removeTupleBuffer();
 					return tuple;
 				}
-				cached = true;
 				CachedResults cr = new CachedResults();
 		        cr.setResults(tb, null);
 		        if (!Boolean.FALSE.equals(cd.getUpdatable())) {
@@ -108,6 +106,7 @@
 		    		}
 		    	}
 		        this.dataTierManagerImpl.requestMgr.getRsCache().put(cid, determinismLevel, cr, cd.getTtl()); 
+		        tb = null;
 			}
 		}
 		return tuple;
@@ -123,7 +122,7 @@
 	@Override
 	public void closeSource() {
 		try {
-			if (tb != null && !cached && !dtts.errored) {
+			if (tb != null && !dtts.errored) {
 				boolean readAll = true;
 				synchronized (cd) {
 					readAll = !Boolean.FALSE.equals(cd.getReadAll()); 
@@ -154,9 +153,7 @@
 				}
 			}
 		} finally {
-			if (!cached) {
-				removeTupleBuffer();
-			}
+			removeTupleBuffer();
 			ts.closeSource();
 		}
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-08-02 23:10:40 UTC (rev 4295)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-08-03 01:24:25 UTC (rev 4296)
@@ -262,7 +262,7 @@
     			}
     			boolean partial = false;
     			AtomicResultsMessage results = null;
-    			boolean dna = false;
+    			boolean noResults = false;
     			try {
 	    			if (futureResult != null || !aqr.isSerial()) {
 	    				results = asynchGet();
@@ -288,12 +288,15 @@
     				errored = true;
     				results = exceptionOccurred(e);
     				partial = true;
+    			} catch (BlockedException e) {
+    				noResults = true;
+    				throw e;
     			} catch (DataNotAvailableException e) {
-    				dna = true;
+    				noResults = true;
     				handleDataNotAvailable(e);
     				continue;
     			} finally {
-    				if (!dna && results == null) {
+    				if (!noResults && results == null) {
     					errored = true;
     				}
     			}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2012-08-02 23:10:40 UTC (rev 4295)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2012-08-03 01:24:25 UTC (rev 4296)
@@ -67,6 +67,7 @@
 	
 	private AtomicInteger cacheHit = new AtomicInteger();
 	private AtomicInteger totalRequests = new AtomicInteger();
+	private AtomicInteger cachePuts = new AtomicInteger();
 	
 	private TupleBufferCache bufferManager;
 	
@@ -152,6 +153,10 @@
 		return this.totalRequests.get();
 	}
 	
+	public int getCachePutCount() {
+		return cachePuts.get();
+	}
+	
 	public int getTotalCacheEntries() {
 		if (this.localCache == this.distributedCache) {
 			return this.localCache.size();
@@ -160,6 +165,7 @@
 	}
 	
 	public void put(CacheID id, Determinism determinismLevel, T t, Long ttl){
+		cachePuts.incrementAndGet();
 		if (determinismLevel.compareTo(Determinism.SESSION_DETERMINISTIC) <= 0) {
 			id.setSessionId(id.originalSessionId);
 			LogManager.logTrace(LogConstants.CTX_DQP, "Adding to session/local cache", id); //$NON-NLS-1$
@@ -197,8 +203,9 @@
 	public void clearAll(){
 		this.localCache.clear();
 		this.distributedCache.clear();
-		this.totalRequests = new AtomicInteger();
-		this.cacheHit = new AtomicInteger();
+		this.totalRequests.set(0);
+		this.cacheHit.set(0);
+		this.cachePuts.set(0);
 	}	
 	
 	public void clearForVDB(String vdbName, int version) {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2012-08-02 23:10:40 UTC (rev 4295)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2012-08-03 01:24:25 UTC (rev 4296)
@@ -304,7 +304,10 @@
     	assertEquals(10, pullTuples(ts, -1));
     	assertEquals(1, connectorManager.getExecuteCount().get());
     	assertFalse(rrp.doNotCache);
-    	
+    	assertFalse(((CachingTupleSource)ts).dtts.errored);
+    	assertNull(((CachingTupleSource)ts).dtts.scope);
+    	ts.closeSource();
+    	assertEquals(1, this.rm.getRsCache().getCachePutCount());
     	assertEquals(1, this.rm.getRsCache().getTotalCacheEntries());
     	
     	//same session, should be cached
@@ -324,9 +327,15 @@
     	rrp.connectorBindingId = "x";
     	ts = dtm.registerRequest(context, command, "foo", rrp);
     	assertTrue(ts instanceof CachingTupleSource);
-    	assertEquals(10, pullTuples(ts, -1));
+    	assertEquals(9, pullTuples(ts, 9));
     	assertEquals(2, connectorManager.getExecuteCount().get());
     	assertFalse(rrp.doNotCache);
+    	ts.closeSource(); //should force read all
+    	assertFalse(((CachingTupleSource)ts).dtts.errored);
+    	assertNull(((CachingTupleSource)ts).dtts.scope);
+    	
+    	assertEquals(2, this.rm.getRsCache().getCachePutCount());
+    	assertEquals(2, this.rm.getRsCache().getTotalCacheEntries());
     }
     
 }



More information about the teiid-commits mailing list