Author: shawkins
Date: 2012-08-02 21:24:25 -0400 (Thu, 02 Aug 2012)
New Revision: 4296
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
Log:
TEIID-1598 fixing blocked exceptions preventing the cache entry from being created
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java 2012-08-02
23:10:40 UTC (rev 4295)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java 2012-08-03
01:24:25 UTC (rev 4296)
@@ -50,8 +50,7 @@
private final RegisterRequestParameter parameterObject;
private final CacheDirective cd;
private final Collection<GroupSymbol> accessedGroups;
- private boolean cached = false;
- private DataTierTupleSource dtts;
+ DataTierTupleSource dtts;
CachingTupleSource(DataTierManagerImpl dataTierManagerImpl, TupleBuffer tb,
DataTierTupleSource ts, CacheID cid,
RegisterRequestParameter parameterObject, CacheDirective cd,
@@ -74,13 +73,12 @@
}
//TODO: the cache directive object needs synchronized for consistency
List<?> tuple = super.nextTuple();
- if (tuple == null && !cached && !dtts.errored) {
+ if (tuple == null && !dtts.errored) {
synchronized (cd) {
if (dtts.scope == Scope.NONE) {
removeTupleBuffer();
return tuple;
}
- cached = true;
CachedResults cr = new CachedResults();
cr.setResults(tb, null);
if (!Boolean.FALSE.equals(cd.getUpdatable())) {
@@ -108,6 +106,7 @@
}
}
this.dataTierManagerImpl.requestMgr.getRsCache().put(cid, determinismLevel, cr,
cd.getTtl());
+ tb = null;
}
}
return tuple;
@@ -123,7 +122,7 @@
@Override
public void closeSource() {
try {
- if (tb != null && !cached && !dtts.errored) {
+ if (tb != null && !dtts.errored) {
boolean readAll = true;
synchronized (cd) {
readAll = !Boolean.FALSE.equals(cd.getReadAll());
@@ -154,9 +153,7 @@
}
}
} finally {
- if (!cached) {
- removeTupleBuffer();
- }
+ removeTupleBuffer();
ts.closeSource();
}
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-08-02
23:10:40 UTC (rev 4295)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-08-03
01:24:25 UTC (rev 4296)
@@ -262,7 +262,7 @@
}
boolean partial = false;
AtomicResultsMessage results = null;
- boolean dna = false;
+ boolean noResults = false;
try {
if (futureResult != null || !aqr.isSerial()) {
results = asynchGet();
@@ -288,12 +288,15 @@
errored = true;
results = exceptionOccurred(e);
partial = true;
+ } catch (BlockedException e) {
+ noResults = true;
+ throw e;
} catch (DataNotAvailableException e) {
- dna = true;
+ noResults = true;
handleDataNotAvailable(e);
continue;
} finally {
- if (!dna && results == null) {
+ if (!noResults && results == null) {
errored = true;
}
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2012-08-02
23:10:40 UTC (rev 4295)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2012-08-03
01:24:25 UTC (rev 4296)
@@ -67,6 +67,7 @@
private AtomicInteger cacheHit = new AtomicInteger();
private AtomicInteger totalRequests = new AtomicInteger();
+ private AtomicInteger cachePuts = new AtomicInteger();
private TupleBufferCache bufferManager;
@@ -152,6 +153,10 @@
return this.totalRequests.get();
}
+ public int getCachePutCount() {
+ return cachePuts.get();
+ }
+
public int getTotalCacheEntries() {
if (this.localCache == this.distributedCache) {
return this.localCache.size();
@@ -160,6 +165,7 @@
}
public void put(CacheID id, Determinism determinismLevel, T t, Long ttl){
+ cachePuts.incrementAndGet();
if (determinismLevel.compareTo(Determinism.SESSION_DETERMINISTIC) <= 0) {
id.setSessionId(id.originalSessionId);
LogManager.logTrace(LogConstants.CTX_DQP, "Adding to session/local cache",
id); //$NON-NLS-1$
@@ -197,8 +203,9 @@
public void clearAll(){
this.localCache.clear();
this.distributedCache.clear();
- this.totalRequests = new AtomicInteger();
- this.cacheHit = new AtomicInteger();
+ this.totalRequests.set(0);
+ this.cacheHit.set(0);
+ this.cachePuts.set(0);
}
public void clearForVDB(String vdbName, int version) {
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2012-08-02
23:10:40 UTC (rev 4295)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2012-08-03
01:24:25 UTC (rev 4296)
@@ -304,7 +304,10 @@
assertEquals(10, pullTuples(ts, -1));
assertEquals(1, connectorManager.getExecuteCount().get());
assertFalse(rrp.doNotCache);
-
+ assertFalse(((CachingTupleSource)ts).dtts.errored);
+ assertNull(((CachingTupleSource)ts).dtts.scope);
+ ts.closeSource();
+ assertEquals(1, this.rm.getRsCache().getCachePutCount());
assertEquals(1, this.rm.getRsCache().getTotalCacheEntries());
//same session, should be cached
@@ -324,9 +327,15 @@
rrp.connectorBindingId = "x";
ts = dtm.registerRequest(context, command, "foo", rrp);
assertTrue(ts instanceof CachingTupleSource);
- assertEquals(10, pullTuples(ts, -1));
+ assertEquals(9, pullTuples(ts, 9));
assertEquals(2, connectorManager.getExecuteCount().get());
assertFalse(rrp.doNotCache);
+ ts.closeSource(); //should force read all
+ assertFalse(((CachingTupleSource)ts).dtts.errored);
+ assertNull(((CachingTupleSource)ts).dtts.scope);
+
+ assertEquals(2, this.rm.getRsCache().getCachePutCount());
+ assertEquals(2, this.rm.getRsCache().getTotalCacheEntries());
}
}