[teiid-commits] teiid SVN: r4379 - in trunk/engine/src/main/java/org/teiid: common/buffer/impl and 1 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Tue Aug 28 16:02:39 EDT 2012
Author: shawkins
Date: 2012-08-28 16:02:39 -0400 (Tue, 28 Aug 2012)
New Revision: 4379
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
Log:
TEIID-2171 lazily associating the cleanup reference and proactively removing it; also ensuring iterators are not created for common iterations
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java 2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java 2012-08-28 20:02:39 UTC (rev 4379)
@@ -55,10 +55,20 @@
private static ReferenceQueue<Object> QUEUE = new ReferenceQueue<Object>();
private static final Set<PhantomReference<Object>> REFERENCES = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<PhantomReference<Object>, Boolean>()));
- public static void setCleanupReference(Object o, Removable r) {
- REFERENCES.add(new PhantomCleanupReference(o, r));
+ public static PhantomReference<Object> setCleanupReference(Object o, Removable r) {
+ PhantomCleanupReference ref = new PhantomCleanupReference(o, r);
+ REFERENCES.add(ref);
doCleanup();
+ return ref;
}
+
+ public static void removeCleanupReference(PhantomReference<Object> ref) {
+ if (ref == null) {
+ return;
+ }
+ REFERENCES.remove(ref);
+ ref.clear();
+ }
public static void doCleanup() {
for (int i = 0; i < 10; i++) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2012-08-28 20:02:39 UTC (rev 4379)
@@ -28,6 +28,7 @@
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
+import java.lang.ref.PhantomReference;
import java.nio.charset.Charset;
import org.teiid.common.buffer.FileStore.FileStoreOutputStream;
@@ -39,11 +40,12 @@
private final FileStore lobBuffer;
private FileStoreOutputStream fsos;
private String encoding;
+ private PhantomReference<Object> cleanup;
public FileStoreInputStreamFactory(FileStore lobBuffer, String encoding) {
this.encoding = encoding;
this.lobBuffer = lobBuffer;
- AutoCleanupUtil.setCleanupReference(this, lobBuffer);
+ cleanup = AutoCleanupUtil.setCleanupReference(this, lobBuffer);
}
@Override
@@ -100,6 +102,8 @@
public void free() {
fsos = null;
lobBuffer.remove();
+ AutoCleanupUtil.removeCleanupReference(cleanup);
+ cleanup = null;
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java 2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java 2012-08-28 20:02:39 UTC (rev 4379)
@@ -189,7 +189,7 @@
public void persist() throws TeiidComponentException {
// stream the contents of lob into file store.
byte[] bytes = new byte[1 << 14];
-
+ AutoCleanupUtil.setCleanupReference(this, lobStore);
for (Map.Entry<String, LobHolder> entry : this.lobReferences.entrySet()) {
entry.getValue().lob = detachLob(entry.getValue().lob, lobStore, bytes);
}
@@ -294,5 +294,6 @@
public void remove() {
this.lobReferences.clear();
+ //we don't remove the filestore as there could be local connection references to the lob objects
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2012-08-28 20:02:39 UTC (rev 4379)
@@ -127,15 +127,16 @@
*/
public void addTupleBatch(TupleBatch batch, boolean save) throws TeiidComponentException {
setRowCount(batch.getBeginRow() - 1);
+ List<List<?>> tuples = batch.getTuples();
if (save) {
- for (List<?> tuple : batch.getTuples()) {
- addTuple(tuple);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ addTuple(tuples.get(i));
}
} else {
//add the lob references only, since they may still be referenced later
if (isLobs()) {
- for (List<?> tuple : batch.getTuples()) {
- lobManager.updateReferences(tuple, ReferenceMode.CREATE);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ lobManager.updateReferences(tuples.get(i), ReferenceMode.CREATE);
}
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2012-08-28 20:02:39 UTC (rev 4379)
@@ -30,6 +30,7 @@
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
@@ -126,7 +127,22 @@
}
}
}
+
+ private final class Remover implements Removable {
+ private Long id;
+ private AtomicBoolean prefersMemory;
+
+ public Remover(Long id, AtomicBoolean prefersMemory) {
+ this.id = id;
+ this.prefersMemory = prefersMemory;
+ }
+ @Override
+ public void remove() {
+ removeCacheGroup(id, prefersMemory.get());
+ }
+ }
+
/**
* This estimate is based upon adding the value to 2/3 maps and having CacheEntry/PhysicalInfo keys
*/
@@ -136,6 +152,7 @@
final Long id;
SizeUtility sizeUtility;
private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
+ private PhantomReference<Object> cleanup;
AtomicBoolean prefersMemory = new AtomicBoolean();
String[] types;
private LobManager lobManager;
@@ -147,7 +164,6 @@
for (int i = 0; i < types.length; i++) {
this.types[i] = DataTypeManager.getDataTypeName(types[i]);
}
- cache.createCacheGroup(newID);
}
@Override
@@ -189,6 +205,10 @@
public Long createManagedBatch(List<? extends List<?>> batch,
Long previous, boolean removeOld)
throws TeiidComponentException {
+ if (cleanup == null) {
+ cache.createCacheGroup(id);
+ cleanup = AutoCleanupUtil.setCleanupReference(this, new Remover(id, prefersMemory));
+ }
int sizeEstimate = getSizeEstimate(batch);
Long oid = batchAdded.getAndIncrement();
CacheEntry old = null;
@@ -216,9 +236,9 @@
throws IOException, ClassNotFoundException {
List<? extends List<?>> batch = BatchSerializer.readBatch(ois, types);
if (lobManager != null) {
- for (List<?> list : batch) {
+ for (int i = batch.size() - 1; i >= 0; i--) {
try {
- lobManager.updateReferences(list, ReferenceMode.ATTACH);
+ lobManager.updateReferences(batch.get(i), ReferenceMode.ATTACH);
} catch (TeiidComponentException e) {
throw new TeiidRuntimeException(QueryPlugin.Event.TEIID30052, e);
}
@@ -297,7 +317,11 @@
@Override
public void remove() {
- removeCacheGroup(id, prefersMemory.get());
+ if (cleanup != null) {
+ removeCacheGroup(id, prefersMemory.get());
+ AutoCleanupUtil.removeCleanupReference(cleanup);
+ cleanup = null;
+ }
}
@Override
@@ -468,7 +492,6 @@
if (lobIndexes != null) {
FileStore lobStore = createFileStore(newID + "_lobs"); //$NON-NLS-1$
lobManager = new LobManager(lobIndexes, lobStore);
- AutoCleanupUtil.setCleanupReference(lobManager, lobStore);
batchManager.setLobManager(lobManager);
}
TupleBuffer tupleBuffer = new TupleBuffer(batchManager, String.valueOf(newID), elements, lobManager, getProcessorBatchSize(elements));
@@ -510,16 +533,7 @@
}
private BatchManagerImpl createBatchManager(final Long newID, Class<?>[] types) {
- BatchManagerImpl bm = new BatchManagerImpl(newID, types);
- final AtomicBoolean prefersMemory = bm.prefersMemory;
- AutoCleanupUtil.setCleanupReference(bm, new Removable() {
-
- @Override
- public void remove() {
- BufferManagerImpl.this.removeCacheGroup(newID, prefersMemory.get());
- }
- });
- return bm;
+ return new BatchManagerImpl(newID, types);
}
@Override
@@ -860,9 +874,11 @@
long overhead = vals.size() * BATCH_OVERHEAD;
maxReserveBytes.addAndGet(overhead);
reserveBatchBytes.addAndGet(overhead);
- for (Long val : vals) {
- //TODO: we will unnecessarily call remove on the cache, but that should be low cost
- fastGet(val, prefersMemory, false);
+ if (!vals.isEmpty()) {
+ for (Long val : vals) {
+ //TODO: we will unnecessarily call remove on the cache, but that should be low cost
+ fastGet(val, prefersMemory, false);
+ }
}
}
@@ -885,8 +901,8 @@
private int[] getSizeEstimates(List<? extends Expression> elements) {
int total = 0;
boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
- for (Expression element : elements) {
- Class<?> type = element.getType();
+ for (int i = elements.size() - 1; i >= 0; i--) {
+ Class<?> type = elements.get(i).getType();
total += SizeUtility.getSize(isValueCacheEnabled, type);
}
//assume 64-bit
@@ -1027,9 +1043,9 @@
String[] types = (String[])in.readObject();
List<ElementSymbol> schema = new ArrayList<ElementSymbol>(types.length);
- for (String type : types) {
+ for (int i = 0; i < types.length; i++) {
ElementSymbol es = new ElementSymbol("x"); //$NON-NLS-1$
- es.setType(DataTypeManager.getDataTypeClass(type));
+ es.setType(DataTypeManager.getDataTypeClass(types[i]));
schema.add(es);
}
TupleBuffer buffer = createTupleBuffer(schema, "cached", TupleSourceType.FINAL); //$NON-NLS-1$
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2012-08-28 20:02:39 UTC (rev 4379)
@@ -140,10 +140,9 @@
}
public synchronized void removeDirect() {
- for (FileStore info : storageFiles) {
- info.remove();
+ for (int i = storageFiles.size() - 1; i >= 0; i--) {
+ this.storageFiles.remove(i).remove();
}
- storageFiles.clear();
}
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2012-08-28 20:02:39 UTC (rev 4379)
@@ -600,7 +600,7 @@
return this.data.estimateNodeCardinality;
}
- private ProcessingState getProcessingState() {
+ private final ProcessingState getProcessingState() {
//construct lazily since not all tests call initialize
if (this.processingState == null) {
this.processingState = new ProcessingState();
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java 2012-08-28 19:41:34 UTC (rev 4378)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java 2012-08-28 20:02:39 UTC (rev 4379)
@@ -82,10 +82,12 @@
@Override
public void clear() {
- for (TupleBuffer buffer : values()) {
- buffer.remove();
+ if (!isEmpty()) {
+ for (TupleBuffer buffer : values()) {
+ buffer.remove();
+ }
+ super.clear();
}
- super.clear();
}
}
More information about the teiid-commits
mailing list