[teiid-commits] teiid SVN: r3546 - in trunk: engine/src/main/java/org/teiid/common/buffer and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sat Oct 8 23:23:47 EDT 2011


Author: shawkins
Date: 2011-10-08 23:23:46 -0400 (Sat, 08 Oct 2011)
New Revision: 3546

Modified:
   trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
   trunk/engine/src/main/java/org/teiid/common/buffer/LightWeightCopyOnWriteList.java
   trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
Log:
TEIID-1750 correcting concurrency issues

Modified: trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java	2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java	2011-10-09 03:23:46 UTC (rev 3546)
@@ -63,8 +63,13 @@
 		return (T) elementData[index];
 	}
 	
+	public int getModCount() {
+		return modCount;
+	}
+	
 	public void add(int index, T element) {
 		rangeCheck(index, true);
+		modCount++;
 		ensureCapacity(size+1); 
 		System.arraycopy(elementData, index, elementData, index + 1,
 				 size - index);
@@ -99,6 +104,7 @@
 	@Override
 	public boolean addAll(int index, Collection<? extends T> c) {
 		rangeCheck(index, true);
+		modCount++;
         int numNew = c.size();
         ensureCapacity(size + numNew);
         for (T t : c) {
@@ -111,6 +117,7 @@
 	@Override
 	public T remove(int index) {
 		T oldValue = get(index);
+		modCount++;
 		int numMoved = size - index - 1;
 		if (numMoved > 0) {
 		    System.arraycopy(elementData, index+1, elementData, index, numMoved);
@@ -134,6 +141,7 @@
 	
 	@Override
 	public void clear() {
+		modCount++;
 		if (size <= MIN_SHRINK_SIZE) {
 			for (int i = 0; i < size; i++) {
 				elementData[i] = null;

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LightWeightCopyOnWriteList.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LightWeightCopyOnWriteList.java	2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LightWeightCopyOnWriteList.java	2011-10-09 03:23:46 UTC (rev 3546)
@@ -28,6 +28,8 @@
 import java.util.List;
 import java.util.RandomAccess;
 
+import org.teiid.client.ResizingArrayList;
+
 /**
  * Creates a copy of a reference list when modified.
  * 
@@ -101,7 +103,7 @@
 	@Override
 	public void clear() {
 		if (!modified) {
-			list = new ArrayList<T>();
+			list = new ResizingArrayList<T>();
 			modified = true;
 		} else {
 			list.clear();

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-10-09 03:23:46 UTC (rev 3546)
@@ -25,7 +25,6 @@
 import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
@@ -120,10 +119,10 @@
 			SPage clone = (SPage) super.clone();
 			clone.stree = tree;
 			if (children != null) {
-				clone.children = new ArrayList<SPage>(children);
+				clone.children = new ResizingArrayList<SPage>(children);
 			}
 			if (values != null) {
-				clone.values = new ArrayList<List<?>>(values);
+				clone.values = new ResizingArrayList<List<?>>(values);
 			}
 			return clone;
 		} catch (CloneNotSupportedException e) {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-09 03:23:46 UTC (rev 3546)
@@ -45,6 +45,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.teiid.client.BatchSerializer;
+import org.teiid.client.ResizingArrayList;
 import org.teiid.common.buffer.AutoCleanupUtil;
 import org.teiid.common.buffer.BatchManager;
 import org.teiid.common.buffer.BufferManager;
@@ -178,8 +179,21 @@
 		@Override
 		public void serialize(List<? extends List<?>> obj,
 				ObjectOutputStream oos) throws IOException {
-			//it's expected that the containing structure has updated the lob manager
-			BatchSerializer.writeBatch(oos, types, obj);
+			ResizingArrayList<?> list = null;
+			if (obj instanceof ResizingArrayList<?>) {
+				list = (ResizingArrayList<?>)obj;
+			}
+			int expectedModCount = list == null ? 0 : list.getModCount(); //snapshot taken before the write
+			try {
+				//it's expected that the containing structure has updated the lob manager
+				BatchSerializer.writeBatch(oos, types, obj);
+			} catch (IndexOutOfBoundsException e) {
+				//a concurrent modification while persisting can cause this; swallow it only
+				//when the modCount moved since the snapshot, otherwise it is a real error
+				if (list == null || list.getModCount() == expectedModCount) {
+					throw e;
+				}
+			}
 		}
 		
 		public int getSizeEstimate(List<? extends List<?>> obj) {
@@ -574,29 +588,34 @@
 			}
 			freed += ce.getSizeEstimate();
 			activeBatchKB.addAndGet(-ce.getSizeEstimate());
-			synchronized (ce) {
-				if (ce.isPersistent()) {
-					continue;
-				}
-				ce.setPersistent(true);
+			try {
+				evict(ce);
+			} catch (Throwable e) {
+				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+			} finally {
+				this.memoryEntries.finishedEviction(ce.getId());
 			}
-			persist(ce);
 		}
 	}
 
-	void persist(CacheEntry ce) {
+	void evict(CacheEntry ce) throws Exception {
 		Serializer<?> s = ce.getSerializer().get();
 		if (s == null) {
 			return;
 		}
-		long count = writeCount.incrementAndGet();
-		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
-			LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
+		boolean persist = false;
+		synchronized (ce) {
+			if (!ce.isPersistent()) {
+				persist = true;
+				ce.setPersistent(true);
+			}
 		}
-		try {
+		if (persist) {
+			long count = writeCount.incrementAndGet();
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
+			}
 			cache.add(ce, s);
-		} catch (Throwable e) {
-			LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
 		}
 		if (s.useSoftCache()) {
 			createSoftReference(ce);

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-10-09 03:23:46 UTC (rev 3546)
@@ -64,14 +64,17 @@
 
 	public void flush() throws IOException {
 		if (buf != null && buf.position() > 0) {
-			bytesWritten += flushDirect();
+			bytesWritten += flushDirect(buf.position());
 		}
 		buf = null;
 	}
 
 	protected abstract ByteBuffer newBuffer();
 	
-	protected abstract int flushDirect() throws IOException;
+	/**
+	 * Flush up to i bytes where i is the current position of the buffer
+	 */
+	protected abstract int flushDirect(int i) throws IOException;
     
     @Override
     public void close() throws IOException {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java	2011-10-07 20:31:32 UTC (rev 3545)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java	2011-10-09 03:23:46 UTC (rev 3546)
@@ -31,9 +31,13 @@
 	
 	protected Map<K, V> map = new ConcurrentHashMap<K, V>(); 
 	protected NavigableMap<V, K> expirationQueue = new ConcurrentSkipListMap<V, K>();
+	protected Map<K, V> limbo = new ConcurrentHashMap<K, V>();
 		
 	public V get(K key) {
 		V result = map.get(key);
+		if (result == null) {
+			result = limbo.get(key);
+		}
 		if (result != null) {
 			synchronized (result) {
 				expirationQueue.remove(result);
@@ -73,9 +77,14 @@
 		if (entry == null) {
 			return null;
 		}
+		limbo.put(entry.getValue(), entry.getKey());
 		return map.remove(entry.getValue());
 	}
 	
+	public void finishedEviction(K key) {
+		limbo.remove(key);
+	}
+	
 	public int size() {
 		return map.size();
 	}



More information about the teiid-commits mailing list