[teiid-commits] teiid SVN: r1545 - in branches/JCA: engine/src/main/java/com/metamatrix/dqp/message and 7 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Fri Nov 6 17:01:50 EST 2009


Author: rareddy
Date: 2009-11-06 17:01:50 -0500 (Fri, 06 Nov 2009)
New Revision: 1545

Modified:
   branches/JCA/connector-api/src/main/java/org/teiid/connector/basic/BasicManagedConnection.java
   branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java
   branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java
   branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AsyncRequestWorkItem.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
   branches/JCA/engine/src/main/resources/com/metamatrix/dqp/i18n.properties
   branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
   branches/JCA/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
Log:
TEIID-833: Fixed the code to use the container-based transaction manager.

Modified: branches/JCA/connector-api/src/main/java/org/teiid/connector/basic/BasicManagedConnection.java
===================================================================
--- branches/JCA/connector-api/src/main/java/org/teiid/connector/basic/BasicManagedConnection.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/connector-api/src/main/java/org/teiid/connector/basic/BasicManagedConnection.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -58,8 +58,10 @@
 
 	@Override
 	public void cleanup() throws ResourceException {
-		this.conn.close();
-		this.conn = null;
+		if (this.conn != null) {
+			this.conn.close();
+			this.conn = null;
+		}
 	}
 
 	@Override

Modified: branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java
===================================================================
--- branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -125,7 +125,7 @@
     }
 
     public boolean isTransactional(){
-        return this.txnContext != null && this.txnContext.isInTransaction();
+        return this.txnContext != null && this.txnContext.getXid() != null;
     }    
 	
 	public Command getCommand() {

Modified: branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java
===================================================================
--- branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -24,8 +24,11 @@
 
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.resource.spi.work.ExecutionContext;
 
@@ -46,10 +49,10 @@
     private long creationTime;
     private boolean rollback = false;
     private Set<String> suspendedBy = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-    boolean inTransaction = false;
+    private Map<String, AtomicInteger> txnSources = Collections.synchronizedMap(new HashMap<String, AtomicInteger>());
     
     public boolean isInTransaction() {
-        return (getXid() != null && inTransaction);
+        return (getXid() != null && this.txnSources.size() > 0);
     }
 
     public long getCreationTime() {
@@ -98,7 +101,17 @@
         return this.suspendedBy;
     }
 
-	public void inTransaction(boolean flag) {
-		this.inTransaction = flag;
+	public void incrementPartcipatingSourceCount(String source) {
+		AtomicInteger count = txnSources.get(source);
+		if (count == null) {
+			txnSources.put(source, new AtomicInteger(1));
+		}
+		else {
+			count.incrementAndGet();
+		}
+	}
+	
+	public boolean isOnePhase() {
+		return this.txnSources.size() == 1;
 	}    
 }
\ No newline at end of file

Modified: branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java
===================================================================
--- branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -58,19 +58,19 @@
     void cancelTransactions(String threadId, boolean requestOnly) throws XATransactionException;
 
     // global transaction methods
-    int prepare(final String threadId, MMXid xid) throws XATransactionException;
+    int prepare(final String threadId, MMXid xid, boolean singleTM) throws XATransactionException;
 
-	void commit(final String threadId, MMXid xid, boolean onePhase) throws XATransactionException;
+	void commit(final String threadId, MMXid xid, boolean onePhase, boolean singleTM) throws XATransactionException;
 	
-	void rollback(final String threadId, MMXid xid) throws XATransactionException;
+	void rollback(final String threadId, MMXid xid, boolean singleTM) throws XATransactionException;
 	
-	Xid[] recover(int flag) throws XATransactionException;
+	Xid[] recover(int flag, boolean singleTM) throws XATransactionException;
 	
-	void forget(final String threadId, MMXid xid) throws XATransactionException;
+	void forget(final String threadId, MMXid xid, boolean singleTM) throws XATransactionException;
 	
-	void start(final String threadId, MMXid xid, int flags, int timeout) throws XATransactionException;
+	void start(final String threadId, MMXid xid, int flags, int timeout, boolean singleTM) throws XATransactionException;
 	
-	void end(final String threadId, MMXid xid, int flags) throws XATransactionException;
+	void end(final String threadId, MMXid xid, int flags, boolean singleTM) throws XATransactionException;
         
 	// management methods
     Collection<Transaction> getTransactions();

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -31,7 +31,7 @@
 
 public class AsynchConnectorWorkItem extends ConnectorWorkItem {
                 
-    AsynchConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
+    AsynchConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) throws ConnectorException {
     	super(message, manager, resultsReceiver);
     }
     

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -174,8 +174,13 @@
     
     private void enqueueRequest(ConnectorWorkItem work) throws ConnectorException {
         try {
-        	// TODO: install work listener; what can we use this for?
-			this.workManager.scheduleWork(work, 0, work.requestMsg.getTransactionContext(), null);
+        	// Pass on the transaction context only for transactional work; otherwise (e.g. an immutable connector) schedule it without one.
+        	if (work.securityContext.isTransactional()) {
+        		this.workManager.scheduleWork(work, 0, work.requestMsg.getTransactionContext(), work);
+        	}
+        	else {
+        		this.workManager.scheduleWork(work);
+        	}
 		} catch (WorkException e) {
 			throw new ConnectorException(e);
 		}

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -26,6 +26,8 @@
 import java.util.Arrays;
 import java.util.List;
 
+import javax.resource.spi.work.WorkEvent;
+
 import org.teiid.connector.api.Connection;
 import org.teiid.connector.api.Connector;
 import org.teiid.connector.api.ConnectorEnvironment;
@@ -73,7 +75,8 @@
     protected AtomicRequestID id;
     protected ConnectorManager manager;
     protected AtomicRequestMessage requestMsg;
-    protected boolean isTransactional;
+    protected Connector connector;
+    QueryMetadataInterface queryMetadata;
     
     /* Created on new request */
     protected Connection connection;
@@ -103,7 +106,7 @@
 
     protected ResultsReceiver<AtomicResultsMessage> resultsReceiver;
     
-    ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
+    ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) throws ConnectorException {
         this.id = message.getAtomicRequestID();
         this.requestMsg = message;
         this.manager = manager;
@@ -122,24 +125,27 @@
         this.securityContext.setBatchSize(this.requestMsg.getFetchSize());
         this.securityContext.setContextCache(manager.getContextCache());
         this.securityContext.setMetadataService(manager.getMetadataService());
+        
+        this.connector = manager.getConnector();
+        this.connectorEnv = connector.getConnectorEnvironment();
+        try {
+	        this.queryMetadata = new TempMetadataAdapter(manager.getMetadataService().lookupMetadata(this.requestMsg.getWorkContext().getVdbName(), this.requestMsg.getWorkContext().getVdbVersion()), new TempMetadataStore());
+	        
+	        if (requestMsg.isTransactional()){
+	        	if (this.connectorEnv.isXaCapable()) {
+		    		this.securityContext.setTransactional(true);
+	        	} else if (!this.connectorEnv.isImmutable() && requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
+	    	        throw new ConnectorException(DQPPlugin.Util.getString("ConnectorWorker.transactionNotSupported")); //$NON-NLS-1$
+	    	    }
+	        }
+        } catch(MetaMatrixComponentException e) {
+        	throw new ConnectorException(e);
+        }
     }
 
-    protected void createConnection(Connector connector, QueryMetadataInterface queryMetadata) throws ConnectorException, MetaMatrixComponentException {
+    protected void createConnection() throws ConnectorException {
         LogManager.logTrace(LogConstants.CTX_CONNECTOR, new Object[] {id, "creating connection for atomic-request"});  //$NON-NLS-1$
-        
-        ConnectorEnvironment env = connector.getConnectorEnvironment();
-        
-        if (requestMsg.isTransactional()){
-        	if (env.isXaCapable()) {
-	    		this.securityContext.setTransactional(true);
-	    		this.isTransactional = true;
-        	} else if (!env.isImmutable() && requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
-    	        throw new ConnectorException(DQPPlugin.Util.getString("ConnectorWorker.transactionNotSupported")); //$NON-NLS-1$
-    	    }
-    	}
-        
-    	this.connection = connector.getConnection();
-    	this.connectorEnv = env;
+    	this.connection = this.connector.getConnection();
     }
     
     protected void process() {
@@ -253,14 +259,11 @@
         } catch (Throwable e) {
             LogManager.logError(LogConstants.CTX_CONNECTOR, e, e.getMessage());
         } finally {
-        	try {
-                if (connection != null) {
-                    connection.close();
-                    LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Closed connection"}); //$NON-NLS-1$
-                }
-            } finally {
-                manager.removeState(this.id);
-                sendClose();
+        	// Close the underlying connection, but send the close response only upon notification from
+        	// the container in the workCompleted call.
+            if (connection != null) {
+                connection.close();
+                LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Closed connection"}); //$NON-NLS-1$
             }
         }        
     }
@@ -282,9 +285,8 @@
 	protected void createExecution() throws MetaMatrixComponentException,
 			ConnectorException {
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.requestMsg.getAtomicRequestID(), "Processing NEW request:", this.requestMsg.getCommand()}); //$NON-NLS-1$                                     
-
-		QueryMetadataInterface queryMetadata = new TempMetadataAdapter(manager.getMetadataService().lookupMetadata(this.requestMsg.getWorkContext().getVdbName(), this.requestMsg.getWorkContext().getVdbVersion()), new TempMetadataStore());
-        createConnection(manager.getConnector(), queryMetadata);
+		
+        createConnection();
         
         LogManager.logTrace(LogConstants.CTX_CONNECTOR, new Object[] {id, "creating execution for atomic-request"});  //$NON-NLS-1$
 
@@ -521,4 +523,18 @@
 		return this.id.toString();
 	}
 
+	@Override
+	public void workCompleted(WorkEvent arg0) {
+        manager.removeState(this.id);
+        sendClose();
+	}
+
+	@Override
+	public void workRejected(WorkEvent event) {
+		try {
+			asynchCancel();
+		} catch (ConnectorException e) {
+			LogManager.logError(LogConstants.CTX_CONNECTOR, event.getException(), this.id.toString()); //$NON-NLS-1$
+		}
+	}
 }
\ No newline at end of file

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -56,7 +56,7 @@
 		private CachedResultsConnectorWorkItem(AtomicRequestMessage message,
 				ConnectorManager manager,
 				ResultsReceiver<AtomicResultsMessage> resultsReceiver,
-				CacheID cacheID) {
+				CacheID cacheID) throws ConnectorException {
 			super(message, manager, resultsReceiver);
 			this.cacheID = cacheID;
 		}
@@ -136,7 +136,7 @@
 		this.synchWorkers = synchWorkers;
 	}
 	
-	public ConnectorWorkItem createWorkItem(AtomicRequestMessage message, ResultsReceiver<AtomicResultsMessage> receiver) {
+	public ConnectorWorkItem createWorkItem(AtomicRequestMessage message, ResultsReceiver<AtomicResultsMessage> receiver) throws ConnectorException {
     	if (this.rsCache != null && message.useResultSetCache()) {
         	final CacheID cacheID = createCacheID(message);
 

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -26,12 +26,16 @@
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 
+import javax.resource.spi.work.WorkEvent;
 import javax.transaction.xa.Xid;
 
+import org.teiid.connector.api.ConnectorException;
+
 import com.metamatrix.common.comm.api.ResultsReceiver;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.dqp.message.AtomicRequestMessage;
 import com.metamatrix.dqp.message.AtomicResultsMessage;
+import com.metamatrix.dqp.service.TransactionContext;
 import com.metamatrix.dqp.util.LogConstants;
 
 public class SynchConnectorWorkItem extends ConnectorWorkItem {
@@ -45,33 +49,29 @@
 
 	private TransactionLock lock;
 
-	SynchConnectorWorkItem(AtomicRequestMessage message,
-			ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
+	SynchConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) throws ConnectorException  {
 		super(message, manager, resultsReceiver);
+		
+		// Since the container makes sure that no other work is registered under the current transaction,
+		// the lock must be acquired before we schedule the work.
+		try {
+			acquireTransactionLock();
+		} catch (InterruptedException e) {
+			interrupted(e);
+		} 			
 	}
 	
 	@Override
 	public void run() {
 		while (!this.isDoneProcessing()) { //process until closed
-			try {
-				acquireTransactionLock();
-			} catch (InterruptedException e) {
-				interrupted(e);
-			} 
-			try {
-				super.run();
-			} finally {
-				releaseTxnLock();
-			}
+			super.run();
 		}
 	}
 
 	@Override
 	protected void pauseProcessing() {
-		releaseTxnLock();
 		try {
 			this.wait();
-			acquireTransactionLock();
 		} catch (InterruptedException e) {
 			interrupted(e);
 		}
@@ -88,10 +88,11 @@
 	}
 	
 	private void acquireTransactionLock() throws InterruptedException {
-		if (!this.isTransactional) {
+		TransactionContext tc = this.requestMsg.getTransactionContext();
+		if ( tc == null) {
 			return;
 		}
-		Xid key = requestMsg.getTransactionContext().getXid();
+		Xid key = tc.getXid();
 
 		TransactionLock existing = null;
 		synchronized (TRANSACTION_LOCKS) {
@@ -101,20 +102,24 @@
 				TRANSACTION_LOCKS.put(key, existing);
 			}
 			existing.pendingCount++;
+			tc.incrementPartcipatingSourceCount(requestMsg.getConnectorName());
 		}
 		existing.lock.acquire();
 		this.lock = existing;
+		System.out.println("got the connector lock on ="+key);
 	}
 
 	private void releaseTxnLock() {
-		if (!this.isTransactional || this.lock == null) {
+		TransactionContext tc = this.requestMsg.getTransactionContext();
+		if ( tc == null || this.lock == null) {
 			return;
-		}
+		}		
 		synchronized (TRANSACTION_LOCKS) {
 			lock.pendingCount--;
 			if (lock.pendingCount == 0) {
-				Xid key = requestMsg.getTransactionContext().getXid();
+				Xid key = tc.getXid();
 				TRANSACTION_LOCKS.remove(key);
+				System.out.println("released the connector lock on ="+key);
 			}
 		}
 		lock.lock.release();
@@ -128,4 +133,22 @@
     	return true;
     }
 
+    
+	@Override
+	public void workCompleted(WorkEvent event) {
+		try {
+			super.workCompleted(event);
+		} finally {
+			releaseTxnLock();
+		}
+	}
+
+	@Override
+	public void workRejected(WorkEvent event) {
+		try {
+			super.workRejected(event);
+		} finally {
+			releaseTxnLock();
+		}
+	}
 }

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -23,6 +23,8 @@
 package org.teiid.dqp.internal.process;
 
 import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkEvent;
+import javax.resource.spi.work.WorkListener;
 
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.dqp.util.LogConstants;
@@ -32,7 +34,7 @@
  * Represents a task that performs work that may take more than one processing pass to complete.
  * During processing the WorkItem may receive events asynchronously through the moreWork method.
  */
-public abstract class AbstractWorkItem implements Work {
+public abstract class AbstractWorkItem implements Work, WorkListener{
 	
     enum ThreadState {
     	MORE_WORK, WORKING, IDLE, DONE
@@ -134,4 +136,20 @@
 	public boolean shouldAbortProcessing() {
 		return this.release;
 	}
+	
+	@Override
+	public void workAccepted(WorkEvent arg0) {
+	}
+
+	@Override
+	public void workCompleted(WorkEvent arg0) {
+	}
+
+	@Override
+	public void workRejected(WorkEvent event) {
+	}
+
+	@Override
+	public void workStarted(WorkEvent arg0) {
+	}		
 }

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AsyncRequestWorkItem.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AsyncRequestWorkItem.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AsyncRequestWorkItem.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -16,7 +16,7 @@
 	
 	@Override
 	protected void resumeProcessing() {
-		dqpCore.addWork(this, getTransactionContext());
+		dqpCore.addWork(this);
 	}
 	
 	@Override

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -206,7 +206,7 @@
             if(holder != null && !holder.isCanceled()) {
             	RequestInfo req = new RequestInfo(holder.requestID, holder.requestMsg.getCommandString(), holder.requestMsg.getSubmittedTimestamp(), holder.requestMsg.getProcessingTimestamp());
             	req.setSessionToken(holder.dqpWorkContext.getSessionToken());
-            	if (holder.getTransactionContext() != null && holder.getTransactionContext().isInTransaction()) {
+            	if (holder.getTransactionContext() != null && holder.getTransactionContext().getXid() != null) {
             		req.setXid(holder.getTransactionContext().getXid());
             	}
 
@@ -272,7 +272,7 @@
     	logMMCommand(workItem, true, false, 0); 
         addRequest(requestID, workItem);
         
-        this.addWork(workItem, workItem.getTransactionContext());      
+        this.addWork(workItem);      
         return resultsFuture;
     }
 	
@@ -322,9 +322,9 @@
         return rsCache.hasResults(cID);        
     }
            
-    void addWork(Work work, TransactionContext context) {
+    void addWork(Work work) {
     	try {
-			this.processWorkerPool.scheduleWork(work, 20000, context, null);
+			this.processWorkerPool.scheduleWork(work);
 		} catch (WorkException e) {
 			//TODO: how can be turn this into result?
 			e.printStackTrace();
@@ -708,39 +708,35 @@
 	// global txn
 	public void commit(MMXid xid, boolean onePhase) throws XATransactionException {
 		String threadId = DQPWorkContext.getWorkContext().getConnectionID();
-		this.getTransactionService().commit(threadId, xid, onePhase);
+		this.getTransactionService().commit(threadId, xid, onePhase, false);
 	}
 	// global txn
 	public void end(MMXid xid, int flags) throws XATransactionException {
 		String threadId = DQPWorkContext.getWorkContext().getConnectionID();
-		this.getTransactionService().end(threadId, xid, flags);
+		this.getTransactionService().end(threadId, xid, flags, false);
 	}
 	// global txn
 	public void forget(MMXid xid) throws XATransactionException {
 		String threadId = DQPWorkContext.getWorkContext().getConnectionID();
-		this.getTransactionService().forget(threadId, xid);
+		this.getTransactionService().forget(threadId, xid, false);
 	}
 	// global txn
 	public int prepare(MMXid xid) throws XATransactionException {
-		return this.getTransactionService().prepare(
-				DQPWorkContext.getWorkContext().getConnectionID(),
-				xid);
+		return this.getTransactionService().prepare(DQPWorkContext.getWorkContext().getConnectionID(),xid, false);
 	}
 	// global txn
 	public Xid[] recover(int flag) throws XATransactionException {
-		return this.getTransactionService().recover(flag);
+		return this.getTransactionService().recover(flag, false);
 	}
 	// global txn
 	public void rollback(MMXid xid) throws XATransactionException {
-		this.getTransactionService().rollback(
-				DQPWorkContext.getWorkContext().getConnectionID(),
-				xid);
+		this.getTransactionService().rollback(DQPWorkContext.getWorkContext().getConnectionID(),xid, false);
 	}
 	// global txn
 	public void start(MMXid xid, int flags, int timeout)
 			throws XATransactionException {
 		String threadId = DQPWorkContext.getWorkContext().getConnectionID();
-		this.getTransactionService().start(threadId, xid, flags, timeout);
+		this.getTransactionService().start(threadId, xid, flags, timeout, false);
 	}
 
 	public MetadataResult getMetadata(long requestID)

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -105,7 +105,7 @@
 	        try {
 	        	this.dataMgr.executeRequest(aqr, this.connectorName, this);
 	        } catch (MetaMatrixComponentException e) {
-	        	exceptionOccurred(e);
+	        	exceptionOccurred(e, true);
 	        }
         }
     }
@@ -166,6 +166,11 @@
     }
 
 	public void exceptionOccurred(Throwable e) {
+		exceptionOccurred(e, false);
+	}
+    
+    private void exceptionOccurred(Throwable e, boolean removeState) {
+    
 		synchronized (this) {
 			if(workItem.requestMsg.supportsPartialResults()) {
 				nextBatch = new List[0];
@@ -178,7 +183,9 @@
 			}	
 			waitingForData = false;
 		}
-		workItem.closeAtomicRequest(aqr.getAtomicRequestID());
+		if (removeState) {
+			workItem.closeAtomicRequest(aqr.getAtomicRequestID());
+		}
 		this.workItem.moreWork();
 	}
 

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -78,7 +78,7 @@
             shouldClose = chunk.isLast();
     	} catch (BlockedOnMemoryException e) {
 			LogManager.logDetail(LogConstants.CTX_DQP, new Object[] {"Reenqueueing LOB chunk request due to lack of available memory ###########", requestID}); //$NON-NLS-1$ //$NON-NLS-2$
-			this.dqpCore.addWork(this, null);
+			this.dqpCore.addWork(this);
 			return;
     	} catch (TupleSourceNotFoundException e) {
             LogManager.logWarning(LogConstants.CTX_DQP, e, DQPPlugin.Util.getString("BufferManagerLobChunkStream.no_tuple_source", streamId)); //$NON-NLS-1$

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -370,9 +370,9 @@
         if (tc != null){ 
             Assertion.assertTrue(tc.getTransactionType() != TransactionContext.Scope.REQUEST, "Transaction already associated with request."); //$NON-NLS-1$
         }
-        
-        if (tc == null || !tc.isInTransaction()) {
-            //if not under a transaction
+
+        // If local or global transaction is not started.
+        if (tc == null || tc.getXid() == null) {
             
             boolean startAutoWrapTxn = false;
             
@@ -405,7 +405,6 @@
         } 
         
         this.transactionContext = tc;
-        this.transactionContext.inTransaction(true);
         this.processor = new QueryProcessor(processPlan, context, bufferManager, new TempTableDataManager(processorDataManager, tempTableStore));
     }
 

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -179,41 +179,11 @@
 	protected boolean isDoneProcessing() {
 		return isClosed;
 	}
-	
-//	@Override
-//	protected void resumeProcessing() {
-//		dqpCore.addWork(this, getTransactionContext());
-//	}	
-	
+
 	@Override
-	public void run() {
-		while (!this.isDoneProcessing()) { //process until closed
-			super.run();
-		}
-	}
-	
-	@Override
 	protected void resumeProcessing() {
-		this.notify();
+		dqpCore.addWork(this);
 	}
-	
-	@Override
-	protected void pauseProcessing() {
-		try {
-			this.wait();
-		} catch (InterruptedException e) {
-			interrupted(e);
-		}
-	}
-	
-	private void interrupted(InterruptedException e) {
-		try {
-			LogManager.logDetail(LogConstants.CTX_DQP, e, this.requestID+" Interrupted, proceeding to close"); //$NON-NLS-1$
-			this.dqpCore.cancelRequest(this.requestID);
-		} catch (MetaMatrixComponentException e1) {
-			LogManager.logWarning(LogConstants.CTX_DQP, e, DQPPlugin.Util.getString("Cancel_failed", this.requestID)); //$NON-NLS-1$			
-		}
-	}
              
 	@Override
 	protected void process() {
@@ -372,7 +342,7 @@
         		this.transactionService.rollback(transactionContext);
             } catch (XATransactionException e1) {
                 LogManager.logWarning(LogConstants.CTX_DQP, e1, DQPPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$           
-            }
+            } 
 		}
 		
 		isClosed = true;
@@ -412,7 +382,7 @@
 		analysisRecord = request.analysisRecord;
 		schemas = request.schemas;
 		transactionContext = request.transactionContext;
-		if (this.transactionContext != null && this.transactionContext.isInTransaction()) {
+		if (this.transactionContext != null && this.transactionContext.getXid() != null) {
 			this.transactionState = TransactionState.ACTIVE;
 		}
 		Option option = originalCommand.getOption();
@@ -587,7 +557,7 @@
             }
 		}
     	workItem.setResultsReceiver(chunckReceiver);
-        dqpCore.addWork(workItem, getTransactionContext());
+        dqpCore.addWork(workItem);
     }
     
     public void removeLobStream(int streamRequestId) {
@@ -688,7 +658,7 @@
      */
     private void logCommandError() {
         String transactionID = null;
-        if (this.transactionContext != null && this.transactionContext.isInTransaction()) {
+        if (this.transactionContext != null && this.transactionContext.getXid() != null) {
             transactionID = this.transactionContext.getXid().toString();
         }
         CommandLogMessage message = new CommandLogMessage(System.currentTimeMillis(), requestID.toString(), transactionID == null ? null : transactionID, requestID.getConnectionID(), dqpWorkContext.getUserName(), dqpWorkContext.getVdbName(), dqpWorkContext.getVdbVersion(), -1, false, true);
@@ -716,7 +686,8 @@
 	TransactionContext getTransactionContext() {
 		return transactionContext;
 	}
-		
+	
+	
 	Collection<DataTierTupleSource> getConnectorRequests() {
 		return new LinkedList<DataTierTupleSource>(this.connectorInfo.values());
 	}

Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -94,10 +94,6 @@
             }
         }
         
-        public synchronized void removeTransactionContext(MMXid xid) {
-            this.xidToTransactionContext.remove(xid);
-        }
-
         public synchronized void addTransactionContext(TransactionContext tc) {
             if (tc.getXid() != null) {
                 this.xidToTransactionContext.put(tc.getXid(), tc);
@@ -128,56 +124,106 @@
     /**
      * Global Transaction 
      */
-	public int prepare(final String threadId, MMXid xid) throws XATransactionException {
-        TransactionContext impl = checkXAState(threadId, xid, true, false);
-        if (!impl.getSuspendedBy().isEmpty()) {
+	public int prepare(final String threadId, MMXid xid, boolean singleTM) throws XATransactionException {
+        TransactionContext tc = checkXAState(threadId, xid, true, false);
+        if (!tc.getSuspendedBy().isEmpty()) {
             throw new XATransactionException(XAException.XAER_PROTO, DQPPlugin.Util.getString("TransactionServer.suspended_exist", xid)); //$NON-NLS-1$
         }		
         
-    	// In the container this pass though
-    	return XAResource.XA_RDONLY;
+        if (tc.shouldRollback()) {
+        	throw new XATransactionException(XAException.XAER_RMERR, DQPPlugin.Util.getString("TransactionServer.rollback_set", xid));
+        }
+        
+        // In the container this passes through
+        if (singleTM) {        	    	
+	    	return XAResource.XA_RDONLY;
+        }
+        
+        try {
+        	return this.provider.getXATerminator().prepare(tc.getXid());
+        } catch (XAException e) {
+            throw new XATransactionException(e);
+        }
     }
     
     /**
      * Global Transaction 
      */    
-    public void commit(final String threadId, MMXid xid, boolean onePhase) throws XATransactionException {
-    	// no-op; Connector sources will be directly managed by container's TM
-    	TransactionContext tc = checkXAState(threadId, xid, true, false);     
-    	this.transactions.removeTransactionContext(tc);
+    public void commit(final String threadId, MMXid xid, boolean onePhase, boolean singleTM) throws XATransactionException {
+    	TransactionContext tc = checkXAState(threadId, xid, true, false);  
+    	try {
+    		// In the case of single TM, the container directly commits the sources.
+        	if (!singleTM) {
+        		// In the case of onePhase containers call commit directly. If Teiid is also one phase let it pass through,
+        		// otherwise force the prepare.
+        		boolean singlePhase = tc.isOnePhase();
+        		if (onePhase && !singlePhase) {
+        			prepare(threadId, xid, singleTM);
+        		}
+        		this.provider.getXATerminator().commit(tc.getXid(), singlePhase);
+        	}
+    	} catch (XAException e) {
+            throw new XATransactionException(e);
+        } finally {
+    		this.transactions.removeTransactionContext(tc);
+    	}
     }
     
     /**
      * Global Transaction 
      */
-    public void rollback(final String threadId, MMXid xid) throws XATransactionException {
-    	// no-op; Connector sources will be directly managed by container's TM
-    	TransactionContext tc = checkXAState(threadId, xid, true, false);     
-    	this.transactions.removeTransactionContext(tc);
+    public void rollback(final String threadId, MMXid xid, boolean singleTM) throws XATransactionException {
+    	TransactionContext tc = checkXAState(threadId, xid, true, false);  
+    	try {
+    		// In the case of single TM, the container directly rolls back the sources.
+        	if (!singleTM) {
+        		this.provider.getXATerminator().rollback(tc.getXid());
+        	}
+    	} catch (XAException e) {
+            throw new XATransactionException(e);
+        } finally {
+    		this.transactions.removeTransactionContext(tc);
+    	}
     }
 
     /**
      * Global Transaction 
      */    
-    public Xid[] recover(int flag) throws XATransactionException {
-    	// no-op; Connector sources will be directly managed by container's TM
+    public Xid[] recover(int flag, boolean singleTM) throws XATransactionException {
+    	// In case of single TM, container knows this list.
+    	if (singleTM) {
+    		return new Xid[0];
+    	}
+    	
     	return new Xid[0];
+//    	try {
+//			return this.provider.getXATerminator().recover(flag);
+//		} catch (XAException e) {
+//			throw new XATransactionException(e);
+//		}
     }
 
     /**
      * Global Transaction 
      */    
-    public void forget(final String threadId, MMXid xid) throws XATransactionException {
-    	// no-op; Connector sources will be directly managed by container's TM
+    public void forget(final String threadId, MMXid xid, boolean singleTM) throws XATransactionException {
+    	TransactionContext tc = checkXAState(threadId, xid, true, false); 
+        try {
+        	if (singleTM) {
+        		return;
+        	}
+            this.provider.getXATerminator().forget(xid);
+        } catch (XAException err) {
+            throw new XATransactionException(err);
+        } finally {
+        	this.transactions.removeTransactionContext(tc);
+        }
     }
 
     /**
      * Global Transaction 
      */
-    public void start(final String threadId,
-                      final MMXid xid,
-                      int flags,
-                      int timeout) throws XATransactionException {
+    public void start(final String threadId, final MMXid xid, int flags, int timeout, boolean singleTM) throws XATransactionException {
         
         TransactionContext tc = null;
 
@@ -221,7 +267,7 @@
     /**
      * Global Transaction 
      */    
-    public void end(final String threadId, MMXid xid, int flags) throws XATransactionException {
+    public void end(final String threadId, MMXid xid, int flags, boolean singleTM) throws XATransactionException {
         TransactionContext tc = checkXAState(threadId, xid, true, true);
         try {
             switch (flags) {
@@ -234,6 +280,7 @@
                     break;
                 }
                 case XAResource.TMFAIL: {
+                	tc.setRollbackOnly();
                     break;
                 }
                 default:
@@ -324,8 +371,7 @@
         }
         try {
         	if (tc.isInTransaction()) {
-		        // TODO: implement the one phase logic
-		        this.provider.getXATerminator().commit(tc.getXid(), false);
+            	directCommit(tc);
         	}
         } catch (XAException e) {
             throw new XATransactionException(e);
@@ -382,13 +428,17 @@
         
         //commit may be called multiple times by the processworker, if this is a subsequent call, then the current
         //context will not be active
-        TransactionContext currentContext = transactions.getTransactionContext(context.getThreadId());
-        if (currentContext == null || currentContext.getTransactionType() == TransactionContext.Scope.NONE) {
-            return currentContext;
+        TransactionContext tc = transactions.getTransactionContext(context.getThreadId());
+        if (tc == null || tc.getTransactionType() == TransactionContext.Scope.NONE) {
+            return tc;
         }
+        
+        Assertion.assertTrue(!tc.shouldRollback());
+        
         try {
-	        // TODO: implement the one phase logic
-	        this.provider.getXATerminator().commit(context.getXid(), false);
+        	if (tc.isInTransaction()) {
+            	directCommit(tc);
+        	}
         } catch (XAException e) {
             throw new XATransactionException(e);
         } finally {
@@ -397,6 +447,19 @@
         return context;
     }
 
+	private void directCommit(TransactionContext tc) throws XAException {
+		boolean commit = true;
+		boolean onePhase = tc.isOnePhase();
+		if (!onePhase) {
+			int prepare = this.provider.getXATerminator().prepare(tc.getXid());
+			commit = (prepare == XAResource.XA_OK);
+		}
+		
+		if (commit) {
+			this.provider.getXATerminator().commit(tc.getXid(), onePhase);
+		}
+	}
+
     /**
      * Request level transaction
      */    
@@ -426,14 +489,16 @@
             return;
         }
         
-        cancelTransaction(tc);
+        tc.setRollbackOnly();
+        
+        if (requestOnly) {
+        	rollback(tc);
+        }
+        else {
+        	rollback(threadId);
+        }
     }
 
-	private void cancelTransaction(TransactionContext context) throws XATransactionException {
-		context.setRollbackOnly();
-		rollback(context);
-	}    
-    
 	@Override
 	public Collection<org.teiid.adminapi.Transaction> getTransactions() {
 		Set<TransactionContext> txnSet = Collections.newSetFromMap(new IdentityHashMap<TransactionContext, Boolean>());

Modified: branches/JCA/engine/src/main/resources/com/metamatrix/dqp/i18n.properties
===================================================================
--- branches/JCA/engine/src/main/resources/com/metamatrix/dqp/i18n.properties	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/resources/com/metamatrix/dqp/i18n.properties	2009-11-06 22:01:50 UTC (rev 1545)
@@ -463,7 +463,7 @@
 TransactionServer.suspended_exist=Suspended work still exists on transaction {0}.
 TransactionServer.failed_to_enlist=Failed to enlist the XAResource in Transaction.
 TransactionServer.failed_to_delist=Failed to delist the XAResource from Transaction.
-
+TransactionServer.rollback_set=Rollback Only has been set on transaction {0}. This may be the result of a cancel request.
 TransactionContextImpl.remote_not_supported=Remote connector calls under a transaction are not supported
 
 CodeTableCache.duplicate_key=Duplicate code table ''{0}'' key ''{1}'' value ''{2}''

Modified: branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
===================================================================
--- branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -427,7 +427,7 @@
 		int resumeCount;
 
 		FakeQueuingAsynchConnectorWorkItem(AtomicRequestMessage message,
-				ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
+				ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) throws ConnectorException {
 			super(message, manager, resultsReceiver);
 		}
 

Modified: branches/JCA/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
===================================================================
--- branches/JCA/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java	2009-11-02 19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java	2009-11-06 22:01:50 UTC (rev 1545)
@@ -22,21 +22,23 @@
 
 package org.teiid.dqp.internal.transaction;
 
+import javax.resource.spi.XATerminator;
 import javax.transaction.xa.XAResource;
 
+import junit.framework.TestCase;
+
 import org.mockito.Mockito;
 import org.teiid.adminapi.Transaction;
 
-import junit.framework.TestCase;
-
 import com.metamatrix.common.xa.MMXid;
 import com.metamatrix.common.xa.XATransactionException;
-import com.metamatrix.core.util.SimpleMock;
+import com.metamatrix.dqp.service.TransactionContext;
 
 public class TestTransactionServer extends TestCase {
 
     private TransactionServerImpl server;
-
+    private XATerminator xaTerminator;
+    
     private static final String THREAD1 = "1"; //$NON-NLS-1$
     private static final String THREAD2 = "2"; //$NON-NLS-1$
 
@@ -52,7 +54,10 @@
      */
     protected void setUp() throws Exception {
         server = new TransactionServerImpl();
-        server.setTransactionProvider(SimpleMock.createSimpleMock(TransactionProvider.class)); 
+        TransactionProvider provider = Mockito.mock(TransactionProvider.class);
+        xaTerminator = Mockito.mock(XATerminator.class);
+        Mockito.stub(provider.getXATerminator()).toReturn(xaTerminator);
+        server.setTransactionProvider(provider); 
         server.setXidFactory(new XidFactory());
     }
 
@@ -63,7 +68,7 @@
         server.begin(THREAD1);
 
         try {
-            server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+            server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100, false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("Client thread already involved in a transaction. Transaction nesting is not supported. The current transaction must be completed first.", //$NON-NLS-1$
@@ -75,7 +80,7 @@
      * once in a global, cannot start a local
      */
     public void testTransactionExclusion1() throws Exception {
-        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100, false);
 
         try {
             server.begin(THREAD1);
@@ -90,10 +95,10 @@
      * global can only be started once
      */
     public void testTransactionExclusion2() throws Exception {
-        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
 
         try {
-            server.start(THREAD2, XID1, XAResource.TMNOFLAGS, 100);
+            server.start(THREAD2, XID1, XAResource.TMNOFLAGS, 100,false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("Global transaction MMXid global:1 branch:null format:0 already exists.", ex.getMessage()); //$NON-NLS-1$
@@ -104,10 +109,10 @@
      * global cannot be nested
      */
     public void testTransactionExclusion3() throws Exception {
-        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
 
         try {
-            server.start(THREAD1, XID2, XAResource.TMNOFLAGS, 100);
+            server.start(THREAD1, XID2, XAResource.TMNOFLAGS, 100,false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("Client thread already involved in a transaction. Transaction nesting is not supported. The current transaction must be completed first.", //$NON-NLS-1$
@@ -134,12 +139,12 @@
      * global cannot be nested
      */
     public void testTransactionExclusion5() throws Exception {
-        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
-        server.start(THREAD2, XID2, XAResource.TMNOFLAGS, 100);
-        server.end(THREAD2, XID2, XAResource.TMSUCCESS);
+        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+        server.start(THREAD2, XID2, XAResource.TMNOFLAGS, 100,false);
+        server.end(THREAD2, XID2, XAResource.TMSUCCESS,false);
 
         try {
-            server.start(THREAD1, XID2, XAResource.TMJOIN, 100);
+            server.start(THREAD1, XID2, XAResource.TMJOIN, 100,false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("Client thread already involved in a transaction. Transaction nesting is not supported. The current transaction must be completed first.", //$NON-NLS-1$
@@ -158,10 +163,49 @@
         }
     }
 
+    public void testLocalSetRollback() throws Exception {
+        TransactionContext tc = server.begin(THREAD1);
+        tc.incrementPartcipatingSourceCount("s1");
+        tc.setRollbackOnly();
+        
+        server.commit(THREAD1);
+        
+        Mockito.verify(xaTerminator).rollback(tc.getXid());
+    }    
+    
+    public void testSinglePhaseCommit() throws Exception {
+        TransactionContext tc = server.begin(THREAD1);
+        tc.incrementPartcipatingSourceCount("S1");
+        
+        server.commit(THREAD1);
+        
+        Mockito.verify(xaTerminator).commit(tc.getXid(), true);
+        
+        tc = server.begin(THREAD1);
+        tc.incrementPartcipatingSourceCount("S1");
+        tc.incrementPartcipatingSourceCount("S1");
+        
+        server.commit(THREAD1);
+        
+        Mockito.verify(xaTerminator).commit(tc.getXid(), true);        
+    }      
+    
+    public void testTwoPhaseCommit() throws Exception {
+        TransactionContext tc = server.begin(THREAD1);
+        tc.incrementPartcipatingSourceCount("S1");
+        tc.incrementPartcipatingSourceCount("S2");
+        
+        server.commit(THREAD1);
+        
+        Mockito.verify(xaTerminator).commit(tc.getXid(), false);
+    }     
+    
     public void testLocalRollback() throws Exception {
-        server.begin(THREAD1);
+        TransactionContext tc = server.begin(THREAD1);
+        tc.incrementPartcipatingSourceCount("s1");
         server.rollback(THREAD1);
-
+        Mockito.verify(xaTerminator).rollback(tc.getXid());
+        
         try {
             server.rollback(THREAD1);
         } catch (XATransactionException e) {
@@ -170,10 +214,10 @@
     }
 
     public void testConcurrentEnlistment() throws Exception {
-        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
 
         try {
-            server.start(THREAD1, XID1, XAResource.TMJOIN, 100);
+            server.start(THREAD1, XID1, XAResource.TMJOIN, 100,false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("Concurrent enlistment in global transaction MMXid global:1 branch:null format:0 is not supported.", //$NON-NLS-1$
@@ -182,11 +226,11 @@
     }
 
     public void testSuspend() throws Exception {
-        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
-        server.end(THREAD1, XID1, XAResource.TMSUSPEND);
+        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+        server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
 
         try {
-            server.end(THREAD1, XID1, XAResource.TMSUSPEND);
+            server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("Client is not currently enlisted in transaction MMXid global:1 branch:null format:0.", ex.getMessage()); //$NON-NLS-1$
@@ -194,13 +238,13 @@
     }
     
     public void testSuspendResume() throws Exception {
-        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
-        server.end(THREAD1, XID1, XAResource.TMSUSPEND);
-        server.start(THREAD1, XID1, XAResource.TMRESUME, 100);
-        server.end(THREAD1, XID1, XAResource.TMSUSPEND);
+        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+        server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
+        server.start(THREAD1, XID1, XAResource.TMRESUME, 100,false);
+        server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
 
         try {
-            server.start(THREAD2, XID1, XAResource.TMRESUME, 100);
+            server.start(THREAD2, XID1, XAResource.TMRESUME, 100,false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("Cannot resume, transaction MMXid global:1 branch:null format:0 was not suspended by client 2.", ex.getMessage()); //$NON-NLS-1$
@@ -209,7 +253,7 @@
 
     public void testUnknownFlags() throws Exception {
         try {
-            server.start(THREAD1, XID1, Integer.MAX_VALUE, 100);
+            server.start(THREAD1, XID1, Integer.MAX_VALUE, 100,false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("Unknown flags", ex.getMessage()); //$NON-NLS-1$
@@ -218,7 +262,7 @@
 
     public void testUnknownGlobalTransaction() throws Exception {
         try {
-            server.end(THREAD1, XID1, XAResource.TMSUCCESS);
+            server.end(THREAD1, XID1, XAResource.TMSUCCESS,false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("No global transaction found for MMXid global:1 branch:null format:0.", ex.getMessage()); //$NON-NLS-1$
@@ -226,11 +270,11 @@
     }
     
     public void testPrepareWithSuspended() throws Exception {
-        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
-        server.end(THREAD1, XID1, XAResource.TMSUSPEND);
+        server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+        server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
 
         try {
-            server.prepare(THREAD1, XID1);
+            server.prepare(THREAD1, XID1,false);
             fail("exception expected"); //$NON-NLS-1$
         } catch (XATransactionException ex) {
             assertEquals("Suspended work still exists on transaction MMXid global:1 branch:null format:0.", ex.getMessage()); //$NON-NLS-1$
@@ -242,7 +286,7 @@
     }
     
     public void testGetTransactions() throws Exception {
-    	server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+    	server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
         server.begin(THREAD2);
         
         assertEquals(2, server.getTransactions().size());
@@ -254,4 +298,181 @@
         assertEquals(THREAD1, t.getAssociatedSession());
         assertNotNull(t.getXid());
     }
+    
+    public void testGlobalPrepare() throws Exception {
+    	server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+        TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+        server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+        
+    	server.prepare(THREAD1, XID1, false);
+    	
+    	Mockito.verify(xaTerminator).prepare(tc.getXid());
+    	
+    	server.commit(THREAD1, XID1, true, false);
+    }
+    
+    public void testGlobalPrepareFail() throws Exception {
+    	server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+        server.end(THREAD1, XID1, XAResource.TMFAIL, false);
+        
+    	try {
+			server.prepare(THREAD1, XID1, false);
+			fail("should have failed to prepare as end resulted in TMFAIL");
+		} catch (Exception e) {
+		}
+		
+		server.forget(THREAD1, XID1, false);
+    }    
+    
+    public void testGlobalOnePhaseCommit() throws Exception {
+    	server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+    	TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+    	
+    	tc.incrementPartcipatingSourceCount("S1");
+    	
+        server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+        
+        server.prepare(THREAD1, XID1, false);
+
+		
+		server.commit(THREAD1, XID1, true, false);
+		
+		// only one source participates, so the commit is expected to be single phase
+		Mockito.verify(xaTerminator).commit(tc.getXid(), true);
+    }  
+    
+    public void testGlobalOnePhaseCommit_force_prepare_through() throws Exception {
+    	server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+    	TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+    	
+    	tc.incrementPartcipatingSourceCount("S1");
+    	
+        server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+        
+		
+		server.commit(THREAD1, XID1, true, false);
+		
+		// only one source participates, so no prepare is expected and the commit is single phase
+		Mockito.verify(xaTerminator, Mockito.times(0)).prepare(tc.getXid());
+		Mockito.verify(xaTerminator).commit(tc.getXid(), true);
+    }  
+    
+    public void testGlobalOnePhaseCommit_force_prepare() throws Exception {
+    	server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+    	TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+    	
+    	tc.incrementPartcipatingSourceCount("S1");
+    	tc.incrementPartcipatingSourceCount("S2");
+    	
+        server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+        
+		
+		server.commit(THREAD1, XID1, true, false);
+		
+		// since there are two sources the commit is not single phase
+		Mockito.verify(xaTerminator).prepare(tc.getXid());
+		Mockito.verify(xaTerminator).commit(tc.getXid(), false);
+    }  
+    
+    
+    public void testGlobalOnePhase_teiid_multiple() throws Exception {
+    	server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+    	TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+    	
+    	tc.incrementPartcipatingSourceCount("S1");
+    	tc.incrementPartcipatingSourceCount("S2");
+    	
+        server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+        
+        server.prepare(THREAD1, XID1, false);
+
+		
+		server.commit(THREAD1, XID1, true, false);
+		
+		// since there are two sources the commit is not single phase
+		Mockito.verify(xaTerminator).commit(tc.getXid(), false);
+    }    
+    
+    public void testGlobalOnePhaseRoolback() throws Exception {
+    	server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+    	TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+    	
+    	tc.incrementPartcipatingSourceCount("S1");
+    	
+        server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+        
+        server.prepare(THREAD1, XID1, false);
+
+		
+		server.rollback(THREAD1, XID1, false);
+		
+		// the rollback is expected to be delegated to the XATerminator
+		Mockito.verify(xaTerminator).rollback(tc.getXid());
+    }     
+    
+    public void testLocalCommit_rollback() throws Exception {
+        TransactionContext tc = server.begin(THREAD1);
+        tc.incrementPartcipatingSourceCount("s1");
+        tc.setRollbackOnly();
+        server.commit(THREAD1);
+
+        Mockito.verify(xaTerminator).rollback(tc.getXid());
+    }    
+    
+    public void testLocalCommit_not_in_Tx() throws Exception {
+        TransactionContext tc = server.begin(THREAD1);
+        server.commit(THREAD1);
+
+        Mockito.verify(xaTerminator,Mockito.times(0)).commit(tc.getXid(), true);
+    }       
+    
+    public void testRequestCommit() throws Exception{
+    	TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+    	server.start(tc);
+    	tc.incrementPartcipatingSourceCount("s1");
+    	server.commit(tc);
+    	Mockito.verify(xaTerminator,Mockito.times(0)).prepare(tc.getXid());
+    	Mockito.verify(xaTerminator).commit(tc.getXid(), true);
+    }
+    
+    public void testRequestCommit2() throws Exception{
+    	TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+    	server.start(tc);
+    	tc.incrementPartcipatingSourceCount("s1");
+    	tc.incrementPartcipatingSourceCount("s2");
+    	server.commit(tc);
+    	
+    	Mockito.verify(xaTerminator).prepare(tc.getXid());
+    	Mockito.verify(xaTerminator).commit(tc.getXid(), false);
+    }    
+    
+    public void testRequestRollback() throws Exception{
+    	TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+    	server.start(tc);
+    	tc.incrementPartcipatingSourceCount("s1");
+    	tc.incrementPartcipatingSourceCount("s2");
+    	
+    	server.rollback(tc);
+    	Mockito.verify(xaTerminator).rollback(tc.getXid());
+    }     
+    
+    public void testLocalCancel() throws Exception {
+        TransactionContext tc = server.begin(THREAD1);
+        tc.incrementPartcipatingSourceCount("S1");
+        tc.incrementPartcipatingSourceCount("S2");
+        
+        server.cancelTransactions(THREAD1, false);
+        
+        Mockito.verify(xaTerminator).rollback(tc.getXid());
+    }  
+    
+    public void testRequestCancel() throws Exception{
+    	TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+    	server.start(tc);
+    	tc.incrementPartcipatingSourceCount("s1");
+    	tc.incrementPartcipatingSourceCount("s2");
+    	
+    	 server.cancelTransactions(THREAD1, true);
+    	Mockito.verify(xaTerminator).rollback(tc.getXid());
+    }      
 }



More information about the teiid-commits mailing list