Author: rareddy
Date: 2009-11-06 17:01:50 -0500 (Fri, 06 Nov 2009)
New Revision: 1545
Modified:
branches/JCA/connector-api/src/main/java/org/teiid/connector/basic/BasicManagedConnection.java
branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java
branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java
branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AsyncRequestWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
branches/JCA/engine/src/main/resources/com/metamatrix/dqp/i18n.properties
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
Log:
TEIID-833: Fixed the code for using container based transaction manager.
Modified:
branches/JCA/connector-api/src/main/java/org/teiid/connector/basic/BasicManagedConnection.java
===================================================================
---
branches/JCA/connector-api/src/main/java/org/teiid/connector/basic/BasicManagedConnection.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/connector-api/src/main/java/org/teiid/connector/basic/BasicManagedConnection.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -58,8 +58,10 @@
@Override
public void cleanup() throws ResourceException {
- this.conn.close();
- this.conn = null;
+ if (this.conn != null) {
+ this.conn.close();
+ this.conn = null;
+ }
}
@Override
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -125,7 +125,7 @@
}
public boolean isTransactional(){
- return this.txnContext != null && this.txnContext.isInTransaction();
+ return this.txnContext != null && this.txnContext.getXid() != null;
}
public Command getCommand() {
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -24,8 +24,11 @@
import java.io.Serializable;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.resource.spi.work.ExecutionContext;
@@ -46,10 +49,10 @@
private long creationTime;
private boolean rollback = false;
private Set<String> suspendedBy = Collections.newSetFromMap(new
ConcurrentHashMap<String, Boolean>());
- boolean inTransaction = false;
+ private Map<String, AtomicInteger> txnSources = Collections.synchronizedMap(new
HashMap<String, AtomicInteger>());
public boolean isInTransaction() {
- return (getXid() != null && inTransaction);
+ return (getXid() != null && this.txnSources.size() > 0);
}
public long getCreationTime() {
@@ -98,7 +101,17 @@
return this.suspendedBy;
}
- public void inTransaction(boolean flag) {
- this.inTransaction = flag;
+ public void incrementPartcipatingSourceCount(String source) {
+ AtomicInteger count = txnSources.get(source);
+ if (count == null) {
+ txnSources.put(source, new AtomicInteger(1));
+ }
+ else {
+ count.incrementAndGet();
+ }
+ }
+
+ public boolean isOnePhase() {
+ return this.txnSources.size() == 1;
}
}
\ No newline at end of file
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -58,19 +58,19 @@
void cancelTransactions(String threadId, boolean requestOnly) throws
XATransactionException;
// global transaction methods
- int prepare(final String threadId, MMXid xid) throws XATransactionException;
+ int prepare(final String threadId, MMXid xid, boolean singleTM) throws
XATransactionException;
- void commit(final String threadId, MMXid xid, boolean onePhase) throws
XATransactionException;
+ void commit(final String threadId, MMXid xid, boolean onePhase, boolean singleTM) throws
XATransactionException;
- void rollback(final String threadId, MMXid xid) throws XATransactionException;
+ void rollback(final String threadId, MMXid xid, boolean singleTM) throws
XATransactionException;
- Xid[] recover(int flag) throws XATransactionException;
+ Xid[] recover(int flag, boolean singleTM) throws XATransactionException;
- void forget(final String threadId, MMXid xid) throws XATransactionException;
+ void forget(final String threadId, MMXid xid, boolean singleTM) throws
XATransactionException;
- void start(final String threadId, MMXid xid, int flags, int timeout) throws
XATransactionException;
+ void start(final String threadId, MMXid xid, int flags, int timeout, boolean singleTM)
throws XATransactionException;
- void end(final String threadId, MMXid xid, int flags) throws XATransactionException;
+ void end(final String threadId, MMXid xid, int flags, boolean singleTM) throws
XATransactionException;
// management methods
Collection<Transaction> getTransactions();
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -31,7 +31,7 @@
public class AsynchConnectorWorkItem extends ConnectorWorkItem {
- AsynchConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager,
ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
+ AsynchConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager,
ResultsReceiver<AtomicResultsMessage> resultsReceiver) throws ConnectorException {
super(message, manager, resultsReceiver);
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -174,8 +174,13 @@
private void enqueueRequest(ConnectorWorkItem work) throws ConnectorException {
try {
- // TODO: install work listener; what can we use this for?
- this.workManager.scheduleWork(work, 0, work.requestMsg.getTransactionContext(),
null);
+ // if the connector is immutable, then we do not want to pass on the transaction
context.
+ if (work.securityContext.isTransactional()) {
+ this.workManager.scheduleWork(work, 0, work.requestMsg.getTransactionContext(),
work);
+ }
+ else {
+ this.workManager.scheduleWork(work);
+ }
} catch (WorkException e) {
throw new ConnectorException(e);
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -26,6 +26,8 @@
import java.util.Arrays;
import java.util.List;
+import javax.resource.spi.work.WorkEvent;
+
import org.teiid.connector.api.Connection;
import org.teiid.connector.api.Connector;
import org.teiid.connector.api.ConnectorEnvironment;
@@ -73,7 +75,8 @@
protected AtomicRequestID id;
protected ConnectorManager manager;
protected AtomicRequestMessage requestMsg;
- protected boolean isTransactional;
+ protected Connector connector;
+ QueryMetadataInterface queryMetadata;
/* Created on new request */
protected Connection connection;
@@ -103,7 +106,7 @@
protected ResultsReceiver<AtomicResultsMessage> resultsReceiver;
- ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager,
ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
+ ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager,
ResultsReceiver<AtomicResultsMessage> resultsReceiver) throws ConnectorException {
this.id = message.getAtomicRequestID();
this.requestMsg = message;
this.manager = manager;
@@ -122,24 +125,27 @@
this.securityContext.setBatchSize(this.requestMsg.getFetchSize());
this.securityContext.setContextCache(manager.getContextCache());
this.securityContext.setMetadataService(manager.getMetadataService());
+
+ this.connector = manager.getConnector();
+ this.connectorEnv = connector.getConnectorEnvironment();
+ try {
+ this.queryMetadata = new
TempMetadataAdapter(manager.getMetadataService().lookupMetadata(this.requestMsg.getWorkContext().getVdbName(),
this.requestMsg.getWorkContext().getVdbVersion()), new TempMetadataStore());
+
+ if (requestMsg.isTransactional()){
+ if (this.connectorEnv.isXaCapable()) {
+ this.securityContext.setTransactional(true);
+ } else if (!this.connectorEnv.isImmutable() &&
requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
+ throw new
ConnectorException(DQPPlugin.Util.getString("ConnectorWorker.transactionNotSupported"));
//$NON-NLS-1$
+ }
+ }
+ } catch(MetaMatrixComponentException e) {
+ throw new ConnectorException(e);
+ }
}
- protected void createConnection(Connector connector, QueryMetadataInterface
queryMetadata) throws ConnectorException, MetaMatrixComponentException {
+ protected void createConnection() throws ConnectorException {
LogManager.logTrace(LogConstants.CTX_CONNECTOR, new Object[] {id, "creating
connection for atomic-request"}); //$NON-NLS-1$
-
- ConnectorEnvironment env = connector.getConnectorEnvironment();
-
- if (requestMsg.isTransactional()){
- if (env.isXaCapable()) {
- this.securityContext.setTransactional(true);
- this.isTransactional = true;
- } else if (!env.isImmutable() &&
requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
- throw new
ConnectorException(DQPPlugin.Util.getString("ConnectorWorker.transactionNotSupported"));
//$NON-NLS-1$
- }
- }
-
- this.connection = connector.getConnection();
- this.connectorEnv = env;
+ this.connection = this.connector.getConnection();
}
protected void process() {
@@ -253,14 +259,11 @@
} catch (Throwable e) {
LogManager.logError(LogConstants.CTX_CONNECTOR, e, e.getMessage());
} finally {
- try {
- if (connection != null) {
- connection.close();
- LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[]
{this.id, "Closed connection"}); //$NON-NLS-1$
- }
- } finally {
- manager.removeState(this.id);
- sendClose();
+ // Close the underlying connection, but send the close response only upon
notification from
+ // the container in the workCompleted call.
+ if (connection != null) {
+ connection.close();
+ LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Closed connection"}); //$NON-NLS-1$
}
}
}
@@ -282,9 +285,8 @@
protected void createExecution() throws MetaMatrixComponentException,
ConnectorException {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[]
{this.requestMsg.getAtomicRequestID(), "Processing NEW request:",
this.requestMsg.getCommand()}); //$NON-NLS-1$
-
- QueryMetadataInterface queryMetadata = new
TempMetadataAdapter(manager.getMetadataService().lookupMetadata(this.requestMsg.getWorkContext().getVdbName(),
this.requestMsg.getWorkContext().getVdbVersion()), new TempMetadataStore());
- createConnection(manager.getConnector(), queryMetadata);
+
+ createConnection();
LogManager.logTrace(LogConstants.CTX_CONNECTOR, new Object[] {id, "creating
execution for atomic-request"}); //$NON-NLS-1$
@@ -521,4 +523,18 @@
return this.id.toString();
}
+ @Override
+ public void workCompleted(WorkEvent arg0) {
+ manager.removeState(this.id);
+ sendClose();
+ }
+
+ @Override
+ public void workRejected(WorkEvent event) {
+ try {
+ asynchCancel();
+ } catch (ConnectorException e) {
+ LogManager.logError(LogConstants.CTX_CONNECTOR, event.getException(),
this.id.toString()); //$NON-NLS-1$
+ }
+ }
}
\ No newline at end of file
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -56,7 +56,7 @@
private CachedResultsConnectorWorkItem(AtomicRequestMessage message,
ConnectorManager manager,
ResultsReceiver<AtomicResultsMessage> resultsReceiver,
- CacheID cacheID) {
+ CacheID cacheID) throws ConnectorException {
super(message, manager, resultsReceiver);
this.cacheID = cacheID;
}
@@ -136,7 +136,7 @@
this.synchWorkers = synchWorkers;
}
- public ConnectorWorkItem createWorkItem(AtomicRequestMessage message,
ResultsReceiver<AtomicResultsMessage> receiver) {
+ public ConnectorWorkItem createWorkItem(AtomicRequestMessage message,
ResultsReceiver<AtomicResultsMessage> receiver) throws ConnectorException {
if (this.rsCache != null && message.useResultSetCache()) {
final CacheID cacheID = createCacheID(message);
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -26,12 +26,16 @@
import java.util.Map;
import java.util.concurrent.Semaphore;
+import javax.resource.spi.work.WorkEvent;
import javax.transaction.xa.Xid;
+import org.teiid.connector.api.ConnectorException;
+
import com.metamatrix.common.comm.api.ResultsReceiver;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.dqp.message.AtomicRequestMessage;
import com.metamatrix.dqp.message.AtomicResultsMessage;
+import com.metamatrix.dqp.service.TransactionContext;
import com.metamatrix.dqp.util.LogConstants;
public class SynchConnectorWorkItem extends ConnectorWorkItem {
@@ -45,33 +49,29 @@
private TransactionLock lock;
- SynchConnectorWorkItem(AtomicRequestMessage message,
- ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver)
{
+ SynchConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager,
ResultsReceiver<AtomicResultsMessage> resultsReceiver) throws ConnectorException {
super(message, manager, resultsReceiver);
+
+ // Since the container makes sure that there is no current work registered under the current
transaction, it is
+ // required that the lock be acquired before we schedule the work.
+ try {
+ acquireTransactionLock();
+ } catch (InterruptedException e) {
+ interrupted(e);
+ }
}
@Override
public void run() {
while (!this.isDoneProcessing()) { //process until closed
- try {
- acquireTransactionLock();
- } catch (InterruptedException e) {
- interrupted(e);
- }
- try {
- super.run();
- } finally {
- releaseTxnLock();
- }
+ super.run();
}
}
@Override
protected void pauseProcessing() {
- releaseTxnLock();
try {
this.wait();
- acquireTransactionLock();
} catch (InterruptedException e) {
interrupted(e);
}
@@ -88,10 +88,11 @@
}
private void acquireTransactionLock() throws InterruptedException {
- if (!this.isTransactional) {
+ TransactionContext tc = this.requestMsg.getTransactionContext();
+ if ( tc == null) {
return;
}
- Xid key = requestMsg.getTransactionContext().getXid();
+ Xid key = tc.getXid();
TransactionLock existing = null;
synchronized (TRANSACTION_LOCKS) {
@@ -101,20 +102,24 @@
TRANSACTION_LOCKS.put(key, existing);
}
existing.pendingCount++;
+ tc.incrementPartcipatingSourceCount(requestMsg.getConnectorName());
}
existing.lock.acquire();
this.lock = existing;
+ System.out.println("got the connector lock on ="+key);
}
private void releaseTxnLock() {
- if (!this.isTransactional || this.lock == null) {
+ TransactionContext tc = this.requestMsg.getTransactionContext();
+ if ( tc == null || this.lock == null) {
return;
- }
+ }
synchronized (TRANSACTION_LOCKS) {
lock.pendingCount--;
if (lock.pendingCount == 0) {
- Xid key = requestMsg.getTransactionContext().getXid();
+ Xid key = tc.getXid();
TRANSACTION_LOCKS.remove(key);
+ System.out.println("released the connector lock on ="+key);
}
}
lock.lock.release();
@@ -128,4 +133,22 @@
return true;
}
+
+ @Override
+ public void workCompleted(WorkEvent event) {
+ try {
+ super.workCompleted(event);
+ } finally {
+ releaseTxnLock();
+ }
+ }
+
+ @Override
+ public void workRejected(WorkEvent event) {
+ try {
+ super.workRejected(event);
+ } finally {
+ releaseTxnLock();
+ }
+ }
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -23,6 +23,8 @@
package org.teiid.dqp.internal.process;
import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkEvent;
+import javax.resource.spi.work.WorkListener;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.dqp.util.LogConstants;
@@ -32,7 +34,7 @@
* Represents a task that performs work that may take more than one processing pass to
complete.
* During processing the WorkItem may receive events asynchronously through the moreWork
method.
*/
-public abstract class AbstractWorkItem implements Work {
+public abstract class AbstractWorkItem implements Work, WorkListener{
enum ThreadState {
MORE_WORK, WORKING, IDLE, DONE
@@ -134,4 +136,20 @@
public boolean shouldAbortProcessing() {
return this.release;
}
+
+ @Override
+ public void workAccepted(WorkEvent arg0) {
+ }
+
+ @Override
+ public void workCompleted(WorkEvent arg0) {
+ }
+
+ @Override
+ public void workRejected(WorkEvent event) {
+ }
+
+ @Override
+ public void workStarted(WorkEvent arg0) {
+ }
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AsyncRequestWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AsyncRequestWorkItem.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AsyncRequestWorkItem.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -16,7 +16,7 @@
@Override
protected void resumeProcessing() {
- dqpCore.addWork(this, getTransactionContext());
+ dqpCore.addWork(this);
}
@Override
Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -206,7 +206,7 @@
if(holder != null && !holder.isCanceled()) {
RequestInfo req = new RequestInfo(holder.requestID,
holder.requestMsg.getCommandString(), holder.requestMsg.getSubmittedTimestamp(),
holder.requestMsg.getProcessingTimestamp());
req.setSessionToken(holder.dqpWorkContext.getSessionToken());
- if (holder.getTransactionContext() != null &&
holder.getTransactionContext().isInTransaction()) {
+ if (holder.getTransactionContext() != null &&
holder.getTransactionContext().getXid() != null) {
req.setXid(holder.getTransactionContext().getXid());
}
@@ -272,7 +272,7 @@
logMMCommand(workItem, true, false, 0);
addRequest(requestID, workItem);
- this.addWork(workItem, workItem.getTransactionContext());
+ this.addWork(workItem);
return resultsFuture;
}
@@ -322,9 +322,9 @@
return rsCache.hasResults(cID);
}
- void addWork(Work work, TransactionContext context) {
+ void addWork(Work work) {
try {
- this.processWorkerPool.scheduleWork(work, 20000, context, null);
+ this.processWorkerPool.scheduleWork(work);
} catch (WorkException e) {
//TODO: how can be turn this into result?
e.printStackTrace();
@@ -708,39 +708,35 @@
// global txn
public void commit(MMXid xid, boolean onePhase) throws XATransactionException {
String threadId = DQPWorkContext.getWorkContext().getConnectionID();
- this.getTransactionService().commit(threadId, xid, onePhase);
+ this.getTransactionService().commit(threadId, xid, onePhase, false);
}
// global txn
public void end(MMXid xid, int flags) throws XATransactionException {
String threadId = DQPWorkContext.getWorkContext().getConnectionID();
- this.getTransactionService().end(threadId, xid, flags);
+ this.getTransactionService().end(threadId, xid, flags, false);
}
// global txn
public void forget(MMXid xid) throws XATransactionException {
String threadId = DQPWorkContext.getWorkContext().getConnectionID();
- this.getTransactionService().forget(threadId, xid);
+ this.getTransactionService().forget(threadId, xid, false);
}
// global txn
public int prepare(MMXid xid) throws XATransactionException {
- return this.getTransactionService().prepare(
- DQPWorkContext.getWorkContext().getConnectionID(),
- xid);
+ return
this.getTransactionService().prepare(DQPWorkContext.getWorkContext().getConnectionID(),xid,
false);
}
// global txn
public Xid[] recover(int flag) throws XATransactionException {
- return this.getTransactionService().recover(flag);
+ return this.getTransactionService().recover(flag, false);
}
// global txn
public void rollback(MMXid xid) throws XATransactionException {
- this.getTransactionService().rollback(
- DQPWorkContext.getWorkContext().getConnectionID(),
- xid);
+ this.getTransactionService().rollback(DQPWorkContext.getWorkContext().getConnectionID(),xid,
false);
}
// global txn
public void start(MMXid xid, int flags, int timeout)
throws XATransactionException {
String threadId = DQPWorkContext.getWorkContext().getConnectionID();
- this.getTransactionService().start(threadId, xid, flags, timeout);
+ this.getTransactionService().start(threadId, xid, flags, timeout, false);
}
public MetadataResult getMetadata(long requestID)
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -105,7 +105,7 @@
try {
this.dataMgr.executeRequest(aqr, this.connectorName, this);
} catch (MetaMatrixComponentException e) {
- exceptionOccurred(e);
+ exceptionOccurred(e, true);
}
}
}
@@ -166,6 +166,11 @@
}
public void exceptionOccurred(Throwable e) {
+ exceptionOccurred(e, false);
+ }
+
+ private void exceptionOccurred(Throwable e, boolean removeState) {
+
synchronized (this) {
if(workItem.requestMsg.supportsPartialResults()) {
nextBatch = new List[0];
@@ -178,7 +183,9 @@
}
waitingForData = false;
}
- workItem.closeAtomicRequest(aqr.getAtomicRequestID());
+ if (removeState) {
+ workItem.closeAtomicRequest(aqr.getAtomicRequestID());
+ }
this.workItem.moreWork();
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -78,7 +78,7 @@
shouldClose = chunk.isLast();
} catch (BlockedOnMemoryException e) {
LogManager.logDetail(LogConstants.CTX_DQP, new Object[] {"Reenqueueing LOB chunk
request due to lack of available memory ###########", requestID}); //$NON-NLS-1$
//$NON-NLS-2$
- this.dqpCore.addWork(this, null);
+ this.dqpCore.addWork(this);
return;
} catch (TupleSourceNotFoundException e) {
LogManager.logWarning(LogConstants.CTX_DQP, e,
DQPPlugin.Util.getString("BufferManagerLobChunkStream.no_tuple_source",
streamId)); //$NON-NLS-1$
Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -370,9 +370,9 @@
if (tc != null){
Assertion.assertTrue(tc.getTransactionType() !=
TransactionContext.Scope.REQUEST, "Transaction already associated with
request."); //$NON-NLS-1$
}
-
- if (tc == null || !tc.isInTransaction()) {
- //if not under a transaction
+
+ // If local or global transaction is not started.
+ if (tc == null || tc.getXid() == null) {
boolean startAutoWrapTxn = false;
@@ -405,7 +405,6 @@
}
this.transactionContext = tc;
- this.transactionContext.inTransaction(true);
this.processor = new QueryProcessor(processPlan, context, bufferManager, new
TempTableDataManager(processorDataManager, tempTableStore));
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -179,41 +179,11 @@
protected boolean isDoneProcessing() {
return isClosed;
}
-
-// @Override
-// protected void resumeProcessing() {
-// dqpCore.addWork(this, getTransactionContext());
-// }
-
+
@Override
- public void run() {
- while (!this.isDoneProcessing()) { //process until closed
- super.run();
- }
- }
-
- @Override
protected void resumeProcessing() {
- this.notify();
+ dqpCore.addWork(this);
}
-
- @Override
- protected void pauseProcessing() {
- try {
- this.wait();
- } catch (InterruptedException e) {
- interrupted(e);
- }
- }
-
- private void interrupted(InterruptedException e) {
- try {
- LogManager.logDetail(LogConstants.CTX_DQP, e, this.requestID+" Interrupted,
proceeding to close"); //$NON-NLS-1$
- this.dqpCore.cancelRequest(this.requestID);
- } catch (MetaMatrixComponentException e1) {
- LogManager.logWarning(LogConstants.CTX_DQP, e,
DQPPlugin.Util.getString("Cancel_failed", this.requestID)); //$NON-NLS-1$
- }
- }
@Override
protected void process() {
@@ -372,7 +342,7 @@
this.transactionService.rollback(transactionContext);
} catch (XATransactionException e1) {
LogManager.logWarning(LogConstants.CTX_DQP, e1,
DQPPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$
- }
+ }
}
isClosed = true;
@@ -412,7 +382,7 @@
analysisRecord = request.analysisRecord;
schemas = request.schemas;
transactionContext = request.transactionContext;
- if (this.transactionContext != null &&
this.transactionContext.isInTransaction()) {
+ if (this.transactionContext != null && this.transactionContext.getXid() !=
null) {
this.transactionState = TransactionState.ACTIVE;
}
Option option = originalCommand.getOption();
@@ -587,7 +557,7 @@
}
}
workItem.setResultsReceiver(chunckReceiver);
- dqpCore.addWork(workItem, getTransactionContext());
+ dqpCore.addWork(workItem);
}
public void removeLobStream(int streamRequestId) {
@@ -688,7 +658,7 @@
*/
private void logCommandError() {
String transactionID = null;
- if (this.transactionContext != null &&
this.transactionContext.isInTransaction()) {
+ if (this.transactionContext != null && this.transactionContext.getXid()
!= null) {
transactionID = this.transactionContext.getXid().toString();
}
CommandLogMessage message = new CommandLogMessage(System.currentTimeMillis(),
requestID.toString(), transactionID == null ? null : transactionID,
requestID.getConnectionID(), dqpWorkContext.getUserName(), dqpWorkContext.getVdbName(),
dqpWorkContext.getVdbVersion(), -1, false, true);
@@ -716,7 +686,8 @@
TransactionContext getTransactionContext() {
return transactionContext;
}
-
+
+
Collection<DataTierTupleSource> getConnectorRequests() {
return new LinkedList<DataTierTupleSource>(this.connectorInfo.values());
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -94,10 +94,6 @@
}
}
- public synchronized void removeTransactionContext(MMXid xid) {
- this.xidToTransactionContext.remove(xid);
- }
-
public synchronized void addTransactionContext(TransactionContext tc) {
if (tc.getXid() != null) {
this.xidToTransactionContext.put(tc.getXid(), tc);
@@ -128,56 +124,106 @@
/**
* Global Transaction
*/
- public int prepare(final String threadId, MMXid xid) throws XATransactionException {
- TransactionContext impl = checkXAState(threadId, xid, true, false);
- if (!impl.getSuspendedBy().isEmpty()) {
+ public int prepare(final String threadId, MMXid xid, boolean singleTM) throws
XATransactionException {
+ TransactionContext tc = checkXAState(threadId, xid, true, false);
+ if (!tc.getSuspendedBy().isEmpty()) {
throw new XATransactionException(XAException.XAER_PROTO,
DQPPlugin.Util.getString("TransactionServer.suspended_exist", xid));
//$NON-NLS-1$
}
- // In the container this pass though
- return XAResource.XA_RDONLY;
+ if (tc.shouldRollback()) {
+ throw new XATransactionException(XAException.XAER_RMERR,
DQPPlugin.Util.getString("TransactionServer.rollback_set", xid));
+ }
+
+ // In the container this passes through
+ if (singleTM) {
+ return XAResource.XA_RDONLY;
+ }
+
+ try {
+ return this.provider.getXATerminator().prepare(tc.getXid());
+ } catch (XAException e) {
+ throw new XATransactionException(e);
+ }
}
/**
* Global Transaction
*/
- public void commit(final String threadId, MMXid xid, boolean onePhase) throws
XATransactionException {
- // no-op; Connector sources will be directly managed by container's TM
- TransactionContext tc = checkXAState(threadId, xid, true, false);
- this.transactions.removeTransactionContext(tc);
+ public void commit(final String threadId, MMXid xid, boolean onePhase, boolean
singleTM) throws XATransactionException {
+ TransactionContext tc = checkXAState(threadId, xid, true, false);
+ try {
+ // In the case of single TM, the container directly commits the sources.
+ if (!singleTM) {
+ // In the case of onePhase containers call commit directly. If Teiid is also
one phase let it pass through,
+ // otherwise force the prepare.
+ boolean singlePhase = tc.isOnePhase();
+ if (onePhase && !singlePhase) {
+ prepare(threadId, xid, singleTM);
+ }
+ this.provider.getXATerminator().commit(tc.getXid(), singlePhase);
+ }
+ } catch (XAException e) {
+ throw new XATransactionException(e);
+ } finally {
+ this.transactions.removeTransactionContext(tc);
+ }
}
/**
* Global Transaction
*/
- public void rollback(final String threadId, MMXid xid) throws XATransactionException
{
- // no-op; Connector sources will be directly managed by container's TM
- TransactionContext tc = checkXAState(threadId, xid, true, false);
- this.transactions.removeTransactionContext(tc);
+ public void rollback(final String threadId, MMXid xid, boolean singleTM) throws
XATransactionException {
+ TransactionContext tc = checkXAState(threadId, xid, true, false);
+ try {
+ // In the case of single TM, the container directly rolls back the sources.
+ if (!singleTM) {
+ this.provider.getXATerminator().rollback(tc.getXid());
+ }
+ } catch (XAException e) {
+ throw new XATransactionException(e);
+ } finally {
+ this.transactions.removeTransactionContext(tc);
+ }
}
/**
* Global Transaction
*/
- public Xid[] recover(int flag) throws XATransactionException {
- // no-op; Connector sources will be directly managed by container's TM
+ public Xid[] recover(int flag, boolean singleTM) throws XATransactionException {
+ // In case of single TM, container knows this list.
+ if (singleTM) {
+ return new Xid[0];
+ }
+
return new Xid[0];
+// try {
+// return this.provider.getXATerminator().recover(flag);
+// } catch (XAException e) {
+// throw new XATransactionException(e);
+// }
}
/**
* Global Transaction
*/
- public void forget(final String threadId, MMXid xid) throws XATransactionException {
- // no-op; Connector sources will be directly managed by container's TM
+ public void forget(final String threadId, MMXid xid, boolean singleTM) throws
XATransactionException {
+ TransactionContext tc = checkXAState(threadId, xid, true, false);
+ try {
+ if (singleTM) {
+ return;
+ }
+ this.provider.getXATerminator().forget(xid);
+ } catch (XAException err) {
+ throw new XATransactionException(err);
+ } finally {
+ this.transactions.removeTransactionContext(tc);
+ }
}
/**
* Global Transaction
*/
- public void start(final String threadId,
- final MMXid xid,
- int flags,
- int timeout) throws XATransactionException {
+ public void start(final String threadId, final MMXid xid, int flags, int timeout,
boolean singleTM) throws XATransactionException {
TransactionContext tc = null;
@@ -221,7 +267,7 @@
/**
* Global Transaction
*/
- public void end(final String threadId, MMXid xid, int flags) throws
XATransactionException {
+ public void end(final String threadId, MMXid xid, int flags, boolean singleTM) throws
XATransactionException {
TransactionContext tc = checkXAState(threadId, xid, true, true);
try {
switch (flags) {
@@ -234,6 +280,7 @@
break;
}
case XAResource.TMFAIL: {
+ tc.setRollbackOnly();
break;
}
default:
@@ -324,8 +371,7 @@
}
try {
if (tc.isInTransaction()) {
- // TODO: implement the one phase logic
- this.provider.getXATerminator().commit(tc.getXid(), false);
+ directCommit(tc);
}
} catch (XAException e) {
throw new XATransactionException(e);
@@ -382,13 +428,17 @@
//commit may be called multiple times by the processworker, if this is a
subsequent call, then the current
//context will not be active
- TransactionContext currentContext =
transactions.getTransactionContext(context.getThreadId());
- if (currentContext == null || currentContext.getTransactionType() ==
TransactionContext.Scope.NONE) {
- return currentContext;
+ TransactionContext tc =
transactions.getTransactionContext(context.getThreadId());
+ if (tc == null || tc.getTransactionType() == TransactionContext.Scope.NONE) {
+ return tc;
}
+
+ Assertion.assertTrue(!tc.shouldRollback());
+
try {
- // TODO: implement the one phase logic
- this.provider.getXATerminator().commit(context.getXid(), false);
+ if (tc.isInTransaction()) {
+ directCommit(tc);
+ }
} catch (XAException e) {
throw new XATransactionException(e);
} finally {
@@ -397,6 +447,19 @@
return context;
}
+ private void directCommit(TransactionContext tc) throws XAException {
+ boolean commit = true;
+ boolean onePhase = tc.isOnePhase();
+ if (!onePhase) {
+ int prepare = this.provider.getXATerminator().prepare(tc.getXid());
+ commit = (prepare == XAResource.XA_OK);
+ }
+
+ if (commit) {
+ this.provider.getXATerminator().commit(tc.getXid(), onePhase);
+ }
+ }
+
/**
* Request level transaction
*/
@@ -426,14 +489,16 @@
return;
}
- cancelTransaction(tc);
+ tc.setRollbackOnly();
+
+ if (requestOnly) {
+ rollback(tc);
+ }
+ else {
+ rollback(threadId);
+ }
}
- private void cancelTransaction(TransactionContext context) throws XATransactionException
{
- context.setRollbackOnly();
- rollback(context);
- }
-
@Override
public Collection<org.teiid.adminapi.Transaction> getTransactions() {
Set<TransactionContext> txnSet = Collections.newSetFromMap(new
IdentityHashMap<TransactionContext, Boolean>());
Modified: branches/JCA/engine/src/main/resources/com/metamatrix/dqp/i18n.properties
===================================================================
--- branches/JCA/engine/src/main/resources/com/metamatrix/dqp/i18n.properties 2009-11-02
19:41:28 UTC (rev 1544)
+++ branches/JCA/engine/src/main/resources/com/metamatrix/dqp/i18n.properties 2009-11-06
22:01:50 UTC (rev 1545)
@@ -463,7 +463,7 @@
TransactionServer.suspended_exist=Suspended work still exists on transaction {0}.
TransactionServer.failed_to_enlist=Failed to enlist the XAResource in Transaction.
TransactionServer.failed_to_delist=Failed to delist the XAResource from Transaction.
-
+TransactionServer.rollback_set=Rollback Only has been set on transaction {0}. This may be
result of a cancel request.
TransactionContextImpl.remote_not_supported=Remote connector calls under a transaction
are not supported
CodeTableCache.duplicate_key=Duplicate code table ''{0}'' key
''{1}'' value ''{2}''
Modified:
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
===================================================================
---
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -427,7 +427,7 @@
int resumeCount;
FakeQueuingAsynchConnectorWorkItem(AtomicRequestMessage message,
- ConnectorManager manager, ResultsReceiver<AtomicResultsMessage>
resultsReceiver) {
+ ConnectorManager manager, ResultsReceiver<AtomicResultsMessage>
resultsReceiver) throws ConnectorException {
super(message, manager, resultsReceiver);
}
Modified:
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
===================================================================
---
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java 2009-11-02
19:41:28 UTC (rev 1544)
+++
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java 2009-11-06
22:01:50 UTC (rev 1545)
@@ -22,21 +22,23 @@
package org.teiid.dqp.internal.transaction;
+import javax.resource.spi.XATerminator;
import javax.transaction.xa.XAResource;
+import junit.framework.TestCase;
+
import org.mockito.Mockito;
import org.teiid.adminapi.Transaction;
-import junit.framework.TestCase;
-
import com.metamatrix.common.xa.MMXid;
import com.metamatrix.common.xa.XATransactionException;
-import com.metamatrix.core.util.SimpleMock;
+import com.metamatrix.dqp.service.TransactionContext;
public class TestTransactionServer extends TestCase {
private TransactionServerImpl server;
-
+ private XATerminator xaTerminator;
+
private static final String THREAD1 = "1"; //$NON-NLS-1$
private static final String THREAD2 = "2"; //$NON-NLS-1$
@@ -52,7 +54,10 @@
*/
protected void setUp() throws Exception {
server = new TransactionServerImpl();
-
server.setTransactionProvider(SimpleMock.createSimpleMock(TransactionProvider.class));
+ TransactionProvider provider = Mockito.mock(TransactionProvider.class);
+ xaTerminator = Mockito.mock(XATerminator.class);
+ Mockito.stub(provider.getXATerminator()).toReturn(xaTerminator);
+ server.setTransactionProvider(provider);
server.setXidFactory(new XidFactory());
}
@@ -63,7 +68,7 @@
server.begin(THREAD1);
try {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100, false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("Client thread already involved in a transaction.
Transaction nesting is not supported. The current transaction must be completed
first.", //$NON-NLS-1$
@@ -75,7 +80,7 @@
* once in a global, cannot start a local
*/
public void testTransactionExclusion1() throws Exception {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100, false);
try {
server.begin(THREAD1);
@@ -90,10 +95,10 @@
* global can only be started once
*/
public void testTransactionExclusion2() throws Exception {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
try {
- server.start(THREAD2, XID1, XAResource.TMNOFLAGS, 100);
+ server.start(THREAD2, XID1, XAResource.TMNOFLAGS, 100,false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("Global transaction MMXid global:1 branch:null format:0
already exists.", ex.getMessage()); //$NON-NLS-1$
@@ -104,10 +109,10 @@
* global cannot be nested
*/
public void testTransactionExclusion3() throws Exception {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
try {
- server.start(THREAD1, XID2, XAResource.TMNOFLAGS, 100);
+ server.start(THREAD1, XID2, XAResource.TMNOFLAGS, 100,false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("Client thread already involved in a transaction.
Transaction nesting is not supported. The current transaction must be completed
first.", //$NON-NLS-1$
@@ -134,12 +139,12 @@
* global cannot be nested
*/
public void testTransactionExclusion5() throws Exception {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
- server.start(THREAD2, XID2, XAResource.TMNOFLAGS, 100);
- server.end(THREAD2, XID2, XAResource.TMSUCCESS);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ server.start(THREAD2, XID2, XAResource.TMNOFLAGS, 100,false);
+ server.end(THREAD2, XID2, XAResource.TMSUCCESS,false);
try {
- server.start(THREAD1, XID2, XAResource.TMJOIN, 100);
+ server.start(THREAD1, XID2, XAResource.TMJOIN, 100,false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("Client thread already involved in a transaction.
Transaction nesting is not supported. The current transaction must be completed
first.", //$NON-NLS-1$
@@ -158,10 +163,49 @@
}
}
+ public void testLocalSetRollback() throws Exception {
+ TransactionContext tc = server.begin(THREAD1);
+ tc.incrementPartcipatingSourceCount("s1");
+ tc.setRollbackOnly();
+
+ server.commit(THREAD1);
+
+ Mockito.verify(xaTerminator).rollback(tc.getXid());
+ }
+
+ public void testSinglePhaseCommit() throws Exception {
+ TransactionContext tc = server.begin(THREAD1);
+ tc.incrementPartcipatingSourceCount("S1");
+
+ server.commit(THREAD1);
+
+ Mockito.verify(xaTerminator).commit(tc.getXid(), true);
+
+ tc = server.begin(THREAD1);
+ tc.incrementPartcipatingSourceCount("S1");
+ tc.incrementPartcipatingSourceCount("S1");
+
+ server.commit(THREAD1);
+
+ Mockito.verify(xaTerminator).commit(tc.getXid(), true);
+ }
+
+ public void testTwoPhaseCommit() throws Exception {
+ TransactionContext tc = server.begin(THREAD1);
+ tc.incrementPartcipatingSourceCount("S1");
+ tc.incrementPartcipatingSourceCount("S2");
+
+ server.commit(THREAD1);
+
+ Mockito.verify(xaTerminator).commit(tc.getXid(), false);
+ }
+
public void testLocalRollback() throws Exception {
- server.begin(THREAD1);
+ TransactionContext tc = server.begin(THREAD1);
+ tc.incrementPartcipatingSourceCount("s1");
server.rollback(THREAD1);
-
+ Mockito.verify(xaTerminator).rollback(tc.getXid());
+
try {
server.rollback(THREAD1);
} catch (XATransactionException e) {
@@ -170,10 +214,10 @@
}
public void testConcurrentEnlistment() throws Exception {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
try {
- server.start(THREAD1, XID1, XAResource.TMJOIN, 100);
+ server.start(THREAD1, XID1, XAResource.TMJOIN, 100,false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("Concurrent enlistment in global transaction MMXid global:1
branch:null format:0 is not supported.", //$NON-NLS-1$
@@ -182,11 +226,11 @@
}
public void testSuspend() throws Exception {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
- server.end(THREAD1, XID1, XAResource.TMSUSPEND);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
try {
- server.end(THREAD1, XID1, XAResource.TMSUSPEND);
+ server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("Client is not currently enlisted in transaction MMXid
global:1 branch:null format:0.", ex.getMessage()); //$NON-NLS-1$
@@ -194,13 +238,13 @@
}
public void testSuspendResume() throws Exception {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
- server.end(THREAD1, XID1, XAResource.TMSUSPEND);
- server.start(THREAD1, XID1, XAResource.TMRESUME, 100);
- server.end(THREAD1, XID1, XAResource.TMSUSPEND);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
+ server.start(THREAD1, XID1, XAResource.TMRESUME, 100,false);
+ server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
try {
- server.start(THREAD2, XID1, XAResource.TMRESUME, 100);
+ server.start(THREAD2, XID1, XAResource.TMRESUME, 100,false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("Cannot resume, transaction MMXid global:1 branch:null
format:0 was not suspended by client 2.", ex.getMessage()); //$NON-NLS-1$
@@ -209,7 +253,7 @@
public void testUnknownFlags() throws Exception {
try {
- server.start(THREAD1, XID1, Integer.MAX_VALUE, 100);
+ server.start(THREAD1, XID1, Integer.MAX_VALUE, 100,false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("Unknown flags", ex.getMessage()); //$NON-NLS-1$
@@ -218,7 +262,7 @@
public void testUnknownGlobalTransaction() throws Exception {
try {
- server.end(THREAD1, XID1, XAResource.TMSUCCESS);
+ server.end(THREAD1, XID1, XAResource.TMSUCCESS,false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("No global transaction found for MMXid global:1 branch:null
format:0.", ex.getMessage()); //$NON-NLS-1$
@@ -226,11 +270,11 @@
}
public void testPrepareWithSuspended() throws Exception {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
- server.end(THREAD1, XID1, XAResource.TMSUSPEND);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ server.end(THREAD1, XID1, XAResource.TMSUSPEND,false);
try {
- server.prepare(THREAD1, XID1);
+ server.prepare(THREAD1, XID1,false);
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException ex) {
assertEquals("Suspended work still exists on transaction MMXid global:1
branch:null format:0.", ex.getMessage()); //$NON-NLS-1$
@@ -242,7 +286,7 @@
}
public void testGetTransactions() throws Exception {
- server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100);
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
server.begin(THREAD2);
assertEquals(2, server.getTransactions().size());
@@ -254,4 +298,181 @@
assertEquals(THREAD1, t.getAssociatedSession());
assertNotNull(t.getXid());
}
+
+ public void testGlobalPrepare() throws Exception {
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+ server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+
+ server.prepare(THREAD1, XID1, false);
+
+ Mockito.verify(xaTerminator).prepare(tc.getXid());
+
+ server.commit(THREAD1, XID1, true, false);
+ }
+
+ public void testGlobalPrepareFail() throws Exception {
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ server.end(THREAD1, XID1, XAResource.TMFAIL, false);
+
+ try {
+ server.prepare(THREAD1, XID1, false);
+ fail("should have failed to prepare as end resulted in TMFAIL");
+ } catch (Exception e) {
+ }
+
+ server.forget(THREAD1, XID1, false);
+ }
+
+ public void testGlobalOnePhaseCommit() throws Exception {
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+
+ tc.incrementPartcipatingSourceCount("S1");
+
+ server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+
+ server.prepare(THREAD1, XID1, false);
+
+
+ server.commit(THREAD1, XID1, true, false);
+
+ // only one source participated, so the commit is single phase
+ Mockito.verify(xaTerminator).commit(tc.getXid(), true);
+ }
+
+ public void testGlobalOnePhaseCommit_force_prepare_through() throws Exception {
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+
+ tc.incrementPartcipatingSourceCount("S1");
+
+ server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+
+
+ server.commit(THREAD1, XID1, true, false);
+
+ // only one source participated, so no prepare is forced and the commit is single phase
+ Mockito.verify(xaTerminator, Mockito.times(0)).prepare(tc.getXid());
+ Mockito.verify(xaTerminator).commit(tc.getXid(), true);
+ }
+
+ public void testGlobalOnePhaseCommit_force_prepare() throws Exception {
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+
+ tc.incrementPartcipatingSourceCount("S1");
+ tc.incrementPartcipatingSourceCount("S2");
+
+ server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+
+
+ server.commit(THREAD1, XID1, true, false);
+
+ // since there are two sources the commit is not single phase
+ Mockito.verify(xaTerminator).prepare(tc.getXid());
+ Mockito.verify(xaTerminator).commit(tc.getXid(), false);
+ }
+
+
+ public void testGlobalOnePhase_teiid_multiple() throws Exception {
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+
+ tc.incrementPartcipatingSourceCount("S1");
+ tc.incrementPartcipatingSourceCount("S2");
+
+ server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+
+ server.prepare(THREAD1, XID1, false);
+
+
+ server.commit(THREAD1, XID1, true, false);
+
+ // since there are two sources the commit is not single phase
+ Mockito.verify(xaTerminator).commit(tc.getXid(), false);
+ }
+
+ public void testGlobalOnePhaseRoolback() throws Exception {
+ server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+
+ tc.incrementPartcipatingSourceCount("S1");
+
+ server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
+
+ server.prepare(THREAD1, XID1, false);
+
+
+ server.rollback(THREAD1, XID1, false);
+
+ // the rollback must be delegated to the XATerminator for this transaction's xid
+ Mockito.verify(xaTerminator).rollback(tc.getXid());
+ }
+
+ public void testLocalCommit_rollback() throws Exception {
+ TransactionContext tc = server.begin(THREAD1);
+ tc.incrementPartcipatingSourceCount("s1");
+ tc.setRollbackOnly();
+ server.commit(THREAD1);
+
+ Mockito.verify(xaTerminator).rollback(tc.getXid());
+ }
+
+ public void testLocalCommit_not_in_Tx() throws Exception {
+ TransactionContext tc = server.begin(THREAD1);
+ server.commit(THREAD1);
+
+ Mockito.verify(xaTerminator,Mockito.times(0)).commit(tc.getXid(), true);
+ }
+
+ public void testRequestCommit() throws Exception{
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+ server.start(tc);
+ tc.incrementPartcipatingSourceCount("s1");
+ server.commit(tc);
+ Mockito.verify(xaTerminator,Mockito.times(0)).prepare(tc.getXid());
+ Mockito.verify(xaTerminator).commit(tc.getXid(), true);
+ }
+
+ public void testRequestCommit2() throws Exception{
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+ server.start(tc);
+ tc.incrementPartcipatingSourceCount("s1");
+ tc.incrementPartcipatingSourceCount("s2");
+ server.commit(tc);
+
+ Mockito.verify(xaTerminator).prepare(tc.getXid());
+ Mockito.verify(xaTerminator).commit(tc.getXid(), false);
+ }
+
+ public void testRequestRollback() throws Exception{
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+ server.start(tc);
+ tc.incrementPartcipatingSourceCount("s1");
+ tc.incrementPartcipatingSourceCount("s2");
+
+ server.rollback(tc);
+ Mockito.verify(xaTerminator).rollback(tc.getXid());
+ }
+
+ public void testLocalCancel() throws Exception {
+ TransactionContext tc = server.begin(THREAD1);
+ tc.incrementPartcipatingSourceCount("S1");
+ tc.incrementPartcipatingSourceCount("S2");
+
+ server.cancelTransactions(THREAD1, false);
+
+ Mockito.verify(xaTerminator).rollback(tc.getXid());
+ }
+
+ public void testRequestCancel() throws Exception{
+ TransactionContext tc = server.getOrCreateTransactionContext(THREAD1);
+ server.start(tc);
+ tc.incrementPartcipatingSourceCount("s1");
+ tc.incrementPartcipatingSourceCount("s2");
+
+ server.cancelTransactions(THREAD1, true);
+ Mockito.verify(xaTerminator).rollback(tc.getXid());
+ }
}