[teiid-commits] teiid SVN: r4566 - in branches/7.7.x: engine/src/main/java/org/teiid/common/buffer/impl and 4 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue May 14 12:35:36 EDT 2013


Author: jolee
Date: 2013-05-14 12:35:36 -0400 (Tue, 14 May 2013)
New Revision: 4566

Modified:
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/Cache.java
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
   branches/7.7.x/engine/src/main/resources/org/teiid/query/i18n.properties
   branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
   branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
   branches/7.7.x/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-2460: Weird behavior when Max buffer space restriction is hit

Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/Cache.java	2013-05-14 15:12:39 UTC (rev 4565)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/Cache.java	2013-05-14 16:35:36 UTC (rev 4566)
@@ -51,8 +51,9 @@
 	 * Must be called prior to adding an entry
 	 * @param gid
 	 * @param oid
+	 * @return if the add was successful
 	 */
-	void addToCacheGroup(Long gid, Long oid); 
+	boolean addToCacheGroup(Long gid, Long oid); 
 
 	/**
 	 * Lock the object for load and return an identifier/lock

Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2013-05-14 15:12:39 UTC (rev 4565)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2013-05-14 16:35:36 UTC (rev 4566)
@@ -495,6 +495,7 @@
 	private AtomicLong storageReads = new AtomicLong();
 	
 	private long minDefrag = DEFAULT_MIN_DEFRAG;
+	private BufferManagerImpl bufferManager;
 	
 	@Override
 	public void initialize() throws TeiidComponentException {
@@ -815,12 +816,13 @@
 	}
 	
 	@Override
-	public void addToCacheGroup(Long gid, Long oid) {
+	public boolean addToCacheGroup(Long gid, Long oid) {
 		Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
 		if (map == null) {
-			return;
+			return false;
 		}
 		map.put(oid, null);
+		return true;
 	}
 	
 	@Override
@@ -905,7 +907,11 @@
 				block = blockStore.writeToStorageBlock(info, is);
 			}
 		} catch (IOException e) {
-			LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to storage " + oid + " " + info.gid); //$NON-NLS-1$ //$NON-NLS-2$
+			} else {
+				LogManager.logError(LogConstants.CTX_BUFFER_MGR, "Error transferring block to storage " + oid + " " + info.gid + " " + e.getMessage()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+			}
 		} finally {
 			//ensure post conditions
 			synchronized (info) {
@@ -950,6 +956,10 @@
 						freedLock.unlock();
 					}
 				}
+				if (block == EMPTY_ADDRESS && demote && this.bufferManager != null) {
+					//failed to demote
+					this.bufferManager.invalidCacheGroup(info.gid);
+				}
 			}
 		}
 		return result;
@@ -1083,5 +1093,9 @@
 	public int getMaxMemoryBlocks() {
 		return maxMemoryBlocks;
 	}
+	
+	public void setBufferManager(BufferManagerImpl bufferManager) {
+		this.bufferManager = bufferManager;
+	}
 
 }
\ No newline at end of file

Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2013-05-14 15:12:39 UTC (rev 4565)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2013-05-14 16:35:36 UTC (rev 4566)
@@ -54,12 +54,13 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.DataTypeManager.WeakReferenceHashedValueCache;
 import org.teiid.core.types.Streamable;
-import org.teiid.core.types.DataTypeManager.WeakReferenceHashedValueCache;
 import org.teiid.dqp.internal.process.DQPConfiguration;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
+import org.teiid.query.QueryPlugin;
 import org.teiid.query.processor.relational.ListNestedSortComparator;
 import org.teiid.query.sql.symbol.Expression;
 
@@ -205,7 +206,10 @@
 			}
 			maxReserveBytes.addAndGet(-BATCH_OVERHEAD);
 			reserveBatchBytes.addAndGet(-BATCH_OVERHEAD);
-			cache.addToCacheGroup(id, ce.getId());
+			if (!cache.addToCacheGroup(id, ce.getId())) {
+				this.remove();
+				throw new TeiidComponentException(QueryPlugin.Util.getString("TEIID31138", id));
+			}
 			addMemoryEntry(ce, true);
 			return oid;
 		}
@@ -769,7 +773,7 @@
 	/**
 	 * Get a CacheEntry without hitting storage
 	 */
-	CacheEntry fastGet(Long batch, boolean prefersMemory, boolean retain) {
+	CacheEntry fastGet(Long batch, Boolean prefersMemory, boolean retain) {
 		CacheEntry ce = null;
 		if (retain) {
 			ce = memoryEntries.get(batch);
@@ -793,7 +797,7 @@
 			}
 			return ce;
 		}
-		if (prefersMemory) {
+		if (prefersMemory == null || prefersMemory) {
 			BatchSoftReference bsr = softCache.remove(batch);
 			if (bsr != null) {
 				ce = bsr.get();
@@ -801,7 +805,8 @@
 					clearSoftReference(bsr);
 				}
 			}
-		} else if (useWeakReferences) {
+		}
+		if (ce == null && (prefersMemory == null || !prefersMemory) && useWeakReferences) {
 			ce = weakReferenceCache.getByHash(batch);
 			if (ce == null || !ce.getId().equals(batch)) {
 				return null;
@@ -861,7 +866,7 @@
 		activeBatchBytes.getAndAdd(ce.getSizeEstimate());
 	}
 	
-	void removeCacheGroup(Long id, boolean prefersMemory) {
+	void removeCacheGroup(Long id, Boolean prefersMemory) {
 		cleanSoftReferences();
 		Collection<Long> vals = cache.removeCacheGroup(id);
 		long overhead = vals.size() * BATCH_OVERHEAD;
@@ -993,5 +998,9 @@
 			byte[] bytes) throws TeiidComponentException {
 		return LobManager.persistLob(lob, store, bytes, inlineLobs, DataTypeManager.MAX_LOB_MEMORY_BYTES);
 	}
+
+	public void invalidCacheGroup(Long gid) {
+		removeCacheGroup(gid, null);
+	}
 	
 }

Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2013-05-14 15:12:39 UTC (rev 4565)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2013-05-14 16:35:36 UTC (rev 4566)
@@ -121,11 +121,13 @@
 	}
 	
 	@Override
-	public void addToCacheGroup(Long gid, Long oid) {
+	public boolean addToCacheGroup(Long gid, Long oid) {
 		Map<Long, CacheEntry> group = groups.get(gid);
 		if (group != null) {
 			group.put(oid, null);
+			return true;
 		}
+		return false;
 	}
 	
 	@Override

Modified: branches/7.7.x/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- branches/7.7.x/engine/src/main/resources/org/teiid/query/i18n.properties	2013-05-14 15:12:39 UTC (rev 4565)
+++ branches/7.7.x/engine/src/main/resources/org/teiid/query/i18n.properties	2013-05-14 16:35:36 UTC (rev 4566)
@@ -952,4 +952,5 @@
 invalid_schema=Invalid schema from translator metadata expected {0}, but the returned MetadataStore contained no such schema or more than 1 schema.
 invalid_table=Invalid table {0}.  A table must have 1 or more columns. 
 
-query_timeout=Cancelling query {0} since it has exceeded the timeout of {1} milliseconds.
\ No newline at end of file
+query_timeout=Cancelling query {0} since it has exceeded the timeout of {1} milliseconds.
+TEIID31138=Cannot add batch to invalidated cache group "{1}".  Check prior logs to see if there was an error persisting a batch.
\ No newline at end of file

Modified: branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java	2013-05-14 15:12:39 UTC (rev 4565)
+++ branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java	2013-05-14 16:35:36 UTC (rev 4566)
@@ -86,6 +86,7 @@
 			SplittableStorageManager ssm = new SplittableStorageManager(storageManager);
 			ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
 			BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+			fsc.setBufferManager(bufferManager);
 			//use conservative allocations
 			fsc.setDirect(false); //allow the space to be GCed easily
 			fsc.setMaxStorageObjectSize(1<<20);

Modified: branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2013-05-14 15:12:39 UTC (rev 4565)
+++ branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2013-05-14 16:35:36 UTC (rev 4566)
@@ -30,8 +30,11 @@
 import java.lang.ref.WeakReference;
 
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.Serializer;
+import org.teiid.common.buffer.StorageManager;
 import org.teiid.core.TeiidComponentException;
 
 public class TestBufferFrontedFileStoreCache {
@@ -68,7 +71,7 @@
 	}
 
 	@Test public void testAddGetMultiBlock() throws Exception {
-		BufferFrontedFileStoreCache cache = createLayeredCache(1 << 26, 1 << 26);
+		BufferFrontedFileStoreCache cache = createLayeredCache(1 << 26, 1 << 26, true);
 		
 		CacheEntry ce = new CacheEntry(2l);
 		Serializer<Integer> s = new SimpleSerializer();
@@ -147,7 +150,7 @@
 	}
 	
 	@Test public void testEviction() throws Exception {
-		BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15);
+		BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15, true);
 		assertEquals(3, cache.getMaxMemoryBlocks());
 		
 		CacheEntry ce = new CacheEntry(2l);
@@ -176,15 +179,77 @@
 		ce = get(cache, 3l, s);
 		assertEquals(Integer.valueOf(5001), ce.getObject());
 	}
+	
+	@Test public void testEvictionFails() throws Exception {
+		BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15, false);
+		BufferManagerImpl bmi = Mockito.mock(BufferManagerImpl.class);
+		cache.setBufferManager(bmi);
+		Serializer<Integer> s = new SimpleSerializer();
+		WeakReference<? extends Serializer<?>> ref = new WeakReference<Serializer<?>>(s);
+		cache.createCacheGroup(s.getId());
+		
+		for (int i = 0; i < 3; i++) {
+			add(cache, s, ref, i);
+		}
+		Mockito.verify(bmi, Mockito.atLeastOnce()).invalidCacheGroup(Long.valueOf(1));
+	}
 
-	private static BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int objectSize) throws TeiidComponentException {
+	private void add(BufferFrontedFileStoreCache cache, Serializer<Integer> s,
+			WeakReference<? extends Serializer<?>> ref, int i) {
+		CacheEntry ce = new CacheEntry(Long.valueOf(i));
+		ce.setSerializer(ref);
+		Integer cacheObject = Integer.valueOf(5000 + i);
+		ce.setObject(cacheObject);
+		cache.addToCacheGroup(s.getId(), ce.getId());
+		cache.add(ce, s);
+	}
+
+	private static BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int objectSize, boolean memStorage) throws TeiidComponentException {
 		BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
 		fsc.setMemoryBufferSpace(bufferSpace);
 		fsc.setMaxStorageObjectSize(objectSize);
 		fsc.setDirect(false);
-		SplittableStorageManager ssm = new SplittableStorageManager(new MemoryStorageManager());
-		ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
-		fsc.setStorageManager(ssm);
+		if (memStorage) {
+			SplittableStorageManager ssm = new SplittableStorageManager(new MemoryStorageManager());
+			ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
+			fsc.setStorageManager(ssm);
+		} else {
+			StorageManager sm = new StorageManager() {
+				
+				@Override
+				public void initialize() throws TeiidComponentException {
+					
+				}
+				
+				@Override
+				public FileStore createFileStore(String name) {
+					return new FileStore() {
+						
+						@Override
+						public void setLength(long length) throws IOException {
+							throw new IOException();
+						}
+						
+						@Override
+						protected void removeDirect() {
+							
+						}
+						
+						@Override
+						protected int readWrite(long fileOffset, byte[] b, int offSet, int length,
+								boolean write) throws IOException {
+							return 0;
+						}
+						
+						@Override
+						public long getLength() {
+							return 0;
+						}
+					};
+				}
+			};
+			fsc.setStorageManager(sm);
+		}
 		fsc.initialize();
 		return fsc;
 	}

Modified: branches/7.7.x/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- branches/7.7.x/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2013-05-14 15:12:39 UTC (rev 4565)
+++ branches/7.7.x/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2013-05-14 16:35:36 UTC (rev 4566)
@@ -110,6 +110,7 @@
                 SplittableStorageManager ssm = new SplittableStorageManager(fsm);
                 ssm.setMaxFileSize(maxFileSize);
                 BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+                fsc.setBufferManager(this.bufferMgr);
                 fsc.setMaxStorageObjectSize(maxStorageObjectSize);
                 fsc.setDirect(memoryBufferOffHeap);
                 int batchOverheadKB = (int)(this.memoryBufferSpace<0?(this.bufferMgr.getMaxReserveKB()<<8):this.memoryBufferSpace)>>20;



More information about the teiid-commits mailing list