[teiid-commits] teiid SVN: r3452 - in trunk/engine/src: main/java/org/teiid/common/buffer/impl and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Sep 6 16:08:59 EDT 2011


Author: shawkins
Date: 2011-09-06 16:08:58 -0400 (Tue, 06 Sep 2011)
New Revision: 3452

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
   trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
   trunk/engine/src/main/resources/org/teiid/query/i18n.properties
   trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
Log:
TEIID-942 TEIID-1742 refining transaction support, fixing ids to use longs.

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-09-06 20:08:58 UTC (rev 3452)
@@ -31,6 +31,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.teiid.common.buffer.BatchManager.CleanupHook;
 import org.teiid.common.buffer.BatchManager.ManagedBatch;
@@ -83,25 +84,25 @@
 			}
 		}
 	}
-	
+
+	private static AtomicLong counter = new AtomicLong();
+
 	STree stree;
 	
-	private int id;
+	private long id;
 	protected SPage next;
 	protected SPage prev;
 	protected ManagedBatch managedBatch;
 	private Object trackingObject;
 	protected TupleBatch values;
 	protected ArrayList<SPage> children;
-	//TODO: could track cloning more completely, which would allow for earlier batch removal
-	private boolean cloned; 
+	protected boolean cloned; 
 	
 	SPage(STree stree, boolean leaf) {
 		this.stree = stree;
-		this.id = stree.counter.getAndIncrement();
+		this.id = counter.getAndIncrement();
 		stree.pages.put(this.id, this);
-		//TODO: this counter is a hack.  need a better idea of a storage id
-		this.values = new TupleBatch(id, new ArrayList(stree.pageSize/4));
+		this.values = new TupleBatch(0, new ArrayList(stree.pageSize/4));
 		if (!leaf) {
 			children = new ArrayList<SPage>(stree.pageSize/4);
 		}
@@ -121,7 +122,7 @@
 				clone.children = new ArrayList<SPage>(children);
 			}
 			if (values != null) {
-				clone.values = new TupleBatch(stree.counter.getAndIncrement(), new ArrayList<List<?>>(values.getTuples()));
+				clone.values = new TupleBatch(0, new ArrayList<List<?>>(values.getTuples()));
 			}
 			return clone;
 		} catch (CloneNotSupportedException e) {
@@ -129,7 +130,7 @@
 		}
 	}
 	
-	public int getId() {
+	public long getId() {
 		return id;
 	}
 	
@@ -196,7 +197,6 @@
 			values.setDataTypes(stree.types);
 		}
 		if (cloned) {
-			values.setRowOffset(stree.counter.getAndIncrement());
 			cloned = false;
 			trackingObject = null;
 		}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-09-06 20:08:58 UTC (rev 3452)
@@ -57,8 +57,7 @@
 	private int mask = 1;
 	private int shift = 1;
 
-	protected AtomicInteger counter = new AtomicInteger();
-    protected ConcurrentHashMap<Integer, SPage> pages = new ConcurrentHashMap<Integer, SPage>();
+    protected ConcurrentHashMap<Long, SPage> pages = new ConcurrentHashMap<Long, SPage>();
 	protected volatile SPage[] header = new SPage[] {new SPage(this, true)};
     protected BatchManager keyManager;
     protected BatchManager leafManager;
@@ -103,12 +102,12 @@
 			clone.updateLock = new ReentrantLock();
 			clone.rowCount = new AtomicInteger(rowCount.get());
 			//clone the pages
-			clone.pages = new ConcurrentHashMap<Integer, SPage>(pages);
-			for (Map.Entry<Integer, SPage> entry : clone.pages.entrySet()) {
+			clone.pages = new ConcurrentHashMap<Long, SPage>(pages);
+			for (Map.Entry<Long, SPage> entry : clone.pages.entrySet()) {
 				entry.setValue(entry.getValue().clone(clone));
 			}
 			//reset the pointers
-			for (Map.Entry<Integer, SPage> entry : clone.pages.entrySet()) {
+			for (Map.Entry<Long, SPage> entry : clone.pages.entrySet()) {
 				SPage clonePage = entry.getValue();
 				clonePage.next = clone.getPage(clonePage.next);
 				clonePage.prev = clone.getPage(clonePage.prev);
@@ -447,7 +446,7 @@
 	}
 	
 	public void remove() {
-		truncate(false);
+		truncate(true);
 		this.keyManager.remove();
 		this.leafManager.remove();
 	}
@@ -530,4 +529,12 @@
 		this.comparator.setSortParameters(sortParameters);
 	}
 	
+	public void clearClonedFlags() {
+		for (SPage page : pages.values()) {
+			page.cloned = false;
+			//no synchronization or volatile is needed for this write: if a stale flag
+			//is still observed elsewhere, the worst case is that we just fall back to gc cleanup
+		}
+	}
+	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-06 20:08:58 UTC (rev 3452)
@@ -95,12 +95,10 @@
 	private final class CleanupHook implements org.teiid.common.buffer.BatchManager.CleanupHook {
 		
 		private long id;
-		private int beginRow;
 		private WeakReference<BatchManagerImpl> ref;
 		
-		CleanupHook(long id, int beginRow, BatchManagerImpl batchManager) {
+		CleanupHook(long id, BatchManagerImpl batchManager) {
 			this.id = id;
-			this.beginRow = beginRow;
 			this.ref = new WeakReference<BatchManagerImpl>(batchManager);
 		}
 		
@@ -109,7 +107,7 @@
 			if (batchManager == null) {
 				return;
 			}
-			cleanupManagedBatch(batchManager, beginRow, id);
+			cleanupManagedBatch(batchManager, id);
 		}
 		
 	}
@@ -204,10 +202,10 @@
 	 * Holder for active batches
 	 */
 	private class TupleBufferInfo {
-		TreeMap<Integer, ManagedBatchImpl> batches = new TreeMap<Integer, ManagedBatchImpl>();
-		Integer lastUsed = null;
+		TreeMap<Long, ManagedBatchImpl> batches = new TreeMap<Long, ManagedBatchImpl>();
+		Long lastUsed = null;
 		
-		ManagedBatchImpl removeBatch(int row) {
+		ManagedBatchImpl removeBatch(long row) {
 			ManagedBatchImpl result = batches.remove(row);
 			if (result != null) {
 				activeBatchKB -= result.sizeEstimate;
@@ -268,7 +266,7 @@
 				if (update) {
 					activeBatches.put(batchManager.id, tbi);
 				}
-				Assertion.isNull(tbi.batches.put(this.beginRow, this));
+				tbi.batches.put(this.id, this);
 			}
 		}
 
@@ -283,13 +281,13 @@
 				if (tbi != null) { 
 					boolean put = true;
 					if (!cache) {
-						tbi.removeBatch(this.beginRow);
+						tbi.removeBatch(this.id);
 						if (tbi.batches.isEmpty()) {
 							put = false;
 						}
 					}
 					if (put) {
-						tbi.lastUsed = this.beginRow;
+						tbi.lastUsed = this.id;
 						activeBatches.put(batchManager.id, tbi);
 					}
 				}
@@ -397,17 +395,17 @@
 		}
 
 		public void remove() {
-			cleanupManagedBatch(batchManager, beginRow, id);
+			cleanupManagedBatch(batchManager, id);
 		}
 				
 		@Override
 		public CleanupHook getCleanupHook() {
-			return new CleanupHook(id, beginRow, batchManager);
+			return new CleanupHook(id, batchManager);
 		}
 		
 		@Override
 		public String toString() {
-			return "ManagedBatch " + batchManager.id + " " + this.beginRow + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+			return "ManagedBatch " + batchManager.id + " " + this.id + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
 		}
 	}
 	
@@ -513,10 +511,10 @@
         return tupleBuffer;
     }
     
-    private void cleanupManagedBatch(BatchManagerImpl batchManager, int beginRow, long id) {
+    private void cleanupManagedBatch(BatchManagerImpl batchManager, long id) {
 		synchronized (activeBatches) {
 			TupleBufferInfo tbi = activeBatches.get(batchManager.id);
-			if (tbi != null && tbi.removeBatch(beginRow) != null) {
+			if (tbi != null && tbi.removeBatch(id) != null) {
 				if (tbi.batches.isEmpty()) {
 					activeBatches.remove(batchManager.id);
 				}
@@ -655,7 +653,7 @@
 				}
 				Iterator<TupleBufferInfo> iter = activeBatches.values().iterator();
 				TupleBufferInfo tbi = iter.next();
-				Map.Entry<Integer, ManagedBatchImpl> entry = null;
+				Map.Entry<Long, ManagedBatchImpl> entry = null;
 				if (tbi.lastUsed != null) {
 					entry = tbi.batches.floorEntry(tbi.lastUsed - 1);
 				}

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java	2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java	2011-09-06 20:08:58 UTC (rev 3452)
@@ -290,7 +290,7 @@
 		if (tempTable != null) {
 			TempMetadataID id = tableStore.getMetadataStore().getTempGroupID(matTableName);
 			synchronized (id) {
-				boolean clone = tempTable.getActiveReaders().get() != 0;
+				boolean clone = tempTable.getActive().get() != 0;
 				if (clone) {
 					tempTable = tempTable.clone();
 				}

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java	2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java	2011-09-06 20:08:58 UTC (rev 3452)
@@ -36,6 +36,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.teiid.api.exception.query.ExpressionEvaluationException;
@@ -275,9 +276,9 @@
 		}
 		
 	}
-	private static AtomicInteger ID_GENERATOR = new AtomicInteger();
+	private static AtomicLong ID_GENERATOR = new AtomicLong();
 	
-	private int id = ID_GENERATOR.getAndIncrement();
+	private Long id = ID_GENERATOR.getAndIncrement();
 	private STree tree;
 	private AtomicInteger rowId;
 	private List<ElementSymbol> columns;
@@ -361,7 +362,7 @@
 		}
 	}
 	
-	public AtomicInteger getActiveReaders() {
+	public AtomicInteger getActive() {
 		return activeReaders;
 	}
 	
@@ -803,13 +804,13 @@
 		return tid;
 	}
 	
-	public int getId() {
+	public Long getId() {
 		return id;
 	}
 	
 	@Override
 	public int hashCode() {
-		return id;
+		return id.hashCode();
 	}
 	
 	@Override
@@ -821,7 +822,7 @@
 			return false;
 		}
 		TempTable other = (TempTable)obj;
-		return id == other.id;
+		return id.equals(other.id);
 	}
 
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java	2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java	2011-09-06 20:08:58 UTC (rev 3452)
@@ -82,7 +82,7 @@
     public class TempTableSynchronization implements Synchronization {
     	
     	private String id;
-    	Set<Integer> existingTables = new HashSet<Integer>();
+    	Set<Long> existingTables = new HashSet<Long>();
     	ConcurrentHashMap<String, TempTable> tables = new ConcurrentHashMap<String, TempTable>();
         private List<TransactionCallback> callbacks = new LinkedList<TransactionCallback>();
     	        
@@ -131,10 +131,21 @@
     	
     	@Override
     	public synchronized void afterCompletion(int status) {
-    		//TODO: cleanup tables
     		completed = true;
     		synchronizations.remove(id);
-    		for (TransactionCallback callback : callbacks) {
+    		if (transactionMode == TransactionMode.ISOLATE_READS) {
+				for (TempTable table : tables.values()) {
+					table.getActive().decrementAndGet();
+				}
+    		} else {
+    			HashSet<TempTable> current = new HashSet<TempTable>(tempTables.values());
+    			current.retainAll(tables.values());
+				for (TempTable table : current) {
+					table.getActive().set(0);
+					table.getTree().clearClonedFlags();
+				}
+    		}
+			for (TransactionCallback callback : callbacks) {
         		if (status == Status.STATUS_COMMITTED) {
         			callback.commit();
         		} else {
@@ -144,6 +155,10 @@
     		callbacks.clear();
     	}
     	
+    	public boolean isCompleted() {
+			return completed;
+		}
+    	
     	@Override
     	public void beforeCompletion() {
     		
@@ -307,11 +322,21 @@
         				if (synch != null && synch.existingTables.contains(tempTable.getId())) {
         					TempTable result = synch.tables.get(tempTableID);
         					if (result == null) {
-        						synch.tables.put(tempTableID, tempTable.clone());
+        						synchronized (synch) {
+									if (synch.isCompleted()) {
+										throw new AssertionError("Expected active transaction"); //$NON-NLS-1$
+									}
+	        						if (!tempTable.getActive().compareAndSet(0, 1)) {
+	        	    					throw new TeiidProcessingException(QueryPlugin.Util.getString("TempTableStore.pending_update", tempTableID)); //$NON-NLS-1$ 
+	        	    				}
+	        						synch.tables.put(tempTableID, tempTable.clone());
+								}
         					}
         					return tempTable;
         				}
-        			}	
+        			} else if (tempTable.getActive().get() != 0) {
+        				throw new TeiidProcessingException(QueryPlugin.Util.getString("TempTableStore.pending_update", tempTableID)); //$NON-NLS-1$
+    				}
     			}
     		} else if (transactionMode == TransactionMode.ISOLATE_READS) {
     			TransactionContext tc = context.getTransactionContext();
@@ -320,24 +345,13 @@
     				if (synch != null) {
     					TempTable result = synch.tables.get(tempTableID);
     					if (result == null) {
-    						synch.tables.put(tempTableID, tempTable);
     						result = tempTable;
-    						result.getActiveReaders().getAndIncrement();
-        					TransactionCallback callback = new TransactionCallback() {
-    							
-    							@Override
-    							public void rollback() {
-    								tempTable.getActiveReaders().getAndDecrement();
-    							}
-    							
-    							@Override
-    							public void commit() {
-    								tempTable.getActiveReaders().getAndDecrement();
-    							}
-    						};
-    						if (!synch.addCallback(callback)) {
-        						callback.rollback();
-        					}
+    						synchronized (synch) {
+								if (!synch.isCompleted()) {
+									synch.tables.put(tempTableID, tempTable);
+									result.getActive().getAndIncrement();
+								}
+							}
     					}
     					return result;
     				}

Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties	2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties	2011-09-06 20:08:58 UTC (rev 3452)
@@ -791,6 +791,7 @@
 RulePlanJoins.cantSatisfy=Join region with unsatisfied access patterns cannot be satisfied by the join criteria, Access patterns: {0} 
 TempTableStore.table_exist_error=Temporary table "{0}" already exists.
 TempTableStore.table_doesnt_exist_error=Temporary table "{0}" does not exist.
+TempTableStore.pending_update=Table {0} is locked by pending transaction update.
 
 XMLQueryPlanner.cannot_plan=Cannot create a query for MappingClass with user criteria {0}
 XMLQueryPlanner.invalid_relationship=Conjunct "{0}" has no relationship with target context {1}.

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2011-09-06 20:08:58 UTC (rev 3452)
@@ -135,6 +135,34 @@
 		execute("select * from x", new List[] {});
 	}
 	
+	@Test public void testCommitExistingRemoved() throws Exception {
+		execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		setupTransaction(Connection.TRANSACTION_SERIALIZABLE);
+		execute("drop table x", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		synch.afterCompletion(Status.STATUS_COMMITTED);
+		try {
+			execute("select * from x", new List[] {});
+			fail();
+		} catch (Exception e) {
+			
+		}
+	}
+	
+	@Test public void testUpdateLock() throws Exception {
+		execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		setupTransaction(Connection.TRANSACTION_SERIALIZABLE);
+		execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+		tc = null;
+		try {
+			execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+			fail();
+		} catch (Exception e) {
+			
+		}
+		synch.afterCompletion(Status.STATUS_COMMITTED);
+		execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+	}
+	
 	@Test public void testRollbackExisting1() throws Exception {
 		execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
 		for (int i = 0; i < 86; i++) {



More information about the teiid-commits mailing list