[teiid-commits] teiid SVN: r3448 - in trunk: documentation/caching-guide/src/main/docbook/en-US/content and 12 other directories.

teiid-commits at lists.jboss.org
Fri Sep 2 21:59:27 EDT 2011


Author: shawkins
Date: 2011-09-02 21:59:26 -0400 (Fri, 02 Sep 2011)
New Revision: 3448

Modified:
   trunk/build/kits/jboss-container/teiid-releasenotes.html
   trunk/documentation/caching-guide/src/main/docbook/en-US/content/matviews.xml
   trunk/documentation/reference/src/main/docbook/en-US/content/ddl_support.xml
   trunk/documentation/reference/src/main/docbook/en-US/content/transaction_support.xml
   trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
   trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/service/TransactionContext.java
   trunk/engine/src/main/java/org/teiid/dqp/service/TransactionService.java
   trunk/engine/src/main/java/org/teiid/query/processor/proc/Program.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
   trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
   trunk/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
   trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
   trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
Log:
TEIID-942 adding increased transaction support for temp tables

Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-09-03 01:59:26 UTC (rev 3448)
@@ -29,6 +29,7 @@
   <LI><B>Procedure language features</B> - Added support for compound/block statements, BEGIN [[NOT] ATOMIC], loop/block labels, and the leave statement.  See the reference for more.
   <LI><B>File Enhancements</B> - the file translator can now optionally (via the ExceptionIfFileNotFound property) throw an exception if the path refers to a file that doesn't exist.  The file resource adapter can be configured to map file names and can prevent parent path .. references.  See the Admin Guide or the file-ds.xml template for more.
   <LI><B>TEXTTABLE Enhancements</B> - TEXTTABLE can now parse fixed width files that do not use a row delimiter and can optionally produce fixed values that haven't been trimmed.
+  <LI><B>Temp table transactions</B> - Internal materialized views and temp table usage from a session and within procedures can take advantage of greater transaction support.
 </UL>
 
 <h2><a name="Compatibility">Compatibility Issues</a></h2>

Modified: trunk/documentation/caching-guide/src/main/docbook/en-US/content/matviews.xml
===================================================================
--- trunk/documentation/caching-guide/src/main/docbook/en-US/content/matviews.xml	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/documentation/caching-guide/src/main/docbook/en-US/content/matviews.xml	2011-09-03 01:59:26 UTC (rev 3448)
@@ -104,6 +104,7 @@
 		Internal materialization also provides more built-in facilities for refreshing and monitoring.</para>
 		<para>The cache hint, when used in the context of an internal materialized view transformation query, provides the ability to fine tune the materialized table.  
 		The pref_mem option also applies to internal materialized views.  Internal table index pages already have a memory preference, so the perf_mem option indicates that the data pages should prefer memory as well.</para>
+		<para>All internal materialized view refreshes and updates happen atomically.  Internal materialized views support READ_COMMITTED (used also for READ_UNCOMMITTED) and SERIALIZABLE (used also for REPEATABLE_READ) transaction isolation levels.</para>
 		<section>
 			<title>Loading And Refreshing</title>
 			<para>An internal materialized view table is initially in an invalid state (there is no data).  The first user query will trigger an implicit loading of the data.  
@@ -120,22 +121,6 @@
 			subsequent refreshes performed with <code>refreshMatView</code> will use dependent materialized view tables if they exist.  Only one load may occur at a time.  If a load is already in progress when
 			the <code>SYSADMIN.refreshMatView</code> procedure is called, it will return -1 immediately rather than preempting the current load.
 			</para>
-            <para>
-            When Teiid is running clustered mode, after of loading of internal materialized view contents, an event will be sent to all
-            the other participating nodes in the cluster to refresh the contents from the original node in asynchronus fashion.
-            During this remote node loading process, if the node that is reading the contents gets a request from user to serve the
-            results of that view, then the current results in cache will be served. If no results were available at that node, then 
-            request will be blocked until load process is finished.
-            </para>
-            <para>
-            When a Teiid node joins the cluster, at the end of start-up cycle, an asynchronus job
-            will be started to fetch all the previously cached internal materialized views at other nodes
-            for the deployed VDBs. The query request behaviour during this load process is same as above. 
-            </para>  
-            <note><para>In the clustered mode, the "invalidate=true" flag in the "SYSADMIN.refreshMatView" procedure
-            will only apply to the node that is refreshing the contents from source. All other nodes, will still serve the 
-            old contents during the refresh process.</para>
-            </note>          
 			<section>
 		   		<title>TTL Snapshot Refresh</title>
 		   		<para>The <link linkend="cache-hint">cache hint</link> may be used to automatically trigger a full snapshot refresh after a specified time to live (ttl).  
@@ -198,12 +183,9 @@
 		<section>
 			<title>Clustering Considerations</title>
 			<para>Each member in a cluster maintains its own copy of each materialized table and associated indexes.  
-			With cache clustering enabled, an additional snapshot copy of the table is maintained for loading by other members.  
 			An attempt is made to ensure each member receives the same full refresh events as the others.  
 			Full consistency for updatable materialized views however is not guaranteed.  
 			Periodic full refreshes of updatable materialized view tables helps ensure consistency among members.
-			<note><para>Loads of materialized tables are not coordinated across the cluster.  It is possible for the same ttl expiration to trigger a load at each member.</para></note> 
-		    In many clustered scenarios using external materialization is advantageous to fully control the loading of the tables and to have materialized data that is durable.
 		    </para>
 		</section>
 	</section>
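
Note on the isolation-level sentence added to matviews.xml above: it can be read as a mapping from the requested JDBC level to the level that is actually honored. The following is a minimal sketch of that reading only, not the Teiid implementation. The class and method names are hypothetical, only the java.sql.Connection constants are real, and rejecting any other level is an assumption of the sketch.

```java
import java.sql.Connection;

public class EffectiveIsolation {

    // Hypothetical helper: collapse a requested JDBC isolation level to the
    // level the documentation says internal materialized views support.
    public static int effectiveLevel(int requested) {
        switch (requested) {
            case Connection.TRANSACTION_READ_UNCOMMITTED:
            case Connection.TRANSACTION_READ_COMMITTED:
                return Connection.TRANSACTION_READ_COMMITTED;
            case Connection.TRANSACTION_REPEATABLE_READ:
            case Connection.TRANSACTION_SERIALIZABLE:
                return Connection.TRANSACTION_SERIALIZABLE;
            default:
                // Assumption of this sketch: anything else is rejected.
                throw new IllegalArgumentException("Unsupported isolation level: " + requested);
        }
    }

    public static void main(String[] args) {
        int effective = effectiveLevel(Connection.TRANSACTION_REPEATABLE_READ);
        System.out.println(effective == Connection.TRANSACTION_SERIALIZABLE);
    }
}
```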

Modified: trunk/documentation/reference/src/main/docbook/en-US/content/ddl_support.xml
===================================================================
--- trunk/documentation/reference/src/main/docbook/en-US/content/ddl_support.xml	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/documentation/reference/src/main/docbook/en-US/content/ddl_support.xml	2011-09-03 01:59:26 UTC (rev 3448)
@@ -77,7 +77,7 @@
         </para>
       </listitem>
       <listitem> 
-        <para>Only local temporary tables are supported. This implies that the scope of temp table will be either to the sesssion or the block of a virtual procedure that creates it.
+        <para>Only local temporary tables are supported. This implies that the scope of temp table will be either to the session or the block of a virtual procedure that creates it.
         </para>
       </listitem>
       <listitem>
@@ -85,7 +85,9 @@
         </para>    
       </listitem>    
       <listitem>
-        <para>Temp tables are non-transactional.
+        <para>Temp tables support a READ_UNCOMMITTED transaction isolation level.  There are no locking mechanisms available to support higher isolation levels, and the result of a rollback may be inconsistent across multiple transactions.
+        If concurrent transactions are not associated with the same local temporary table or session, then the transaction isolation level is effectively SERIALIZABLE.
+        If you want full consistency with local temporary tables, then only use a connection with one transaction at a time.  This mode of operation is ensured by connection pooling that tracks connections by transaction.
         </para>    
       </listitem>  
       <listitem>

Modified: trunk/documentation/reference/src/main/docbook/en-US/content/transaction_support.xml
===================================================================
--- trunk/documentation/reference/src/main/docbook/en-US/content/transaction_support.xml	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/documentation/reference/src/main/docbook/en-US/content/transaction_support.xml	2011-09-03 01:59:26 UTC (rev 3448)
@@ -341,11 +341,6 @@
 					connector, however this isolation level is fixed and cannot be
 					changed at runtime for specific connections/commands.</para>
 			</listitem>
-			<listitem>
-				<para>Temporary tables are not transactional. For example,
-					a global temporary table will retain all inserts performed during a
-					local transaction that was rolled back.</para>
-			</listitem>
 			<!-- <listitem>
 				<para>Connectors may be set to immutable to prevent their
 					participation in transactions. This is useful in situations where

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -26,6 +26,12 @@
 
 public interface BatchManager {
 	
+	public interface CleanupHook {
+		
+		void cleanup();
+		
+	}
+	
 	public interface ManagedBatch {
 		
 		TupleBatch getBatch(boolean cache, String[] types) throws TeiidComponentException;
@@ -34,6 +40,13 @@
 		
 		void setPrefersMemory(boolean prefers);
 		
+		/**
+		 * Get an object that can clean up the {@link ManagedBatch}, but does not hold a hard reference to
+		 * the {@link ManagedBatch} or the {@link BatchManager}
+		 * @return the cleanup hook
+		 */
+		CleanupHook getCleanupHook();
+		
 	}
 	
 	ManagedBatch createManagedBatch(TupleBatch batch, boolean softCache) throws TeiidComponentException;

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -22,24 +22,34 @@
 
 package org.teiid.common.buffer;
 
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Set;
 
+import org.teiid.common.buffer.BatchManager.CleanupHook;
 import org.teiid.common.buffer.BatchManager.ManagedBatch;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidRuntimeException;
 
 /**
  * A linked list Page entry in the tree
  * 
  * TODO: return the tuplebatch from getvalues, since that is what we're tracking
  * 
+ * State cloning allows a single storage reference to be shared in many trees.
+ * A phantom reference is used for proper cleanup once cloned.
+ * 
+ * TODO: a better purging strategy for managed batches.
+ * 
  */
 @SuppressWarnings("unchecked")
-class SPage {
+class SPage implements Cloneable {
 	
 	static final int MIN_PERSISTENT_SIZE = 16;
 
@@ -54,24 +64,75 @@
 		}
 	}
 	
-	private static AtomicInteger counter = new AtomicInteger();
-
+	private static final Set<PhantomReference<Object>> REFERENCES = Collections.newSetFromMap(new IdentityHashMap<PhantomReference<Object>, Boolean>());
+	private static ReferenceQueue<Object> QUEUE = new ReferenceQueue<Object>();
+	static class CleanupReference extends PhantomReference<Object> {
+		
+		private CleanupHook batch;
+		
+		public CleanupReference(Object referent, CleanupHook batch) {
+			super(referent, QUEUE);
+			this.batch = batch;
+		}
+		
+		public void cleanup() {
+			try {
+				this.batch.cleanup();
+			} finally {
+				this.clear();
+			}
+		}
+	}
+	
 	STree stree;
 	
+	private int id;
 	protected SPage next;
 	protected SPage prev;
 	protected ManagedBatch managedBatch;
+	private Object trackingObject;
 	protected TupleBatch values;
 	protected ArrayList<SPage> children;
+	//TODO: could track cloning more completely, which would allow for earlier batch removal
+	private boolean cloned; 
 	
 	SPage(STree stree, boolean leaf) {
 		this.stree = stree;
-		this.values = new TupleBatch(counter.getAndIncrement(), new ArrayList(stree.pageSize/4));
+		this.id = stree.counter.getAndIncrement();
+		stree.pages.put(this.id, this);
+		//TODO: this counter is a hack.  need a better idea of a storage id
+		this.values = new TupleBatch(id, new ArrayList(stree.pageSize/4));
 		if (!leaf) {
 			children = new ArrayList<SPage>(stree.pageSize/4);
 		}
 	}
 	
+	public SPage clone(STree tree) {
+		try {
+			if (this.managedBatch != null && trackingObject == null) {
+				cloned = true;
+				this.trackingObject = new Object();
+				CleanupReference managedBatchReference  = new CleanupReference(trackingObject, managedBatch.getCleanupHook());
+				REFERENCES.add(managedBatchReference);
+			}
+			SPage clone = (SPage) super.clone();
+			clone.stree = tree;
+			if (children != null) {
+				clone.children = new ArrayList<SPage>(children);
+			}
+			if (values != null) {
+				clone.values = new TupleBatch(stree.counter.getAndIncrement(), new ArrayList<List<?>>(values.getTuples()));
+			}
+			return clone;
+		} catch (CloneNotSupportedException e) {
+			throw new TeiidRuntimeException(e);
+		}
+	}
+	
+	public int getId() {
+		return id;
+	}
+	
 	static SearchResult search(SPage page, List k, LinkedList<SearchResult> parent) throws TeiidComponentException {
 		TupleBatch previousValues = null;
 		for (;;) {
@@ -121,7 +182,7 @@
 	}
 	
 	protected void setValues(TupleBatch values) throws TeiidComponentException {
-		if (managedBatch != null) {
+		if (managedBatch != null && !cloned) {
 			managedBatch.remove();
 		}
 		if (values.getTuples().size() < MIN_PERSISTENT_SIZE) {
@@ -134,17 +195,25 @@
 		} else {
 			values.setDataTypes(stree.types);
 		}
+		if (cloned) {
+			values.setRowOffset(stree.counter.getAndIncrement());
+			cloned = false;
+			trackingObject = null;
+		}
 		if (children != null) {
 			managedBatch = stree.keyManager.createManagedBatch(values, true);
 		} else {
 			managedBatch = stree.leafManager.createManagedBatch(values, stree.preferMemory);
 		}
 	}
-	
-	protected void remove() {
+
+	protected void remove(boolean force) {
 		if (managedBatch != null) {
-			managedBatch.remove();
+			if (force || !cloned) {
+				managedBatch.remove();
+			}
 			managedBatch = null;
+			trackingObject = null;
 		}
 		values = null;
 		children = null;
@@ -157,6 +226,14 @@
 		if (managedBatch == null) {
 			throw new AssertionError("Batch removed"); //$NON-NLS-1$
 		}
+		for (int i = 0; i < 10; i++) {
+			CleanupReference ref = (CleanupReference)QUEUE.poll();
+			if (ref == null) {
+				break;
+			}
+			REFERENCES.remove(ref);
+			ref.cleanup();
+		}
 		if (children != null) {
 			return managedBatch.getBatch(true, stree.keytypes);
 		}
@@ -173,7 +250,7 @@
 		if (current.children != null) {
 			current.children.addAll(current.next.children);
 		}
-		current.next.remove();
+		current.next.remove(false);
 		current.next = current.next.next;
 		if (current.next != null) {
 			current.next.prev = current;
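
Note on the SPage change above: the cleanup relies on the standard PhantomReference plus ReferenceQueue pattern. A tracking object is registered with a queue, and the hook runs once that object has been collected and its reference is polled from the queue. The following is a minimal, single-threaded sketch of the pattern using only java.lang.ref. The class name and the track and drain helpers are hypothetical stand-ins, and CleanupHook here is a local interface rather than BatchManager.CleanupHook.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class PhantomCleanupSketch {

    /** Local stand-in for a cleanup callback. */
    public interface CleanupHook {
        void cleanup();
    }

    static final ReferenceQueue<Object> QUEUE = new ReferenceQueue<Object>();

    // The references themselves must stay strongly reachable; otherwise they
    // could be collected before they are ever enqueued.
    static final Set<CleanupReference> REFERENCES =
            Collections.newSetFromMap(new IdentityHashMap<CleanupReference, Boolean>());

    public static final class CleanupReference extends PhantomReference<Object> {
        private final CleanupHook hook;

        CleanupReference(Object referent, CleanupHook hook) {
            super(referent, QUEUE);
            this.hook = hook;
        }

        public void cleanup() {
            try {
                hook.cleanup();
            } finally {
                clear();
            }
        }
    }

    /** Registers a hook to run some time after trackingObject becomes unreachable. */
    public static CleanupReference track(Object trackingObject, CleanupHook hook) {
        CleanupReference ref = new CleanupReference(trackingObject, hook);
        REFERENCES.add(ref);
        return ref;
    }

    /** Runs the hooks of at most max enqueued references; returns how many ran. */
    public static int drain(int max) {
        int ran = 0;
        for (int i = 0; i < max; i++) {
            CleanupReference ref = (CleanupReference) QUEUE.poll();
            if (ref == null) {
                break;
            }
            REFERENCES.remove(ref);
            ref.cleanup();
            ran++;
        }
        return ran;
    }
}
```

The hook must not hold a strong reference to the tracking object, or the object would never become unreachable. When the hook actually runs depends on the garbage collector, so the sketch makes no timing guarantee.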

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -29,13 +29,16 @@
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.teiid.client.BatchSerializer;
 import org.teiid.common.buffer.SPage.SearchResult;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidRuntimeException;
 import org.teiid.query.processor.relational.ListNestedSortComparator;
 
 /**
@@ -44,7 +47,7 @@
  * but with fewer updates. 
  */
 @SuppressWarnings("unchecked")
-public class STree {
+public class STree implements Cloneable {
 	
 	public enum InsertMode {ORDERED, NEW, UPDATE}
 
@@ -53,7 +56,9 @@
 	protected int randomSeed;
 	private int mask = 1;
 	private int shift = 1;
-	
+
+	protected AtomicInteger counter = new AtomicInteger();
+    protected ConcurrentHashMap<Integer, SPage> pages = new ConcurrentHashMap<Integer, SPage>();
 	protected volatile SPage[] header = new SPage[] {new SPage(this, true)};
     protected BatchManager keyManager;
     protected BatchManager leafManager;
@@ -91,6 +96,47 @@
 		this.keytypes = Arrays.copyOf(types, keyLength);
 	}
 	
+	public STree clone() {
+		updateLock.lock();
+		try {
+			STree clone = (STree) super.clone();
+			clone.updateLock = new ReentrantLock();
+			clone.rowCount = new AtomicInteger(rowCount.get());
+			//clone the pages
+			clone.pages = new ConcurrentHashMap<Integer, SPage>(pages);
+			for (Map.Entry<Integer, SPage> entry : clone.pages.entrySet()) {
+				entry.setValue(entry.getValue().clone(clone));
+			}
+			//reset the pointers
+			for (Map.Entry<Integer, SPage> entry : clone.pages.entrySet()) {
+				SPage clonePage = entry.getValue();
+				clonePage.next = clone.getPage(clonePage.next);
+				clonePage.prev = clone.getPage(clonePage.prev);
+				if (clonePage.children != null) {
+					for (int i = 0; i < clonePage.children.size(); i++) {
+						clonePage.children.set(i, clone.getPage(clonePage.children.get(i)));
+					}
+				}
+			}
+			clone.header = Arrays.copyOf(header, header.length);
+			for (int i = 0; i < header.length; i++) {
+				clone.header[i] = clone.pages.get(header[i].getId());
+			}
+			return clone;
+		} catch (CloneNotSupportedException e) {
+			throw new TeiidRuntimeException(e);
+		} finally {
+			updateLock.unlock();
+		}
+	}
+	
+	private SPage getPage(SPage page) {
+		if (page == null) {
+			return page;
+		}
+		return pages.get(page.getId());
+	}
+	
 	public void writeValuesTo(ObjectOutputStream oos) throws TeiidComponentException, IOException {
 		SPage page = header[0];
 		oos.writeInt(this.rowCount.get());
@@ -357,7 +403,7 @@
 			int size = searchResult.values.getTuples().size();
 			if (size == 0) {
 				if (header[i] != searchResult.page) {
-					searchResult.page.remove();
+					searchResult.page.remove(false);
 					if (searchResult.page.next != null) {
 						searchResult.page.next.prev = searchResult.page.prev;
 					}
@@ -366,7 +412,7 @@
 					searchResult.page.prev = null;
 					continue;
 				}
-				header[i].remove();
+				header[i].remove(false);
 				if (header[i].next != null) {
 					header[i] = header[i].next;
 					header[i].prev = null;
@@ -401,7 +447,7 @@
 	}
 	
 	public void remove() {
-		truncate();
+		truncate(false);
 		this.keyManager.remove();
 		this.leafManager.remove();
 	}
@@ -410,12 +456,12 @@
 		return this.rowCount.get();
 	}
 	
-	public int truncate() {
+	public int truncate(boolean force) {
 		int oldSize = rowCount.getAndSet(0);
 		for (int i = 0; i < header.length; i++) {
 			SPage page = header[i];
 			while (page != null) {
-				page.remove();
+				page.remove(force);
 				page = page.next;
 			}
 		}
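
Note on STree.clone() above: the method copies every page first and only then re-points next, prev, and children at the copies by looking each one up by page id. The following is a minimal sketch of that two-pass idea under simplifying assumptions. Page is a hypothetical stand-in for SPage with a single next link, and a plain HashMap replaces the ConcurrentHashMap and locking used in the commit.

```java
import java.util.HashMap;
import java.util.Map;

public class PageCloneSketch {

    /** Hypothetical stand-in for SPage: an id plus a link to the next page. */
    public static final class Page {
        public final int id;
        public Page next;

        public Page(int id) {
            this.id = id;
        }

        Page shallowCopy() {
            Page copy = new Page(id);
            copy.next = next; // still points into the original structure
            return copy;
        }
    }

    /** Copies every page, then re-points links at the copies by id lookup. */
    public static Map<Integer, Page> clonePages(Map<Integer, Page> pages) {
        Map<Integer, Page> clones = new HashMap<Integer, Page>();
        // Pass 1: copy each page; links still refer to the originals.
        for (Map.Entry<Integer, Page> entry : pages.entrySet()) {
            clones.put(entry.getKey(), entry.getValue().shallowCopy());
        }
        // Pass 2: replace each link with the clone that has the same id.
        for (Page page : clones.values()) {
            if (page.next != null) {
                page.next = clones.get(page.next.id);
            }
        }
        return clones;
    }
}
```

Two passes are needed because a link may point at a page that has not been copied yet when its owner is copied.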

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -92,6 +92,28 @@
 	private static final int IO_BUFFER_SIZE = 1 << 14;
 	private static final int COMPACTION_THRESHOLD = 1 << 25; //start checking at 32 megs
 	
+	private final class CleanupHook implements org.teiid.common.buffer.BatchManager.CleanupHook {
+		
+		private long id;
+		private int beginRow;
+		private WeakReference<BatchManagerImpl> ref;
+		
+		CleanupHook(long id, int beginRow, BatchManagerImpl batchManager) {
+			this.id = id;
+			this.beginRow = beginRow;
+			this.ref = new WeakReference<BatchManagerImpl>(batchManager);
+		}
+		
+		public void cleanup() {
+			BatchManagerImpl batchManager = ref.get();
+			if (batchManager == null) {
+				return;
+			}
+			cleanupManagedBatch(batchManager, beginRow, id);
+		}
+		
+	}
+	
 	private final class BatchManagerImpl implements BatchManager {
 		private final String id;
 		private volatile FileStore store;
@@ -163,7 +185,9 @@
 				store = newStore;
 				long oldOffset = offset;
 				offset = store.getLength();
-				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+					LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+				}
 				return offset;
 			} finally {
 				this.compactionLock.writeLock().unlock();
@@ -213,7 +237,9 @@
 				this.lobManager = new LobManager();
 			}
 			sizeEstimate = (int) Math.max(1, manager.sizeUtility.getBatchSize(batch) / 1024);
-            LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", id, "with size estimate", sizeEstimate); //$NON-NLS-1$ //$NON-NLS-2$
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", id, "with size estimate", sizeEstimate); //$NON-NLS-1$ //$NON-NLS-2$
+			}
 		}
 		
 		@Override
@@ -249,7 +275,9 @@
 		@Override
 		public TupleBatch getBatch(boolean cache, String[] types) throws TeiidComponentException {
 			long reads = readAttempts.incrementAndGet();
-			LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, "getting batch", reads, "reference hits", referenceHit.get()); //$NON-NLS-1$ //$NON-NLS-2$
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, "getting batch", reads, "reference hits", referenceHit.get()); //$NON-NLS-1$ //$NON-NLS-2$
+			}
 			synchronized (activeBatches) {
 				TupleBufferInfo tbi = activeBatches.remove(batchManager.id);
 				if (tbi != null) { 
@@ -286,7 +314,9 @@
 					}
 				}
 				long count = readCount.incrementAndGet();
-				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "reading batch from disk, total reads:", count); //$NON-NLS-1$
+				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "reading batch from disk, total reads:", count); //$NON-NLS-1$
+				}
 				try {
 					this.batchManager.compactionLock.readLock().lock();
 					long[] info = batchManager.physicalMapping.get(this.id);
@@ -324,7 +354,9 @@
 				if (batch != null) {
 					if (!persistent) {
 						long count = writeCount.incrementAndGet();
-						LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "writing batch to disk, total writes: ", count); //$NON-NLS-1$
+						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+							LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "writing batch to disk, total writes: ", count); //$NON-NLS-1$
+						}
 						long offset = 0;
 						if (lobManager != null) {
 							for (List<?> tuple : batch.getTuples()) {
@@ -341,7 +373,9 @@
 				            long[] info = new long[] {offset, size};
 				            batchManager.physicalMapping.put(this.id, info);
 						}
-						LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "batch written starting at:", offset); //$NON-NLS-1$
+						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+							LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "batch written starting at:", offset); //$NON-NLS-1$
+						}
 					}
 					if (softCache) {
 						this.batchReference = new SoftReference<TupleBatch>(batch);
@@ -363,25 +397,17 @@
 		}
 
 		public void remove() {
-			synchronized (activeBatches) {
-				TupleBufferInfo tbi = activeBatches.get(batchManager.id);
-				if (tbi != null && tbi.removeBatch(this.beginRow) != null) {
-					if (tbi.batches.isEmpty()) {
-						activeBatches.remove(batchManager.id);
-					}
-				}
-			}
-			long[] info = batchManager.physicalMapping.remove(id);
-			if (info != null) {
-				batchManager.unusedSpace.addAndGet(info[1]); 
-			}
-			activeBatch = null;
-			batchReference = null;
+			cleanupManagedBatch(batchManager, beginRow, id);
 		}
+				
+		@Override
+		public CleanupHook getCleanupHook() {
+			return new CleanupHook(id, beginRow, batchManager);
+		}
 		
 		@Override
 		public String toString() {
-			return "ManagedBatch " + batchManager.id + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$
+			return "ManagedBatch " + batchManager.id + " " + this.beginRow + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
 		}
 	}
 	
@@ -487,6 +513,21 @@
         return tupleBuffer;
     }
     
+    private void cleanupManagedBatch(BatchManagerImpl batchManager, int beginRow, long id) {
+		synchronized (activeBatches) {
+			TupleBufferInfo tbi = activeBatches.get(batchManager.id);
+			if (tbi != null && tbi.removeBatch(beginRow) != null) {
+				if (tbi.batches.isEmpty()) {
+					activeBatches.remove(batchManager.id);
+				}
+			}
+		}
+		long[] info = batchManager.physicalMapping.remove(id);
+		if (info != null) {
+			batchManager.unusedSpace.addAndGet(info[1]); 
+		}
+    }
+    
     public STree createSTree(final List elements, String groupName, int keyLength) {
     	String newID = String.valueOf(this.tsId.getAndIncrement());
     	int[] lobIndexes = LobManager.getLobIndexes(elements);
@@ -496,13 +537,17 @@
     	for (int i = 1; i < compareIndexes.length; i++) {
 			compareIndexes[i] = i;
 		}
-        LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating STree:", newID); //$NON-NLS-1$ 
+    	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+    		LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating STree:", newID); //$NON-NLS-1$
+    	}
     	return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(), keyLength, TupleBuffer.getTypeNames(elements));
     }
 
     @Override
     public FileStore createFileStore(String name) {
-        LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating FileStore:", name); //$NON-NLS-1$ 
+    	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+    		LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating FileStore:", name); //$NON-NLS-1$
+    	}
     	return this.diskMgr.createFileStore(name);
     }
         
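
Note on the logging changes in BufferManagerImpl above: each call is now wrapped in a level check, presumably so that the argument array and any boxing or string building are skipped when the message would be discarded. The following is a minimal sketch of the same guard using java.util.logging rather than Teiid's LogManager. The class, the logger name, and the describe helper are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class GuardedLoggingSketch {

    private static final Logger LOG = Logger.getLogger("buffer.sketch");

    /** Counts how often the costly message was actually built. */
    public static int describeCalls = 0;

    /** Stands in for any costly log argument. */
    static String describe(long offset) {
        describeCalls++;
        return "batch written starting at: " + offset;
    }

    public static void logWrite(long offset) {
        // Check the level first so the message is only built when it will be recorded.
        if (LOG.isLoggable(Level.FINEST)) {
            LOG.log(Level.FINEST, describe(offset));
        }
    }
}
```

The level passed to the guard should match the level of the guarded call; a mismatch either suppresses messages that should be recorded or builds messages that are then dropped.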

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -76,6 +76,7 @@
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.tempdata.TempTableDataManager;
 import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 
 
 /**
@@ -235,7 +236,7 @@
     	}
 		ClientState state = clientState.get(key);
 		if (state == null && create) {
-			state = new ClientState(new TempTableStore(key));
+			state = new ClientState(new TempTableStore(key, TransactionMode.ISOLATE_WRITES));
     		clientState.put(key, state);
 		}
 		return state;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -356,13 +356,14 @@
             
             if (startAutoWrapTxn) {
                 try {
-                    tc = transactionService.begin(tc);
+                    transactionService.begin(tc);
                 } catch (XATransactionException err) {
                     throw new TeiidComponentException(err);
                 }
             }
         } 
         
+        tc.setIsolationLevel(requestMsg.getTransactionIsolation());
         this.transactionContext = tc;
         this.processor = new QueryProcessor(processPlan, context, bufferManager, processorDataManager);
     }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -462,31 +462,28 @@
     /**
      * Request level transaction
      */
-    public TransactionContext begin(TransactionContext context) throws XATransactionException{
+    public void begin(TransactionContext context) throws XATransactionException{
         if (context.getTransactionType() != TransactionContext.Scope.NONE) {
             throw new XATransactionException(QueryPlugin.Util.getString("TransactionServer.existing_transaction")); //$NON-NLS-1$
         }
         beginDirect(context);
         context.setTransactionType(TransactionContext.Scope.REQUEST);
-        return context;
     }
 
     /**
      * Request level transaction
      */    
-    public TransactionContext commit(TransactionContext context) throws XATransactionException {
+    public void commit(TransactionContext context) throws XATransactionException {
         Assertion.assertTrue(context.getTransactionType() == TransactionContext.Scope.REQUEST);
         commitDirect(context);
-        return context;
     }
 
     /**
      * Request level transaction
      */    
-    public TransactionContext rollback(TransactionContext context) throws XATransactionException {
+    public void rollback(TransactionContext context) throws XATransactionException {
         Assertion.assertTrue(context.getTransactionType() == TransactionContext.Scope.REQUEST);
         rollbackDirect(context);
-        return context;      
     }
 
     public void cancelTransactions(String threadId, boolean requestOnly) throws XATransactionException {

Modified: trunk/engine/src/main/java/org/teiid/dqp/service/TransactionContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/service/TransactionContext.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/dqp/service/TransactionContext.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -46,7 +46,16 @@
     private long creationTime;
     private Transaction transaction;
     private Set<String> suspendedBy = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+    private int isolationLevel;
+
+    public int getIsolationLevel() {
+		return isolationLevel;
+	}
     
+    public void setIsolationLevel(int isolationLevel) {
+		this.isolationLevel = isolationLevel;
+	}
+    
     public long getCreationTime() {
 		return creationTime;
 	}

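Note on the isolation level added to TransactionContext above: it is stored as a plain int, and later in this revision TempTableStore compares it with `Connection.TRANSACTION_READ_COMMITTED` using `>`. The sketch below only illustrates how that comparison behaves for the standard `java.sql.Connection` constants. The class and method names (`IsolationLevelSketch`, `needsReadIsolation`) are hypothetical and are not part of Teiid.

```java
import java.sql.Connection;

// Hypothetical helper for illustration only; not part of the Teiid code base.
final class IsolationLevelSketch {

    // Mirrors the "level > TRANSACTION_READ_COMMITTED" test from the diff.
    // The JDBC constants are ordered ints: NONE=0, READ_UNCOMMITTED=1,
    // READ_COMMITTED=2, REPEATABLE_READ=4, SERIALIZABLE=8.
    static boolean needsReadIsolation(int isolationLevel) {
        return isolationLevel > Connection.TRANSACTION_READ_COMMITTED;
    }

    public static void main(String[] args) {
        System.out.println(needsReadIsolation(Connection.TRANSACTION_READ_UNCOMMITTED)); // false
        System.out.println(needsReadIsolation(Connection.TRANSACTION_READ_COMMITTED));   // false
        System.out.println(needsReadIsolation(Connection.TRANSACTION_REPEATABLE_READ));  // true
        System.out.println(needsReadIsolation(Connection.TRANSACTION_SERIALIZABLE));     // true
    }
}
```

Under this reading, only REPEATABLE_READ and SERIALIZABLE requests would take the read-isolation path.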
Modified: trunk/engine/src/main/java/org/teiid/dqp/service/TransactionService.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/service/TransactionService.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/dqp/service/TransactionService.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -39,11 +39,11 @@
 public interface TransactionService {
     
     // processor level methods
-    TransactionContext begin(TransactionContext context) throws XATransactionException;
+    void begin(TransactionContext context) throws XATransactionException;
 
-    TransactionContext commit(TransactionContext context) throws XATransactionException;
+    void commit(TransactionContext context) throws XATransactionException;
 
-    TransactionContext rollback(TransactionContext context) throws XATransactionException;
+    void rollback(TransactionContext context) throws XATransactionException;
 
     TransactionContext getOrCreateTransactionContext(String threadId);
     

Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/Program.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/proc/Program.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/query/processor/proc/Program.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -29,6 +29,7 @@
 import org.teiid.query.processor.ProcessorPlan;
 import org.teiid.query.sql.proc.Statement.Labeled;
 import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 
 
 /**
@@ -113,7 +114,7 @@
      */
     public void reset(String sessionId){
         counter = 0;
-        this.tempTables = new TempTableStore(sessionId);
+        this.tempTables = new TempTableStore(sessionId, TransactionMode.ISOLATE_WRITES);
         this.startedTxn = false;
     }
 

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -46,6 +46,7 @@
 import org.teiid.query.sql.lang.QueryCommand;
 import org.teiid.query.sql.lang.WithQueryCommand;
 import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 import org.teiid.query.util.CommandContext;
 
 /**
@@ -87,7 +88,7 @@
     public void initialize(CommandContext context, ProcessorDataManager dataMgr, BufferManager bufferMgr) {
     	if (this.with != null) {
     		context = context.clone();
-    		tempTableStore = new TempTableStore(context.getConnectionID());
+    		tempTableStore = new TempTableStore(context.getConnectionID(), TransactionMode.NONE);
             tempTableStore.setParentTempTableStore(context.getTempTableStore());
             context.setTempTableStore(tempTableStore);
     		for (WithQueryCommand withCommand : this.with) {

Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -60,6 +60,7 @@
 import org.teiid.query.sql.symbol.ElementSymbol;
 import org.teiid.query.sql.symbol.GroupSymbol;
 import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 import org.teiid.query.util.CommandContext;
 import org.xml.sax.Attributes;
 import org.xml.sax.EntityResolver;
@@ -110,7 +111,7 @@
     public void initialize(CommandContext context, ProcessorDataManager dataMgr, BufferManager bufferMgr) {
     	context = context.clone();
     	setContext(context);
-        TempTableStore tempTableStore = new TempTableStore(context.getConnectionID());
+        TempTableStore tempTableStore = new TempTableStore(context.getConnectionID(), TransactionMode.NONE);
         tempTableStore.setParentTempTableStore(context.getTempTableStore());
         context.setTempTableStore(tempTableStore);
         this.dataMgr = dataMgr;

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -30,6 +30,7 @@
 import org.teiid.api.exception.query.QueryResolverException;
 import org.teiid.api.exception.query.QueryValidatorException;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.query.metadata.TempMetadataID;
 import org.teiid.query.sql.symbol.GroupSymbol;
 import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
@@ -52,7 +53,7 @@
 	List<?> updateMatViewRow(String matTableName, List<?> tuple, boolean delete) throws TeiidComponentException;
 
 	TempTable createMatTable(String tableName, GroupSymbol group)
-	throws TeiidComponentException, QueryMetadataException, QueryResolverException, QueryValidatorException;
+	throws TeiidComponentException, QueryMetadataException, TeiidProcessingException;
 	
 	@Replicated
 	void failedLoad(String matTableName);

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -40,6 +40,7 @@
 import org.teiid.api.exception.query.QueryValidatorException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.language.SQLConstants;
@@ -60,6 +61,7 @@
 import org.teiid.query.sql.lang.Create;
 import org.teiid.query.sql.symbol.ElementSymbol;
 import org.teiid.query.sql.symbol.GroupSymbol;
+import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 
 public class GlobalTableStoreImpl implements GlobalTableStore, ReplicatedObject {
 	
@@ -154,7 +156,7 @@
 	}
 	
 	private ConcurrentHashMap<String, MatTableInfo> matTables = new ConcurrentHashMap<String, MatTableInfo>();
-	private TempTableStore tableStore = new TempTableStore("SYSTEM"); //$NON-NLS-1$
+	private TempTableStore tableStore = new TempTableStore("SYSTEM", TransactionMode.ISOLATE_READS); //$NON-NLS-1$
 	private BufferManager bufferManager;
 	private QueryMetadataInterface metadata;
 	private Serializable localAddress;
@@ -196,13 +198,13 @@
 		String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
 		GroupSymbol group = new GroupSymbol(matViewName);
 		group.setMetadataID(viewId);
-		TempMetadataID id = tableStore.tempMetadataStore.getTempGroupID(matTableName);
+		TempMetadataID id = tableStore.getMetadataStore().getTempGroupID(matTableName);
 		//define the table preserving the key/index information and ensure that only a single instance exists
 		if (id == null) {
 			synchronized (viewId) {
-				id = tableStore.tempMetadataStore.getTempGroupID(matTableName);
+				id = tableStore.getMetadataStore().getTempGroupID(matTableName);
 				if (id == null) {
-					id = tableStore.tempMetadataStore.addTempGroup(matTableName, ResolverUtil.resolveElementsInGroup(group, metadata), false, true);
+					id = tableStore.getMetadataStore().addTempGroup(matTableName, ResolverUtil.resolveElementsInGroup(group, metadata), false, true);
 					id.setQueryNode(metadata.getVirtualPlan(viewId));
 					id.setCardinality(metadata.getCardinality(viewId));
 					id.setOriginalMetadataID(viewId);
@@ -273,16 +275,31 @@
 
 	@Override
 	public void loaded(String matTableName, TempTable table) {
-		this.tableStore.swapTempTable(matTableName, table);
+		swapTempTable(matTableName, table);
 		this.getMatTableInfo(matTableName).setState(MatState.LOADED, true);
 	}
+	
+	private void swapTempTable(String tempTableName, TempTable tempTable) {
+    	this.tableStore.getTempTables().put(tempTableName, tempTable);
+    }
 
 	@Override
 	public List<?> updateMatViewRow(String matTableName, List<?> tuple,
 			boolean delete) throws TeiidComponentException {
 		TempTable tempTable = tableStore.getTempTable(matTableName);
 		if (tempTable != null) {
-			return tempTable.updateTuple(tuple, delete);
+			TempMetadataID id = tableStore.getMetadataStore().getTempGroupID(matTableName);
+			synchronized (id) {
+				boolean clone = tempTable.getActiveReaders().get() != 0;
+				if (clone) {
+					tempTable = tempTable.clone();
+				}
+				List<?> result = tempTable.updateTuple(tuple, delete);
+				if (clone) {
+					swapTempTable(matTableName, tempTable);
+				}
+				return result;
+			}
 		}
 		return null;
 	}
@@ -294,7 +311,7 @@
 	
 	@Override
 	public TempTable createMatTable(final String tableName, GroupSymbol group) throws TeiidComponentException,
-	QueryMetadataException, QueryResolverException, QueryValidatorException {
+	QueryMetadataException, TeiidProcessingException {
 		Create create = new Create();
 		create.setTable(group);
 		List<ElementSymbol> allColumns = ResolverUtil.resolveElementsInGroup(group, metadata);
@@ -304,7 +321,7 @@
 			List<ElementSymbol> pkColumns = resolveIndex(metadata, allColumns, pk);
 			create.getPrimaryKey().addAll(pkColumns);
 		}
-		TempTable table = getTempTableStore().addTempTable(tableName, create, bufferManager, false);
+		TempTable table = getTempTableStore().addTempTable(tableName, create, bufferManager, false, null);
 		table.setUpdatable(false);
 		CacheHint hint = table.getCacheHint();
 		if (hint != null) {
@@ -424,8 +441,8 @@
 
 	private void loadTable(String stateId, ObjectInputStream ois)
 			throws TeiidComponentException, QueryMetadataException,
-			QueryResolverException, QueryValidatorException, IOException,
-			ClassNotFoundException {
+			IOException,
+			ClassNotFoundException, TeiidProcessingException {
 		LogManager.logDetail(LogConstants.CTX_DQP, "loading table from remote stream", stateId); //$NON-NLS-1$
 		long updateTime = ois.readLong();
 		Serializable loadingAddress = (Serializable) ois.readObject();
@@ -449,7 +466,7 @@
 		tempTable.readFrom(ois);
 		MatTableInfo info = this.getMatTableInfo(stateId);
 		synchronized (info) {
-			this.tableStore.swapTempTable(stateId, tempTable);
+			swapTempTable(stateId, tempTable);
 			info.setState(state, true);
 			info.updateTime = updateTime;
 			info.loadingAddress = loadingAddress;

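Note on the updateMatViewRow change above: the update is applied to a clone and the clone is swapped in whenever the table has active readers, so readers that already hold the old instance keep seeing an unchanged table. A minimal sketch of that copy-on-write idea follows. It is a simplification under stated assumptions: `CopyOnWriteSketch`, `Table`, and `addRow` are hypothetical names, rows are plain strings, and a single lock object stands in for the per-table TempMetadataID the diff synchronizes on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the clone-then-swap update; not Teiid code.
final class CopyOnWriteSketch {

    static final class Table {
        final List<String> rows;
        final AtomicInteger activeReaders = new AtomicInteger();

        Table(List<String> rows) {
            this.rows = new ArrayList<>(rows);
        }

        // The copy starts with its own row list and a reader count of zero.
        Table copy() {
            return new Table(rows);
        }
    }

    private final Map<String, Table> tables = new ConcurrentHashMap<>();
    private final Object lock = new Object(); // assumption: one lock for all tables

    void put(String name, Table table) {
        tables.put(name, table);
    }

    Table get(String name) {
        return tables.get(name);
    }

    void addRow(String name, String row) {
        synchronized (lock) {
            Table table = tables.get(name);
            if (table == null) {
                return;
            }
            // Only pay for the copy when someone is reading the current instance.
            boolean clone = table.activeReaders.get() != 0;
            if (clone) {
                table = table.copy();
            }
            table.rows.add(row);
            if (clone) {
                tables.put(name, table); // publish the updated copy
            }
        }
    }

    public static void main(String[] args) {
        CopyOnWriteSketch store = new CopyOnWriteSketch();
        Table original = new Table(Arrays.asList("a"));
        store.put("m", original);
        original.activeReaders.incrementAndGet(); // simulate an in-flight reader
        store.addRow("m", "b");
        System.out.println(original.rows);       // reader's view is unchanged
        System.out.println(store.get("m").rows); // new readers see the update
    }
}
```

When no reader is registered, the update happens in place and no copy is made, which is the cheap path the diff also keeps.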
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -51,6 +51,7 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
@@ -75,7 +76,7 @@
  * TODO: in this implementation blocked exceptions will not happen
  *       allowing for subquery evaluation though would cause pauses
  */
-public class TempTable {
+public class TempTable implements Cloneable {
 	
 	private final class InsertUpdateProcessor extends UpdateProcessor {
 		
@@ -274,7 +275,9 @@
 		}
 		
 	}
+	private static AtomicInteger ID_GENERATOR = new AtomicInteger();
 	
+	private int id = ID_GENERATOR.getAndIncrement();
 	private STree tree;
 	private AtomicInteger rowId;
 	private List<ElementSymbol> columns;
@@ -292,6 +295,8 @@
 	private List<Integer> notNull = new LinkedList<Integer>();
 	private Map<Integer, AtomicInteger> sequences;
 	private int uniqueColIndex;
+	
+	private AtomicInteger activeReaders = new AtomicInteger();
 
 	TempTable(TempMetadataID tid, BufferManager bm, List<ElementSymbol> columns, int primaryKeyLength, String sessionID) {
 		this.tid = tid;
@@ -333,6 +338,33 @@
 		this.leafBatchSize = bm.getSchemaSize(columns.subList(0, primaryKeyLength));
 	}
 	
+	public TempTable clone() {
+		lock.readLock().lock();
+		try {
+			TempTable clone = (TempTable) super.clone();
+			clone.lock = new ReentrantReadWriteLock();
+			if (clone.indexTables != null) {
+				clone.indexTables = new LinkedHashMap<List<ElementSymbol>, TempTable>(clone.indexTables);
+				for (Map.Entry<List<ElementSymbol>, TempTable> entry : clone.indexTables.entrySet()) {
+					TempTable indexClone = entry.getValue().clone();
+					indexClone.lock = clone.lock;
+					entry.setValue(indexClone);
+				}
+			}
+			clone.tree = tree.clone();
+			clone.activeReaders = new AtomicInteger();
+			return clone;
+		} catch (CloneNotSupportedException e) {
+			throw new TeiidRuntimeException();
+		} finally {
+			lock.readLock().unlock();
+		}
+	}
+	
+	public AtomicInteger getActiveReaders() {
+		return activeReaders;
+	}
+	
 	void addIndex(List<ElementSymbol> indexColumns, boolean unique) throws TeiidComponentException, TeiidProcessingException {
 		List<ElementSymbol> keyColumns = columns.subList(0, tree.getKeyLength());
 		if (keyColumns.equals(indexColumns) || (indexTables != null && indexTables.containsKey(indexColumns))) {
@@ -500,9 +532,9 @@
 		return tree.getRowCount();
 	}
 	
-	public int truncate() {
+	public int truncate(boolean force) {
 		this.tid.getTableData().dataModified(tree.getRowCount());
-		return tree.truncate();
+		return tree.truncate(force);
 	}
 	
 	public void remove() {
@@ -770,5 +802,26 @@
 	public TempMetadataID getMetadataId() {
 		return tid;
 	}
+	
+	public int getId() {
+		return id;
+	}
+	
+	@Override
+	public int hashCode() {
+		return id;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		if (!(obj instanceof TempTable)) {
+			return false;
+		}
+		TempTable other = (TempTable)obj;
+		return id == other.id;
+	}
 
 }
\ No newline at end of file

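Note on the TempTable changes above: the id comes from a field initializer, and `Object.clone()` copies fields without re-running initializers, so a clone keeps the id of its source and compares equal to it under the new equals/hashCode. That appears to be what lets collection operations such as `removeAll` treat a table and its snapshot as the same entry. The sketch below shows the effect in isolation; `IdentifiedTableSketch` and its `rows` field are hypothetical stand-ins, not Teiid classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a cloneable table with id-based equality.
final class IdentifiedTableSketch implements Cloneable {

    private static final AtomicInteger ID_GENERATOR = new AtomicInteger();

    // Assigned once at construction; clone() copies it rather than generating a new one.
    private final int id = ID_GENERATOR.getAndIncrement();
    private List<String> rows = new ArrayList<>();

    List<String> getRows() {
        return rows;
    }

    int getId() {
        return id;
    }

    @Override
    public IdentifiedTableSketch clone() {
        try {
            IdentifiedTableSketch copy = (IdentifiedTableSketch) super.clone();
            copy.rows = new ArrayList<>(rows); // give the copy its own state
            return copy;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: the class is Cloneable
        }
    }

    @Override
    public int hashCode() {
        return id;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof IdentifiedTableSketch)) {
            return false;
        }
        return id == ((IdentifiedTableSketch) obj).id;
    }

    public static void main(String[] args) {
        IdentifiedTableSketch table = new IdentifiedTableSketch();
        IdentifiedTableSketch snapshot = table.clone();
        table.getRows().add("x");
        System.out.println(table.equals(snapshot));                    // true: same id
        System.out.println(snapshot.getRows().isEmpty());              // true: separate state
        System.out.println(table.equals(new IdentifiedTableSketch())); // false: new id
    }
}
```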
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -43,6 +43,7 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.Assertion;
 import org.teiid.core.util.StringUtil;
 import org.teiid.dqp.internal.process.CachedResults;
 import org.teiid.dqp.internal.process.SessionAwareCache;
@@ -164,7 +165,7 @@
         		return null;
         	}
         	final String groupKey = group.getNonCorrelationName().toUpperCase();
-            final TempTable table = contextStore.getOrCreateTempTable(groupKey, command, bufferManager, true);
+            final TempTable table = contextStore.getOrCreateTempTable(groupKey, command, bufferManager, true, true, context);
         	if (command instanceof Insert) {
         		Insert insert = (Insert)command;
         		TupleSource ts = insert.getTupleSource();
@@ -186,8 +187,8 @@
         		final Delete delete = (Delete)command;
         		final Criteria crit = delete.getCriteria();
         		if (crit == null) {
-        			//because we are non-transactional, just use a truncate
-        			int rows = table.truncate();
+        			//TODO: we'll add a real truncate later
+        			int rows = table.truncate(false);
                     return CollectionTupleSource.createUpdateCountTupleSource(rows);
         		}
         		return table.delete(crit);
@@ -199,17 +200,18 @@
     		if (contextStore.hasTempTable(tempTableName)) {
                 throw new QueryProcessingException(QueryPlugin.Util.getString("TempTableStore.table_exist_error", tempTableName));//$NON-NLS-1$
             }
-    		contextStore.addTempTable(tempTableName, create, bufferManager, true);
+    		contextStore.addTempTable(tempTableName, create, bufferManager, true, context);
             return CollectionTupleSource.createUpdateCountTupleSource(0);	
     	}
     	if (command instanceof Drop) {
     		String tempTableName = ((Drop)command).getTable().getCanonicalName();
-    		contextStore.removeTempTableByName(tempTableName);
+    		contextStore.removeTempTableByName(tempTableName, context);
             return CollectionTupleSource.createUpdateCountTupleSource(0);
     	}
     	if (command instanceof AlterTempTable) {
     		AlterTempTable att = (AlterTempTable)command;
-    		TempTable tt = contextStore.getOrCreateTempTable(att.getTempTable().toUpperCase(), command, bufferManager, true);
+    		TempTable tt = contextStore.getTempTable(att.getTempTable().toUpperCase());
+    		Assertion.isNotNull(tt, "Table doesn't exist"); //$NON-NLS-1$
     		tt.setUpdatable(false);
     		if (att.getIndexColumns() != null) {
     			tt.addIndex(att.getIndexColumns(), false);
@@ -388,10 +390,10 @@
 					loadAsynch(context, group, tableName, globalStore);
 				}
 			} 
-			table = globalStore.getTempTableStore().getOrCreateTempTable(tableName, query, bufferManager, false);
+			table = globalStore.getTempTableStore().getOrCreateTempTable(tableName, query, bufferManager, false, false, context);
 			context.accessedDataObject(group.getMetadataID());
 		} else {
-			table = contextStore.getOrCreateTempTable(tableName, query, bufferManager, true);
+			table = contextStore.getOrCreateTempTable(tableName, query, bufferManager, true, false, context);
 			if (context.getDataObjects() != null) {
 				Object id = RelationalPlanner.getTrackableGroup(group, context.getMetadata());
 				if (id != null) {

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -22,15 +22,26 @@
 
 package org.teiid.query.tempdata;
 
+import java.sql.Connection;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+
 import org.teiid.api.exception.query.QueryProcessingException;
 import org.teiid.common.buffer.BufferManager;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
+import org.teiid.dqp.service.TransactionContext;
+import org.teiid.dqp.service.TransactionContext.Scope;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.query.QueryPlugin;
@@ -42,16 +53,121 @@
 import org.teiid.query.sql.lang.Insert;
 import org.teiid.query.sql.symbol.ElementSymbol;
 import org.teiid.query.sql.symbol.GroupSymbol;
+import org.teiid.query.util.CommandContext;
 
+/**
+ * TempTableStores are transactional, but do not act as full resource manager.
+ * This means we are effectively 1PC and don't allow any heuristic exceptions
+ * on commit.
+ *  
+ * Table state snapshoting and a {@link Synchronization} are used to
+ * perform the appropriate commit/rollback actions.
+ * 
+ * Full row level MVCC would be a good next step as it would remove the
+ * cost of state cloning and would allow for concurrent read/write transactions. 
+ */
 public class TempTableStore {
 	
-    TempMetadataStore tempMetadataStore = new TempMetadataStore(new ConcurrentHashMap<String, TempMetadataID>());
+    public interface TransactionCallback {
+    	void commit();
+    	void rollback();
+    }
+    
+    public enum TransactionMode {
+    	ISOLATE_READS, //for matviews that have atomic updates
+    	ISOLATE_WRITES, //for session/procedure stores that need rollback support - this is effectively READ_UNCOMMITTED
+    	NONE
+    }
+    
+    public class TempTableSynchronization implements Synchronization {
+    	
+    	private String id;
+    	Set<Integer> existingTables = new HashSet<Integer>();
+    	ConcurrentHashMap<String, TempTable> tables = new ConcurrentHashMap<String, TempTable>();
+        private List<TransactionCallback> callbacks = new LinkedList<TransactionCallback>();
+    	        
+        private boolean completed;
+        
+        public TempTableSynchronization(final String id) {
+        	this.id = id;
+        	for (TempTable tempTable : tempTables.values()) {
+        		existingTables.add(tempTable.getId());
+        	}
+        	if (transactionMode == TransactionMode.ISOLATE_WRITES) {
+        		addCallback(new TransactionCallback() {
+        	        private Map<String, TempMetadataID> clonedMetadata = new ConcurrentHashMap<String, TempMetadataID>(tempMetadataStore.getData());
+        	        private Map<String, TempTable> clonedTables = new ConcurrentHashMap<String, TempTable>(tempTables);
+					
+					@Override
+					public void rollback() {
+						LogManager.logDetail(LogConstants.CTX_DQP, "Rolling back txn", id, "restoring", clonedTables.keySet(), "using rollback tables", tables); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+						//remove any tables created in the scope of this txn
+						tempTables.values().removeAll(clonedTables.values());
+						for (TempTable table : tempTables.values()) {
+							table.remove();
+						}
+						
+						//restore the state
+						tempMetadataStore.getData().clear();
+						tempMetadataStore.getData().putAll(clonedMetadata);
+						tempTables.clear();
+						tempTables.putAll(clonedTables);
+						
+						//overlay the rollback tables
+						tempTables.putAll(tables);
+					}
+					
+					@Override
+					public void commit() {
+						//remove any original tables that were removed in this txn
+						clonedTables.values().removeAll(tempTables.values());
+						for (TempTable table : clonedTables.values()) {
+							table.remove();
+						}
+					}
+				});
+        	}
+		}
+    	
+    	@Override
+    	public synchronized void afterCompletion(int status) {
+    		//TODO: cleanup tables
+    		completed = true;
+    		synchronizations.remove(id);
+    		for (TransactionCallback callback : callbacks) {
+        		if (status == Status.STATUS_COMMITTED) {
+        			callback.commit();
+        		} else {
+        			callback.rollback();
+        		}
+			}
+    		callbacks.clear();
+    	}
+    	
+    	@Override
+    	public void beforeCompletion() {
+    		
+    	}
+    	
+    	public synchronized boolean addCallback(TransactionCallback callback) {
+    		if (!completed) {
+    			callbacks.add(0, callback);
+    		}
+    		return !completed;
+    	}
+    }
+    
+    private Map<String, TempTableSynchronization> synchronizations = new ConcurrentHashMap<String, TempTableSynchronization>();
+    private TransactionMode transactionMode = TransactionMode.NONE;
+	
+    private TempMetadataStore tempMetadataStore = new TempMetadataStore(new ConcurrentHashMap<String, TempMetadataID>());
     private Map<String, TempTable> tempTables = new ConcurrentHashMap<String, TempTable>();
     private String sessionID;
     private TempTableStore parentTempTableStore;
     
-    public TempTableStore(String sessionID) {
+    public TempTableStore(String sessionID, TransactionMode transactionMode) {
         this.sessionID = sessionID;
+        this.transactionMode = transactionMode;
     }
     
     public void setParentTempTableStore(TempTableStore parentTempTableStore) {
@@ -62,9 +178,10 @@
     	return tempTables.containsKey(tempTableName);
     }
 
-    TempTable addTempTable(String tempTableName, Create create, BufferManager buffer, boolean add) {
+    TempTable addTempTable(final String tempTableName, Create create, BufferManager buffer, boolean add, CommandContext context) throws TeiidProcessingException {
     	List<ElementSymbol> columns = create.getColumnSymbols();
     	TempMetadataID id = tempMetadataStore.getTempGroupID(tempTableName);
+    	getSynchronization(context);
     	if (id == null) {
 	        //add metadata
 	    	id = tempMetadataStore.addTempGroup(tempTableName, columns, false, true);
@@ -77,32 +194,67 @@
     		columns.removeAll(primaryKey);
     		columns.addAll(0, primaryKey);
     	}
-        TempTable tempTable = new TempTable(id, buffer, columns, create.getPrimaryKey().size(), sessionID);
+        final TempTable tempTable = new TempTable(id, buffer, columns, create.getPrimaryKey().size(), sessionID);
         if (add) {
         	tempTables.put(tempTableName, tempTable);
         }
         return tempTable;
     }
     
-    void swapTempTable(String tempTableName, TempTable tempTable) {
-    	tempTables.put(tempTableName, tempTable);
+    public void removeTempTableByName(final String tempTableName, CommandContext context) throws TeiidProcessingException {
+    	TempTableSynchronization synch = getSynchronization(context);
+    	tempMetadataStore.removeTempGroup(tempTableName);
+        final TempTable table = this.tempTables.remove(tempTableName);
+        if (table == null) {
+        	return;
+        }
+		if (transactionMode != TransactionMode.ISOLATE_WRITES || synch == null || !synch.existingTables.contains(table.getId())) {
+			table.remove();
+    	}
     }
 
-    public void removeTempTableByName(String tempTableName) {
-        tempMetadataStore.removeTempGroup(tempTableName);
-        TempTable table = this.tempTables.remove(tempTableName);
-        if(table != null) {
-            table.remove();
-        }      
-    }
-    
+	private TempTableSynchronization getSynchronization(CommandContext context) throws TeiidProcessingException {
+		TempTableSynchronization synch = null;
+		if (context == null || transactionMode == TransactionMode.NONE) {
+			return null;
+		}
+		TransactionContext tc = context.getTransactionContext();
+		if (tc == null || tc.getTransactionType() == Scope.NONE) {
+			return null;
+		}
+		String transactionId = tc.getTransactionId();
+		synch = synchronizations.get(transactionId);
+		if (synch == null) {
+			boolean success = false;
+			try {
+				synch = new TempTableSynchronization(transactionId);
+				synchronizations.put(transactionId, synch);
+				tc.getTransaction().registerSynchronization(synch);
+				success = true;
+			} catch (RollbackException e) {
+				throw new TeiidProcessingException(e);
+			} catch (SystemException e) {
+				throw new TeiidProcessingException(e);
+			} finally {
+				if (!success) {
+					synchronizations.remove(transactionId);
+				}
+			}
+		}
+		return synch;
+	}
+
     public TempMetadataStore getMetadataStore() {
         return tempMetadataStore;
     }
             
-    public void removeTempTables() {
+    public void removeTempTables() throws TeiidComponentException {
         for (String name : tempTables.keySet()) {
-            removeTempTableByName(name);
+            try {
+				removeTempTableByName(name, null);
+			} catch (TeiidProcessingException e) {
+				throw new TeiidComponentException(e);
+			}
         }
     }
     
@@ -117,8 +269,8 @@
         return this.tempTables.get(tempTableID);
     }
     
-    TempTable getOrCreateTempTable(String tempTableID, Command command, BufferManager buffer, boolean delegate) throws QueryProcessingException{
-    	TempTable tempTable = getTempTable(tempTableID, command, buffer, delegate);
+    TempTable getOrCreateTempTable(String tempTableID, Command command, BufferManager buffer, boolean delegate, boolean forUpdate, CommandContext context) throws TeiidProcessingException{
+    	TempTable tempTable = getTempTable(tempTableID, command, buffer, delegate, forUpdate, context);
     	if (tempTable != null) {
     		return tempTable;
     	}
@@ -138,18 +290,63 @@
         Create create = new Create();
         create.setTable(new GroupSymbol(tempTableID));
         create.setElementSymbolsAsColumns(columns);
-        return addTempTable(tempTableID, create, buffer, true);       
+        return addTempTable(tempTableID, create, buffer, true, context);       
     }
 
 	private TempTable getTempTable(String tempTableID, Command command,
-			BufferManager buffer, boolean delegate)
-			throws QueryProcessingException {
-		TempTable tsID = tempTables.get(tempTableID);
-        if(tsID != null) {
-            return tsID;
+			BufferManager buffer, boolean delegate, boolean forUpdate, CommandContext context)
+			throws TeiidProcessingException {
+		final TempTable tempTable = tempTables.get(tempTableID);
+        if(tempTable != null) {
+        	//isolate if needed
+    		if (forUpdate) {
+    			if (transactionMode == TransactionMode.ISOLATE_WRITES) {
+    				TransactionContext tc = context.getTransactionContext();
+        			if (tc != null) {
+        				TempTableSynchronization synch = getSynchronization(context);
+        				if (synch != null && synch.existingTables.contains(tempTable.getId())) {
+        					TempTable result = synch.tables.get(tempTableID);
+        					if (result == null) {
+        						synch.tables.put(tempTableID, tempTable.clone());
+        					}
+        					return tempTable;
+        				}
+        			}	
+    			}
+    		} else if (transactionMode == TransactionMode.ISOLATE_READS) {
+    			TransactionContext tc = context.getTransactionContext();
+    			if (tc != null && tc.getIsolationLevel() > Connection.TRANSACTION_READ_COMMITTED) {
+    				TempTableSynchronization synch = getSynchronization(context);
+    				if (synch != null) {
+    					TempTable result = synch.tables.get(tempTableID);
+    					if (result == null) {
+    						synch.tables.put(tempTableID, tempTable);
+    						result = tempTable;
+    						result.getActiveReaders().getAndIncrement();
+        					TransactionCallback callback = new TransactionCallback() {
+    							
+    							@Override
+    							public void rollback() {
+    								tempTable.getActiveReaders().getAndDecrement();
+    							}
+    							
+    							@Override
+    							public void commit() {
+    								tempTable.getActiveReaders().getAndDecrement();
+    							}
+    						};
+    						if (!synch.addCallback(callback)) {
+        						callback.rollback();
+        					}
+    					}
+    					return result;
+    				}
+    			}
+    		}
+            return tempTable;
         }
         if(delegate && this.parentTempTableStore != null){
-    		return this.parentTempTableStore.getTempTable(tempTableID, command, buffer, delegate);
+    		return this.parentTempTableStore.getTempTable(tempTableID, command, buffer, delegate, forUpdate, context);
         }
         return null;
 	}

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -65,6 +65,11 @@
 				public void setPrefersMemory(boolean prefers) {
 					
 				}
+				
+				@Override
+				public CleanupHook getCleanupHook() {
+					return null;
+				}
 			};
 		}
 

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -42,6 +42,7 @@
 import org.teiid.query.resolver.QueryResolver;
 import org.teiid.query.sql.lang.Command;
 import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 import org.teiid.query.unittest.RealMetadataFactory;
 import org.teiid.query.util.ContextProperties;
 
@@ -52,7 +53,7 @@
  */
 public class TestRequest extends TestCase {
 
-    private static final TempTableStore TEMP_TABLE_STORE = new TempTableStore("1"); //$NON-NLS-1$
+    private static final TempTableStore TEMP_TABLE_STORE = new TempTableStore("1", TransactionMode.ISOLATE_WRITES); //$NON-NLS-1$
 	private final static String QUERY = "SELECT * FROM pm1.g1";  //$NON-NLS-1$
     
     /**

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -48,6 +48,7 @@
 import org.teiid.query.tempdata.TempTableDataManager;
 import org.teiid.query.tempdata.TempTableStore;
 import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
+import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 import org.teiid.query.unittest.RealMetadataFactory;
 import org.teiid.query.util.CommandContext;
 
@@ -62,7 +63,7 @@
 	private HardcodedDataManager hdm;
 	
 	@Before public void setUp() {
-		tempStore = new TempTableStore("1"); //$NON-NLS-1$
+		tempStore = new TempTableStore("1", TransactionMode.ISOLATE_WRITES); //$NON-NLS-1$
 	    BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
 	    QueryMetadataInterface actualMetadata = RealMetadataFactory.exampleMaterializedView();
 	    globalStore = new GlobalTableStoreImpl(bm, actualMetadata);

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -96,6 +96,7 @@
 import org.teiid.query.tempdata.GlobalTableStoreImpl;
 import org.teiid.query.tempdata.TempTableDataManager;
 import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 import org.teiid.query.unittest.RealMetadataFactory;
 import org.teiid.query.unittest.TimestampUtil;
 import org.teiid.query.util.CommandContext;
@@ -244,7 +245,7 @@
     	}
     	context.getNextRand(0);
         if (context.getTempTableStore() == null) {
-        	context.setTempTableStore(new TempTableStore(context.getConnectionID()));
+        	context.setTempTableStore(new TempTableStore(context.getConnectionID(), TransactionMode.ISOLATE_WRITES));
         }
         if (context.getGlobalTableStore() == null) {
         	GlobalTableStoreImpl gts = new GlobalTableStoreImpl(bufferMgr, context.getMetadata());

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2011-09-02 21:01:40 UTC (rev 3447)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2011-09-03 01:59:26 UTC (rev 3448)
@@ -24,24 +24,38 @@
 
 import static org.junit.Assert.*;
 
+import java.sql.Connection;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Executor;
 
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.dqp.internal.process.CachedResults;
 import org.teiid.dqp.internal.process.SessionAwareCache;
+import org.teiid.dqp.service.TransactionContext;
+import org.teiid.dqp.service.TransactionContext.Scope;
 import org.teiid.metadata.FunctionMethod.Determinism;
 import org.teiid.query.metadata.TempMetadataAdapter;
 import org.teiid.query.optimizer.TestOptimizer;
 import org.teiid.query.optimizer.TestOptimizer.ComparisonMode;
+import org.teiid.query.tempdata.GlobalTableStoreImpl;
 import org.teiid.query.tempdata.TempTableDataManager;
 import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 import org.teiid.query.unittest.RealMetadataFactory;
 import org.teiid.query.util.CommandContext;
 
@@ -52,6 +66,10 @@
 	private TempTableDataManager dataManager;
 	private TempTableStore tempStore;
 	
+	private TransactionContext tc;
+	private Transaction txn;
+	private Synchronization synch;
+	
 	private ProcessorPlan execute(String sql, List[] expectedResults) throws Exception {
 		ProcessorPlan plan = TestProcessor.helpGetPlan(sql, metadata);
 		execute(plan, expectedResults);
@@ -60,6 +78,7 @@
 	
 	private void execute(ProcessorPlan processorPlan, List[] expectedResults) throws Exception {
 		CommandContext cc = TestProcessor.createCommandContext();
+		cc.setTransactionContext(tc);
 		cc.setMetadata(metadata);
 		cc.setTempTableStore(tempStore);
 		TestProcessor.doProcess(processorPlan, dataManager, expectedResults, cc);
@@ -67,7 +86,7 @@
 	}
 
 	@Before public void setUp() {
-		tempStore = new TempTableStore("1"); //$NON-NLS-1$
+		tempStore = new TempTableStore("1", TransactionMode.ISOLATE_WRITES); //$NON-NLS-1$
 		metadata = new TempMetadataAdapter(RealMetadataFactory.example1Cached(), tempStore.getMetadataStore());
 		metadata.setSession(true);
 		FakeDataManager fdm = new FakeDataManager();
@@ -84,6 +103,96 @@
 		dataManager = new TempTableDataManager(fdm, bm, executor, cache);
 	}
 	
+	@Test public void testRollbackNoExisting() throws Exception {
+		setupTransaction(Connection.TRANSACTION_SERIALIZABLE);
+		execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+		execute("update x set e1 = e2 where e2 > 1", new List[] {Arrays.asList(2)}); //$NON-NLS-1$
+		
+		Mockito.verify(txn).registerSynchronization((Synchronization) Mockito.anyObject());
+		synch.afterCompletion(Status.STATUS_ROLLEDBACK);
+		
+		try {
+			execute("select * from x", new List[] {});
+			fail();
+		} catch (Exception e) {
+			
+		}
+		execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+	}
+	
+	@Test public void testRollbackExisting() throws Exception {
+		execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		setupTransaction(Connection.TRANSACTION_SERIALIZABLE);
+		//execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		for (int i = 0; i < 86; i++) {
+			execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+		}
+		execute("update x set e1 = e2 where e2 > 1", new List[] {Arrays.asList(172)}); //$NON-NLS-1$
+		
+		synch.afterCompletion(Status.STATUS_ROLLEDBACK);
+
+		execute("select * from x", new List[] {});
+	}
+	
+	@Test public void testRollbackExisting1() throws Exception {
+		execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		for (int i = 0; i < 86; i++) {
+			execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+		}
+		setupTransaction(Connection.TRANSACTION_SERIALIZABLE);
+		//execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		for (int i = 0; i < 86; i++) {
+			execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+		}
+		execute("update x set e1 = e2 where e2 > 1", new List[] {Arrays.asList(344)}); //$NON-NLS-1$
+		
+		synch.afterCompletion(Status.STATUS_ROLLEDBACK);
+		this.tc = null;
+		
+		execute("select count(*) from x", new List[] {Arrays.asList(516)});
+		
+		execute("delete from x", new List[] {Arrays.asList(516)});
+	}
+	
+	@Test public void testIsolateReads() throws Exception {
+		GlobalTableStoreImpl gtsi = new GlobalTableStoreImpl(BufferManagerFactory.getStandaloneBufferManager(), RealMetadataFactory.example1Cached());
+		tempStore = gtsi.getTempTableStore();
+		metadata = new TempMetadataAdapter(RealMetadataFactory.example1Cached(), tempStore.getMetadataStore());
+		execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		for (int i = 0; i < 86; i++) {
+			execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+		}
+		setupTransaction(Connection.TRANSACTION_SERIALIZABLE);
+		execute("select count(*) from x", new List[] {Arrays.asList(516)});
+		gtsi.updateMatViewRow("X", Arrays.asList(1), true);
+		tc=null;
+		//outside of the transaction we can see the row removed
+		execute("select count(*) from x", new List[] {Arrays.asList(515)});
+		
+		//back in the transaction we see the original state
+		setupTransaction(Connection.TRANSACTION_SERIALIZABLE);
+		execute("select count(*) from x", new List[] {Arrays.asList(516)});
+		
+		synch.afterCompletion(Status.STATUS_COMMITTED);
+	}
+
+	private void setupTransaction(int isolation) throws RollbackException, SystemException {
+		txn = Mockito.mock(Transaction.class);
+		Mockito.doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				synch = (Synchronization)invocation.getArguments()[0];
+				return null;
+			}
+		}).when(txn).registerSynchronization((Synchronization)Mockito.anyObject());
+		Mockito.stub(txn.toString()).toReturn("txn");
+		tc = new TransactionContext();
+		tc.setTransaction(txn);
+		tc.setIsolationLevel(isolation);
+		tc.setTransactionType(Scope.REQUEST);
+	}
+	
 	@Test public void testInsertWithQueryExpression() throws Exception {
 		execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
 		execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$


