[teiid-commits] teiid SVN: r4379 - in trunk/engine/src/main/java/org/teiid: common/buffer/impl and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Aug 28 16:02:39 EDT 2012


Author: shawkins
Date: 2012-08-28 16:02:39 -0400 (Tue, 28 Aug 2012)
New Revision: 4379

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
   trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
Log:
TEIID-2171 lazily associating the cleanup and proactively removing also ensuring iterators are created for common iterations

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java	2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java	2012-08-28 20:02:39 UTC (rev 4379)
@@ -55,10 +55,20 @@
 	private static ReferenceQueue<Object> QUEUE = new ReferenceQueue<Object>();
 	private static final Set<PhantomReference<Object>> REFERENCES = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<PhantomReference<Object>, Boolean>()));
 
-	public static void setCleanupReference(Object o, Removable r) {
-		REFERENCES.add(new PhantomCleanupReference(o, r));
+	public static PhantomReference<Object> setCleanupReference(Object o, Removable r) {
+		PhantomCleanupReference ref = new PhantomCleanupReference(o, r);
+		REFERENCES.add(ref);
 		doCleanup();
+		return ref;
 	}
+	
+	public static void removeCleanupReference(PhantomReference<Object> ref) {
+		if (ref == null) {
+			return;
+		}
+		REFERENCES.remove(ref);
+		ref.clear();
+	}
 
 	public static void doCleanup() {
 		for (int i = 0; i < 10; i++) {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2012-08-28 20:02:39 UTC (rev 4379)
@@ -28,6 +28,7 @@
 import java.io.OutputStreamWriter;
 import java.io.Reader;
 import java.io.Writer;
+import java.lang.ref.PhantomReference;
 import java.nio.charset.Charset;
 
 import org.teiid.common.buffer.FileStore.FileStoreOutputStream;
@@ -39,11 +40,12 @@
 	private final FileStore lobBuffer;
 	private FileStoreOutputStream fsos;
 	private String encoding;
+	private PhantomReference<Object> cleanup;
 
 	public FileStoreInputStreamFactory(FileStore lobBuffer, String encoding) {
 		this.encoding = encoding;
 		this.lobBuffer = lobBuffer;
-		AutoCleanupUtil.setCleanupReference(this, lobBuffer);
+		cleanup = AutoCleanupUtil.setCleanupReference(this, lobBuffer);
 	}
 
 	@Override
@@ -100,6 +102,8 @@
 	public void free() {
 		fsos = null;
 		lobBuffer.remove();
+		AutoCleanupUtil.removeCleanupReference(cleanup);
+		cleanup = null;
 	}
 	
 	@Override

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java	2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java	2012-08-28 20:02:39 UTC (rev 4379)
@@ -189,7 +189,7 @@
 	public void persist() throws TeiidComponentException {
 		// stream the contents of lob into file store.
 		byte[] bytes = new byte[1 << 14]; 
-
+		AutoCleanupUtil.setCleanupReference(this, lobStore);
 		for (Map.Entry<String, LobHolder> entry : this.lobReferences.entrySet()) {
 			entry.getValue().lob = detachLob(entry.getValue().lob, lobStore, bytes);
 		}
@@ -294,5 +294,6 @@
 
 	public void remove() {
 		this.lobReferences.clear();
+		//we don't remove the filestore as there could be local connection references to the lob objects
 	}
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2012-08-28 20:02:39 UTC (rev 4379)
@@ -127,15 +127,16 @@
 	 */
 	public void addTupleBatch(TupleBatch batch, boolean save) throws TeiidComponentException {
 		setRowCount(batch.getBeginRow() - 1); 
+		List<List<?>> tuples = batch.getTuples();
 		if (save) {
-			for (List<?> tuple : batch.getTuples()) {
-				addTuple(tuple);
+			for (int i = 0; i < batch.getRowCount(); i++) {
+				addTuple(tuples.get(i));
 			}
 		} else {
 			//add the lob references only, since they may still be referenced later
 			if (isLobs()) {
-				for (List<?> tuple : batch.getTuples()) {
-					lobManager.updateReferences(tuple, ReferenceMode.CREATE);
+				for (int i = 0; i < batch.getRowCount(); i++) {
+					lobManager.updateReferences(tuples.get(i), ReferenceMode.CREATE);
 				}
 			}
 		}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2012-08-28 20:02:39 UTC (rev 4379)
@@ -30,6 +30,7 @@
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
 import java.lang.ref.SoftReference;
@@ -126,7 +127,22 @@
 			}
 		}
 	}
+	
+	private final class Remover implements Removable {
+		private Long id;
+		private AtomicBoolean prefersMemory;
+		
+		public Remover(Long id, AtomicBoolean prefersMemory) {
+			this.id = id;
+			this.prefersMemory = prefersMemory;
+		}
 
+		@Override
+		public void remove() {
+			removeCacheGroup(id, prefersMemory.get());
+		}
+	}
+
 	/**
 	 * This estimate is based upon adding the value to 2/3 maps and having CacheEntry/PhysicalInfo keys
 	 */
@@ -136,6 +152,7 @@
 		final Long id;
 		SizeUtility sizeUtility;
 		private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
+		private PhantomReference<Object> cleanup;
 		AtomicBoolean prefersMemory = new AtomicBoolean();
 		String[] types;
 		private LobManager lobManager;
@@ -147,7 +164,6 @@
 			for (int i = 0; i < types.length; i++) {
 				this.types[i] = DataTypeManager.getDataTypeName(types[i]);
 			}
-			cache.createCacheGroup(newID);
 		}
 		
 		@Override
@@ -189,6 +205,10 @@
 		public Long createManagedBatch(List<? extends List<?>> batch,
 				Long previous, boolean removeOld)
 				throws TeiidComponentException {
+			if (cleanup == null) {
+				cache.createCacheGroup(id);
+				cleanup = AutoCleanupUtil.setCleanupReference(this, new Remover(id, prefersMemory));
+			}
 			int sizeEstimate = getSizeEstimate(batch);
 			Long oid = batchAdded.getAndIncrement();
 			CacheEntry old = null;
@@ -216,9 +236,9 @@
 				throws IOException, ClassNotFoundException {
 			List<? extends List<?>> batch = BatchSerializer.readBatch(ois, types);
 			if (lobManager != null) {
-				for (List<?> list : batch) {
+				for (int i = batch.size() - 1; i >= 0; i--) {
 					try {
-						lobManager.updateReferences(list, ReferenceMode.ATTACH);
+						lobManager.updateReferences(batch.get(i), ReferenceMode.ATTACH);
 					} catch (TeiidComponentException e) {
 						 throw new TeiidRuntimeException(QueryPlugin.Event.TEIID30052, e);
 					}
@@ -297,7 +317,11 @@
 
 		@Override
 		public void remove() {
-			removeCacheGroup(id, prefersMemory.get());
+			if (cleanup != null) {
+				removeCacheGroup(id, prefersMemory.get());
+				AutoCleanupUtil.removeCleanupReference(cleanup);
+				cleanup = null;
+			}
 		}
 
 		@Override
@@ -468,7 +492,6 @@
 		if (lobIndexes != null) {
 			FileStore lobStore = createFileStore(newID + "_lobs"); //$NON-NLS-1$
 			lobManager = new LobManager(lobIndexes, lobStore);
-			AutoCleanupUtil.setCleanupReference(lobManager, lobStore);
 			batchManager.setLobManager(lobManager);
 		}
     	TupleBuffer tupleBuffer = new TupleBuffer(batchManager, String.valueOf(newID), elements, lobManager, getProcessorBatchSize(elements));
@@ -510,16 +533,7 @@
 	}
 
 	private BatchManagerImpl createBatchManager(final Long newID, Class<?>[] types) {
-		BatchManagerImpl bm = new BatchManagerImpl(newID, types);
-		final AtomicBoolean prefersMemory = bm.prefersMemory;
-    	AutoCleanupUtil.setCleanupReference(bm, new Removable() {
-			
-			@Override
-			public void remove() {
-				BufferManagerImpl.this.removeCacheGroup(newID, prefersMemory.get());
-			}
-		});
-		return bm;
+		return new BatchManagerImpl(newID, types);
 	}
 
     @Override
@@ -860,9 +874,11 @@
 		long overhead = vals.size() * BATCH_OVERHEAD;
 		maxReserveBytes.addAndGet(overhead);
 		reserveBatchBytes.addAndGet(overhead);
-		for (Long val : vals) {
-			//TODO: we will unnecessarily call remove on the cache, but that should be low cost
-			fastGet(val, prefersMemory, false);
+		if (!vals.isEmpty()) {
+			for (Long val : vals) {
+				//TODO: we will unnecessarily call remove on the cache, but that should be low cost
+				fastGet(val, prefersMemory, false);
+			}
 		}
 	}
 	
@@ -885,8 +901,8 @@
 	private int[] getSizeEstimates(List<? extends Expression> elements) {
 		int total = 0;
 		boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
-		for (Expression element : elements) {
-			Class<?> type = element.getType();
+		for (int i = elements.size() - 1; i >= 0; i--) {
+			Class<?> type = elements.get(i).getType();
 			total += SizeUtility.getSize(isValueCacheEnabled, type);
 		}
 		//assume 64-bit
@@ -1027,9 +1043,9 @@
 		String[] types = (String[])in.readObject();
 		
 		List<ElementSymbol> schema = new ArrayList<ElementSymbol>(types.length);
-		for (String type : types) {
+		for (int i = 0; i < types.length; i++) {
 			ElementSymbol es = new ElementSymbol("x"); //$NON-NLS-1$
-			es.setType(DataTypeManager.getDataTypeClass(type));
+			es.setType(DataTypeManager.getDataTypeClass(types[i]));
 			schema.add(es);
 		}
 		TupleBuffer buffer = createTupleBuffer(schema, "cached", TupleSourceType.FINAL); //$NON-NLS-1$

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	2012-08-28 20:02:39 UTC (rev 4379)
@@ -140,10 +140,9 @@
 	    }
 	    
 		public synchronized void removeDirect() {
-			for (FileStore info : storageFiles) {
-				info.remove();
+			for (int i = storageFiles.size() - 1; i >= 0; i--) {
+				this.storageFiles.remove(i).remove();
 			}
-			storageFiles.clear();
 		}
 		
 	}

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2012-08-28 20:02:39 UTC (rev 4379)
@@ -600,7 +600,7 @@
         return this.data.estimateNodeCardinality;
     }
 
-	private ProcessingState getProcessingState() {
+	private final ProcessingState getProcessingState() {
 		//construct lazily since not all tests call initialize
 		if (this.processingState == null) {
 			this.processingState = new ProcessingState();

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java	2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java	2012-08-28 20:02:39 UTC (rev 4379)
@@ -82,10 +82,12 @@
 		
 		@Override
 		public void clear() {
-			for (TupleBuffer buffer : values()) {
-				buffer.remove();
+			if (!isEmpty()) {
+				for (TupleBuffer buffer : values()) {
+					buffer.remove();
+				}
+				super.clear();
 			}
-			super.clear();
 		}
 	}
 



More information about the teiid-commits mailing list