Author: jolee
Date: 2013-05-14 12:35:36 -0400 (Tue, 14 May 2013)
New Revision: 4566
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/Cache.java
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
branches/7.7.x/engine/src/main/resources/org/teiid/query/i18n.properties
branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
branches/7.7.x/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-2460: Weird behavior when Max buffer space restriction is hit
Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/Cache.java 2013-05-14
15:12:39 UTC (rev 4565)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/Cache.java 2013-05-14
16:35:36 UTC (rev 4566)
@@ -51,8 +51,9 @@
* Must be called prior to adding an entry
* @param gid
* @param oid
+ * @return if the add was successful
*/
- void addToCacheGroup(Long gid, Long oid);
+ boolean addToCacheGroup(Long gid, Long oid);
/**
* Lock the object for load and return an identifier/lock
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2013-05-14
15:12:39 UTC (rev 4565)
+++
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2013-05-14
16:35:36 UTC (rev 4566)
@@ -495,6 +495,7 @@
private AtomicLong storageReads = new AtomicLong();
private long minDefrag = DEFAULT_MIN_DEFRAG;
+ private BufferManagerImpl bufferManager;
@Override
public void initialize() throws TeiidComponentException {
@@ -815,12 +816,13 @@
}
@Override
- public void addToCacheGroup(Long gid, Long oid) {
+ public boolean addToCacheGroup(Long gid, Long oid) {
Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
if (map == null) {
- return;
+ return false;
}
map.put(oid, null);
+ return true;
}
@Override
@@ -905,7 +907,11 @@
block = blockStore.writeToStorageBlock(info, is);
}
} catch (IOException e) {
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to
storage " + oid); //$NON-NLS-1$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to
storage " + oid + " " + info.gid); //$NON-NLS-1$ //$NON-NLS-2$
+ } else {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, "Error transferring block to
storage " + oid + " " + info.gid + " " + e.getMessage());
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ }
} finally {
//ensure post conditions
synchronized (info) {
@@ -950,6 +956,10 @@
freedLock.unlock();
}
}
+ if (block == EMPTY_ADDRESS && demote && this.bufferManager != null)
{
+ //failed to demote
+ this.bufferManager.invalidCacheGroup(info.gid);
+ }
}
}
return result;
@@ -1083,5 +1093,9 @@
public int getMaxMemoryBlocks() {
return maxMemoryBlocks;
}
+
+ public void setBufferManager(BufferManagerImpl bufferManager) {
+ this.bufferManager = bufferManager;
+ }
}
\ No newline at end of file
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-05-14
15:12:39 UTC (rev 4565)
+++
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-05-14
16:35:36 UTC (rev 4566)
@@ -54,12 +54,13 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.DataTypeManager.WeakReferenceHashedValueCache;
import org.teiid.core.types.Streamable;
-import org.teiid.core.types.DataTypeManager.WeakReferenceHashedValueCache;
import org.teiid.dqp.internal.process.DQPConfiguration;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
+import org.teiid.query.QueryPlugin;
import org.teiid.query.processor.relational.ListNestedSortComparator;
import org.teiid.query.sql.symbol.Expression;
@@ -205,7 +206,10 @@
}
maxReserveBytes.addAndGet(-BATCH_OVERHEAD);
reserveBatchBytes.addAndGet(-BATCH_OVERHEAD);
- cache.addToCacheGroup(id, ce.getId());
+ if (!cache.addToCacheGroup(id, ce.getId())) {
+ this.remove();
+ throw new TeiidComponentException(QueryPlugin.Util.getString("TEIID31138",
id));
+ }
addMemoryEntry(ce, true);
return oid;
}
@@ -769,7 +773,7 @@
/**
* Get a CacheEntry without hitting storage
*/
- CacheEntry fastGet(Long batch, boolean prefersMemory, boolean retain) {
+ CacheEntry fastGet(Long batch, Boolean prefersMemory, boolean retain) {
CacheEntry ce = null;
if (retain) {
ce = memoryEntries.get(batch);
@@ -793,7 +797,7 @@
}
return ce;
}
- if (prefersMemory) {
+ if (prefersMemory == null || prefersMemory) {
BatchSoftReference bsr = softCache.remove(batch);
if (bsr != null) {
ce = bsr.get();
@@ -801,7 +805,8 @@
clearSoftReference(bsr);
}
}
- } else if (useWeakReferences) {
+ }
+ if (ce == null && (prefersMemory == null || !prefersMemory) &&
useWeakReferences) {
ce = weakReferenceCache.getByHash(batch);
if (ce == null || !ce.getId().equals(batch)) {
return null;
@@ -861,7 +866,7 @@
activeBatchBytes.getAndAdd(ce.getSizeEstimate());
}
- void removeCacheGroup(Long id, boolean prefersMemory) {
+ void removeCacheGroup(Long id, Boolean prefersMemory) {
cleanSoftReferences();
Collection<Long> vals = cache.removeCacheGroup(id);
long overhead = vals.size() * BATCH_OVERHEAD;
@@ -993,5 +998,9 @@
byte[] bytes) throws TeiidComponentException {
return LobManager.persistLob(lob, store, bytes, inlineLobs,
DataTypeManager.MAX_LOB_MEMORY_BYTES);
}
+
+ public void invalidCacheGroup(Long gid) {
+ removeCacheGroup(gid, null);
+ }
}
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2013-05-14
15:12:39 UTC (rev 4565)
+++
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2013-05-14
16:35:36 UTC (rev 4566)
@@ -121,11 +121,13 @@
}
@Override
- public void addToCacheGroup(Long gid, Long oid) {
+ public boolean addToCacheGroup(Long gid, Long oid) {
Map<Long, CacheEntry> group = groups.get(gid);
if (group != null) {
group.put(oid, null);
+ return true;
}
+ return false;
}
@Override
Modified: branches/7.7.x/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- branches/7.7.x/engine/src/main/resources/org/teiid/query/i18n.properties 2013-05-14
15:12:39 UTC (rev 4565)
+++ branches/7.7.x/engine/src/main/resources/org/teiid/query/i18n.properties 2013-05-14
16:35:36 UTC (rev 4566)
@@ -952,4 +952,5 @@
invalid_schema=Invalid schema from translator metadata expected {0}, but the returned
MetadataStore contained no such schema or more than 1 schema.
invalid_table=Invalid table {0}. A table must have 1 or more columns.
-query_timeout=Cancelling query {0} since it has exceeded the timeout of {1}
milliseconds.
\ No newline at end of file
+query_timeout=Cancelling query {0} since it has exceeded the timeout of {1}
milliseconds.
+TEIID31138=Cannot add batch to invalidated cache group "{1}". Check prior logs
to see if there was an error persisting a batch.
\ No newline at end of file
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java 2013-05-14
15:12:39 UTC (rev 4565)
+++
branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java 2013-05-14
16:35:36 UTC (rev 4566)
@@ -86,6 +86,7 @@
SplittableStorageManager ssm = new SplittableStorageManager(storageManager);
ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+ fsc.setBufferManager(bufferManager);
//use conservative allocations
fsc.setDirect(false); //allow the space to be GCed easily
fsc.setMaxStorageObjectSize(1<<20);
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2013-05-14
15:12:39 UTC (rev 4565)
+++
branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2013-05-14
16:35:36 UTC (rev 4566)
@@ -30,8 +30,11 @@
import java.lang.ref.WeakReference;
import org.junit.Test;
+import org.mockito.Mockito;
import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.Serializer;
+import org.teiid.common.buffer.StorageManager;
import org.teiid.core.TeiidComponentException;
public class TestBufferFrontedFileStoreCache {
@@ -68,7 +71,7 @@
}
@Test public void testAddGetMultiBlock() throws Exception {
- BufferFrontedFileStoreCache cache = createLayeredCache(1 << 26, 1 << 26);
+ BufferFrontedFileStoreCache cache = createLayeredCache(1 << 26, 1 << 26,
true);
CacheEntry ce = new CacheEntry(2l);
Serializer<Integer> s = new SimpleSerializer();
@@ -147,7 +150,7 @@
}
@Test public void testEviction() throws Exception {
- BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15);
+ BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15,
true);
assertEquals(3, cache.getMaxMemoryBlocks());
CacheEntry ce = new CacheEntry(2l);
@@ -176,15 +179,77 @@
ce = get(cache, 3l, s);
assertEquals(Integer.valueOf(5001), ce.getObject());
}
+
+ @Test public void testEvictionFails() throws Exception {
+ BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15,
false);
+ BufferManagerImpl bmi = Mockito.mock(BufferManagerImpl.class);
+ cache.setBufferManager(bmi);
+ Serializer<Integer> s = new SimpleSerializer();
+ WeakReference<? extends Serializer<?>> ref = new
WeakReference<Serializer<?>>(s);
+ cache.createCacheGroup(s.getId());
+
+ for (int i = 0; i < 3; i++) {
+ add(cache, s, ref, i);
+ }
+ Mockito.verify(bmi, Mockito.atLeastOnce()).invalidCacheGroup(Long.valueOf(1));
+ }
- private static BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int
objectSize) throws TeiidComponentException {
+ private void add(BufferFrontedFileStoreCache cache, Serializer<Integer> s,
+ WeakReference<? extends Serializer<?>> ref, int i) {
+ CacheEntry ce = new CacheEntry(Long.valueOf(i));
+ ce.setSerializer(ref);
+ Integer cacheObject = Integer.valueOf(5000 + i);
+ ce.setObject(cacheObject);
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+ }
+
+ private static BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int
objectSize, boolean memStorage) throws TeiidComponentException {
BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
fsc.setMemoryBufferSpace(bufferSpace);
fsc.setMaxStorageObjectSize(objectSize);
fsc.setDirect(false);
- SplittableStorageManager ssm = new SplittableStorageManager(new
MemoryStorageManager());
- ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
- fsc.setStorageManager(ssm);
+ if (memStorage) {
+ SplittableStorageManager ssm = new SplittableStorageManager(new
MemoryStorageManager());
+ ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
+ fsc.setStorageManager(ssm);
+ } else {
+ StorageManager sm = new StorageManager() {
+
+ @Override
+ public void initialize() throws TeiidComponentException {
+
+ }
+
+ @Override
+ public FileStore createFileStore(String name) {
+ return new FileStore() {
+
+ @Override
+ public void setLength(long length) throws IOException {
+ throw new IOException();
+ }
+
+ @Override
+ protected void removeDirect() {
+
+ }
+
+ @Override
+ protected int readWrite(long fileOffset, byte[] b, int offSet, int length,
+ boolean write) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long getLength() {
+ return 0;
+ }
+ };
+ }
+ };
+ fsc.setStorageManager(sm);
+ }
fsc.initialize();
return fsc;
}
Modified: branches/7.7.x/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
---
branches/7.7.x/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2013-05-14
15:12:39 UTC (rev 4565)
+++
branches/7.7.x/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2013-05-14
16:35:36 UTC (rev 4566)
@@ -110,6 +110,7 @@
SplittableStorageManager ssm = new SplittableStorageManager(fsm);
ssm.setMaxFileSize(maxFileSize);
BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+ fsc.setBufferManager(this.bufferMgr);
fsc.setMaxStorageObjectSize(maxStorageObjectSize);
fsc.setDirect(memoryBufferOffHeap);
int batchOverheadKB =
(int)(this.memoryBufferSpace<0?(this.bufferMgr.getMaxReserveKB()<<8):this.memoryBufferSpace)>>20;