Author: shawkins
Date: 2011-10-08 23:23:46 -0400 (Sat, 08 Oct 2011)
New Revision: 3546
Modified:
trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
trunk/engine/src/main/java/org/teiid/common/buffer/LightWeightCopyOnWriteList.java
trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
Log:
TEIID-1750 correcting concurrency issues
Modified: trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java 2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java 2011-10-09 03:23:46 UTC (rev 3546)
@@ -63,8 +63,13 @@
return (T) elementData[index];
}
+ public int getModCount() {
+ return modCount;
+ }
+
public void add(int index, T element) {
rangeCheck(index, true);
+ modCount++;
ensureCapacity(size+1);
System.arraycopy(elementData, index, elementData, index + 1,
size - index);
@@ -99,6 +104,7 @@
@Override
public boolean addAll(int index, Collection<? extends T> c) {
rangeCheck(index, true);
+ modCount++;
int numNew = c.size();
ensureCapacity(size + numNew);
for (T t : c) {
@@ -111,6 +117,7 @@
@Override
public T remove(int index) {
T oldValue = get(index);
+ modCount++;
int numMoved = size - index - 1;
if (numMoved > 0) {
System.arraycopy(elementData, index+1, elementData, index, numMoved);
@@ -134,6 +141,7 @@
@Override
public void clear() {
+ modCount++;
if (size <= MIN_SHRINK_SIZE) {
for (int i = 0; i < size; i++) {
elementData[i] = null;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LightWeightCopyOnWriteList.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LightWeightCopyOnWriteList.java 2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LightWeightCopyOnWriteList.java 2011-10-09 03:23:46 UTC (rev 3546)
@@ -28,6 +28,8 @@
import java.util.List;
import java.util.RandomAccess;
+import org.teiid.client.ResizingArrayList;
+
/**
* Creates a copy of a reference list when modified.
*
@@ -101,7 +103,7 @@
@Override
public void clear() {
if (!modified) {
- list = new ArrayList<T>();
+ list = new ResizingArrayList<T>();
modified = true;
} else {
list.clear();
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-10-09 03:23:46 UTC (rev 3546)
@@ -25,7 +25,6 @@
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
@@ -120,10 +119,10 @@
SPage clone = (SPage) super.clone();
clone.stree = tree;
if (children != null) {
- clone.children = new ArrayList<SPage>(children);
+ clone.children = new ResizingArrayList<SPage>(children);
}
if (values != null) {
- clone.values = new ArrayList<List<?>>(values);
+ clone.values = new ResizingArrayList<List<?>>(values);
}
return clone;
} catch (CloneNotSupportedException e) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-09 03:23:46 UTC (rev 3546)
@@ -45,6 +45,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.teiid.client.BatchSerializer;
+import org.teiid.client.ResizingArrayList;
import org.teiid.common.buffer.AutoCleanupUtil;
import org.teiid.common.buffer.BatchManager;
import org.teiid.common.buffer.BufferManager;
@@ -178,8 +179,21 @@
@Override
public void serialize(List<? extends List<?>> obj,
ObjectOutputStream oos) throws IOException {
- //it's expected that the containing structure has updated the lob manager
- BatchSerializer.writeBatch(oos, types, obj);
+ int expectedModCount = 0;
+ ResizingArrayList<?> list = null;
+ if (obj instanceof ResizingArrayList<?>) {
+ list = (ResizingArrayList<?>)obj;
+ }
+ try {
+ //it's expected that the containing structure has updated the lob manager
+ BatchSerializer.writeBatch(oos, types, obj);
+ } catch (IndexOutOfBoundsException e) {
+ //there is a chance of a concurrent persist while modifying
+ //in which case we want to swallow this exception
+ if (list == null || list.getModCount() == expectedModCount) {
+ throw e;
+ }
+ }
}
public int getSizeEstimate(List<? extends List<?>> obj) {
@@ -574,29 +588,34 @@
}
freed += ce.getSizeEstimate();
activeBatchKB.addAndGet(-ce.getSizeEstimate());
- synchronized (ce) {
- if (ce.isPersistent()) {
- continue;
- }
- ce.setPersistent(true);
+ try {
+ evict(ce);
+ } catch (Throwable e) {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read batch "+ ce.getId() +" later will result in an
exception"); //$NON-NLS-1$ //$NON-NLS-2$
+ } finally {
+ this.memoryEntries.finishedEviction(ce.getId());
}
- persist(ce);
}
}
- void persist(CacheEntry ce) {
+ void evict(CacheEntry ce) throws Exception {
Serializer<?> s = ce.getSerializer().get();
if (s == null) {
return;
}
- long count = writeCount.incrementAndGet();
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL))
{
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to
storage, total writes: ", count); //$NON-NLS-1$
+ boolean persist = false;
+ synchronized (ce) {
+ if (!ce.isPersistent()) {
+ persist = true;
+ ce.setPersistent(true);
+ }
}
- try {
+ if (persist) {
+ long count = writeCount.incrementAndGet();
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to
storage, total writes: ", count); //$NON-NLS-1$
+ }
cache.add(ce, s);
- } catch (Throwable e) {
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read batch "+ ce.getId() +" later will result in an
exception"); //$NON-NLS-1$ //$NON-NLS-2$
}
if (s.useSoftCache()) {
createSoftReference(ce);
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-09 03:23:46 UTC (rev 3546)
@@ -64,14 +64,17 @@
public void flush() throws IOException {
if (buf != null && buf.position() > 0) {
- bytesWritten += flushDirect();
+ bytesWritten += flushDirect(buf.position());
}
buf = null;
}
protected abstract ByteBuffer newBuffer();
- protected abstract int flushDirect() throws IOException;
+ /**
+ * Flush up to i bytes where i is the current position of the buffer
+ */
+ protected abstract int flushDirect(int i) throws IOException;
@Override
public void close() throws IOException {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java 2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java 2011-10-09 03:23:46 UTC (rev 3546)
@@ -31,9 +31,13 @@
protected Map<K, V> map = new ConcurrentHashMap<K, V>();
protected NavigableMap<V, K> expirationQueue = new ConcurrentSkipListMap<V,
K>();
+ protected Map<K, V> limbo = new ConcurrentHashMap<K, V>();
public V get(K key) {
V result = map.get(key);
+ if (result == null) {
+ result = limbo.get(key);
+ }
if (result != null) {
synchronized (result) {
expirationQueue.remove(result);
@@ -73,9 +77,14 @@
if (entry == null) {
return null;
}
+ limbo.put(entry.getValue(), entry.getKey());
return map.remove(entry.getValue());
}
+ public void finishedEviction(K key) {
+ limbo.remove(key);
+ }
+
public int size() {
return map.size();
}