Author: shawkins
Date: 2010-03-22 14:41:48 -0400 (Mon, 22 Mar 2010)
New Revision: 1984
Removed:
trunk/client/src/main/java/com/metamatrix/admin/
trunk/client/src/main/java/com/metamatrix/common/
trunk/client/src/main/java/com/metamatrix/dqp/
trunk/client/src/main/java/com/metamatrix/platform/
trunk/client/src/test/java/com/
Modified:
trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java
trunk/engine/src/main/java/com/metamatrix/query/optimizer/proc/ProcedurePlanner.java
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalPlan.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
trunk/engine/src/test/java/com/metamatrix/query/processor/proc/TestProcedureProcessor.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeTransactionService.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
Log:
TEIID-1015 refining transaction logic. switching to importing xa transactions on start.
Modified: trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionContext.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -44,7 +44,6 @@
private String threadId;
private Scope transactionType = Scope.NONE;
private long creationTime;
- private boolean rollback;
private Transaction transaction;
private Set<String> suspendedBy = Collections.newSetFromMap(new
ConcurrentHashMap<String, Boolean>());
@@ -93,14 +92,6 @@
return "NONE"; //$NON-NLS-1$
}
- public void setRollbackOnly() {
- this.rollback = true;
- }
-
- public boolean shouldRollback() {
- return this.rollback;
- }
-
public Set<String> getSuspendedBy() {
return this.suspendedBy;
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/proc/ProcedurePlanner.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/optimizer/proc/ProcedurePlanner.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/main/java/com/metamatrix/query/optimizer/proc/ProcedurePlanner.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -53,6 +53,7 @@
import com.metamatrix.query.sql.lang.Command;
import com.metamatrix.query.sql.lang.DynamicCommand;
import com.metamatrix.query.sql.lang.ProcedureContainer;
+import com.metamatrix.query.sql.lang.StoredProcedure;
import com.metamatrix.query.sql.lang.TranslatableProcedureContainer;
import com.metamatrix.query.sql.proc.AssignmentStatement;
import com.metamatrix.query.sql.proc.Block;
@@ -120,6 +121,9 @@
ProcedureContainer container =
(ProcedureContainer)((CreateUpdateProcedureCommand) procCommand).getUserCommand();
if (container != null) {
+ if (container instanceof StoredProcedure) {
+ plan.setRequiresTransaction(container.getUpdateCount() > 0);
+ }
Map params = container.getProcedureParameters();
plan.setParams(params);
plan.setMetadata(metadata);
@@ -156,16 +160,13 @@
private Program planBlock(CreateUpdateProcedureCommand parentProcCommand, Block
block, QueryMetadataInterface metadata, boolean debug, IDGenerator idGenerator,
CapabilitiesFinder capFinder, AnalysisRecord analysisRecord, CommandContext context)
throws QueryPlannerException, QueryMetadataException,
MetaMatrixComponentException {
- Iterator stmtIter = block.getStatements().iterator();
-
// Generate program and add instructions
// this program represents the block on the procedure
// instruction in the program would correspond to statements in the block
Program programBlock = new Program();
// plan each statement in the block
- while(stmtIter.hasNext()) {
- Statement statement = (Statement) stmtIter.next();
+ for (Statement statement : block.getStatements()) {
Object instruction = planStatement(parentProcCommand, statement, metadata, debug,
idGenerator, capFinder, analysisRecord, context);
//childIndex = ((Integer) array[0]).intValue();
if(instruction instanceof ProgramInstruction){
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -118,6 +118,8 @@
private Stack<Program> programs = new Stack<Program>();
private boolean evaluatedParams;
+
+ private boolean requiresTransaction = true;
/**
* Constructor for ProcedurePlan.
@@ -328,6 +330,7 @@
plan.setParams(params);
plan.setImplicitParams(implicitParams);
plan.setMetadata(metadata);
+ plan.requiresTransaction = requiresTransaction;
return plan;
}
@@ -625,9 +628,14 @@
return programs.peek();
}
+ public void setRequiresTransaction(boolean requiresTransaction) {
+ this.requiresTransaction = requiresTransaction;
+ }
+
@Override
public boolean requiresTransaction(boolean transactionalReads) {
- return true;
+ //TODO: detect simple select case
+ return requiresTransaction || transactionalReads;
}
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalPlan.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalPlan.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalPlan.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -155,16 +155,35 @@
@Override
public boolean requiresTransaction(boolean transactionalReads) {
- if (root instanceof DependentAccessNode) {
- if (transactionalReads || !(((DependentAccessNode)root).getCommand() instanceof
QueryCommand)) {
+ return requiresTransaction(transactionalReads, root);
+ }
+
+ /**
+ * Currently does not detect procedures in non-inline view subqueries
+ */
+ boolean requiresTransaction(boolean transactionalReads, RelationalNode node) {
+ if (node instanceof DependentAccessNode) {
+ if (transactionalReads || !(((DependentAccessNode)node).getCommand() instanceof
QueryCommand)) {
return true;
}
return false;
}
- if (root instanceof AccessNode) {
- return false; //full pushdown
+ if (node instanceof AccessNode) {
+ return false;
}
- return transactionalReads; //embedded procedures are not detected
+ if (transactionalReads) {
+ return true;
+ }
+ if (node instanceof PlanExecutionNode) {
+ ProcessorPlan plan = ((PlanExecutionNode)node).getProcessorPlan();
+ return plan.requiresTransaction(transactionalReads);
+ }
+ for (RelationalNode child : node.getChildren()) {
+ if (child != null && requiresTransaction(transactionalReads, child)) {
+ return true;
+ }
+ }
+ return false;
}
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -44,7 +44,11 @@
public void run() {
startProcessing();
- process();
+ try {
+ process();
+ } finally {
+ endProcessing();
+ }
}
synchronized ThreadState getThreadState() {
@@ -142,7 +146,6 @@
@Override
public void workCompleted(WorkEvent arg0) {
- endProcessing();
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-03-22
14:47:51 UTC (rev 1983)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -82,18 +82,21 @@
*/
public class DQPCore implements DQP {
- private final static class FutureWork<T> implements Work, WorkListener {
- private final ResultsReceiver<T> receiver;
+ public final static class FutureWork<T> implements Work, WorkListener {
private final Callable<T> toCall;
private DQPWorkContext workContext;
+ private ResultsFuture<T> result = new ResultsFuture<T>();
+ private ResultsReceiver<T> receiver = result.getResultsReceiver();
- private FutureWork(ResultsReceiver<T> receiver,
- Callable<T> processor) {
+ public FutureWork(Callable<T> processor) {
this.workContext = DQPWorkContext.getWorkContext();
- this.receiver = receiver;
this.toCall = processor;
}
-
+
+ public ResultsFuture<T> getResult() {
+ return result;
+ }
+
@Override
public void run() {
try {
@@ -326,12 +329,8 @@
}
void addWork(Work work) {
- TransactionContext tc = null;
- if (work instanceof RequestWorkItem && this.transactionService != null) {
- tc =
transactionService.getOrCreateTransactionContext(DQPWorkContext.getWorkContext().getConnectionID());
- }
try {
- this.processWorkerPool.scheduleWork(work, tc, 0);
+ this.processWorkerPool.scheduleWork(work);
} catch (WorkException e) {
//TODO: cancel? close?
throw new MetaMatrixRuntimeException(e);
@@ -439,13 +438,11 @@
}
}
- if (transactionService != null) {
- try {
- transactionService.cancelTransactions(sessionId, false);
- } catch (XATransactionException err) {
- LogManager.logWarning(LogConstants.CTX_DQP, "rollback failed for
requestID=" + sessionId); //$NON-NLS-1$
- }
- }
+ try {
+ transactionService.cancelTransactions(sessionId, false);
+ } catch (XATransactionException err) {
+ LogManager.logWarning(LogConstants.CTX_DQP, "rollback failed for
requestID=" + sessionId); //$NON-NLS-1$
+ }
contextCache.removeSessionScopedCache(sessionId);
}
@@ -542,16 +539,10 @@
}
public Collection<org.teiid.adminapi.Transaction> getTransactions() {
- if (this.transactionService == null) {
- return Collections.emptyList();
- }
return this.transactionService.getTransactions();
}
public void terminateTransaction(String xid) throws AdminException {
- if (this.transactionService == null) {
- return;
- }
this.transactionService.terminateTransaction(xid);
}
@@ -592,9 +583,6 @@
}
public TransactionService getTransactionService() {
- if (transactionService == null) {
- throw new MetaMatrixRuntimeException("Transactions are not enabled");
//$NON-NLS-1$
- }
return transactionService;
}
@@ -741,14 +729,13 @@
}
private <T> ResultsFuture<T> addWork(Callable<T> processor) {
- ResultsFuture<T> result = new ResultsFuture<T>();
- ResultsReceiver<T> receiver = result.getResultsReceiver();
+ FutureWork<T> work = new FutureWork<T>(processor);
try {
- this.workManager.scheduleWork(new FutureWork<T>(receiver, processor));
+ this.workManager.scheduleWork(work);
} catch (WorkException e) {
throw new MetaMatrixRuntimeException(e);
}
- return result;
+ return work.getResult();
}
// global txn
@@ -771,11 +758,17 @@
return addWork(processor);
}
// global txn
- public ResultsFuture<?> start(XidImpl xid, int flags, int timeout)
+ public ResultsFuture<?> start(final XidImpl xid, final int flags, final int
timeout)
throws XATransactionException {
- DQPWorkContext workContext = DQPWorkContext.getWorkContext();
- this.getTransactionService().start(workContext.getConnectionID(), xid, flags, timeout,
workContext.getSession().isEmbedded());
- return ResultsFuture.NULL_FUTURE;
+ Callable<Void> processor = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ DQPWorkContext workContext = DQPWorkContext.getWorkContext();
+ getTransactionService().start(workContext.getConnectionID(), xid, flags, timeout,
workContext.getSession().isEmbedded());
+ return null;
+ }
+ };
+ return addWork(processor);
}
public MetadataResult getMetadata(long requestID)
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2010-03-22
14:47:51 UTC (rev 1983)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -335,18 +335,12 @@
private void createProcessor() throws MetaMatrixComponentException {
- TransactionContext tc = null;
+ TransactionContext tc =
transactionService.getOrCreateTransactionContext(workContext.getConnectionID());
- if (transactionService != null) {
- tc =
transactionService.getOrCreateTransactionContext(workContext.getConnectionID());
- }
-
- if (tc != null){
- Assertion.assertTrue(tc.getTransactionType() !=
TransactionContext.Scope.REQUEST, "Transaction already associated with
request."); //$NON-NLS-1$
- }
+ Assertion.assertTrue(tc.getTransactionType() != TransactionContext.Scope.REQUEST,
"Transaction already associated with request."); //$NON-NLS-1$
// If local or global transaction is not started.
- if (tc == null || tc.getTransactionType() != Scope.NONE) {
+ if (tc.getTransactionType() == Scope.NONE) {
boolean startAutoWrapTxn = false;
@@ -355,17 +349,10 @@
} else if
(RequestMessage.TXN_WRAP_DETECT.equals(requestMsg.getTxnAutoWrapMode())){
boolean transactionalRead = requestMsg.getTransactionIsolation() ==
Connection.TRANSACTION_REPEATABLE_READ
|| requestMsg.getTransactionIsolation() == Connection.TRANSACTION_SERIALIZABLE;
- if (!transactionalRead && userCommand instanceof StoredProcedure
&& ((StoredProcedure)userCommand).getUpdateCount() == 0) {
- startAutoWrapTxn = false;
- } else {
- startAutoWrapTxn = processPlan.requiresTransaction(transactionalRead);
- }
+ startAutoWrapTxn = processPlan.requiresTransaction(transactionalRead);
}
if (startAutoWrapTxn) {
- if (transactionService == null) {
- throw new
MetaMatrixComponentException(DQPPlugin.Util.getString("Request.transaction_not_supported"));
//$NON-NLS-1$
- }
try {
tc = transactionService.begin(tc);
} catch (XATransactionException err) {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -29,9 +29,13 @@
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import javax.resource.NotSupportedException;
import javax.resource.spi.XATerminator;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkManager;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
@@ -48,6 +52,7 @@
import org.teiid.adminapi.impl.TransactionMetadata;
import org.teiid.client.xa.XATransactionException;
import org.teiid.client.xa.XidImpl;
+import org.teiid.dqp.internal.process.DQPCore.FutureWork;
import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.DQPPlugin;
@@ -111,6 +116,7 @@
private XATerminator xaTerminator;
private TransactionManager transactionManager;
+ private WorkManager workManager;
public void setXaTerminator(XATerminator xaTerminator) {
this.xaTerminator = xaTerminator;
@@ -119,6 +125,10 @@
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
+
+ public void setWorkManager(WorkManager workManager) {
+ this.workManager = workManager;
+ }
/**
* Global Transaction
@@ -129,10 +139,6 @@
throw new XATransactionException(XAException.XAER_PROTO,
DQPPlugin.Util.getString("TransactionServer.suspended_exist", xid));
//$NON-NLS-1$
}
- if (tc.shouldRollback()) {
- throw new XATransactionException(XAException.XAER_RMERR,
DQPPlugin.Util.getString("TransactionServer.rollback_set", xid)); //$NON-NLS-1$
- }
-
// In the container this pass though
if (singleTM) {
return XAResource.XA_RDONLY;
@@ -231,8 +237,29 @@
tc.setTransactionTimeout(timeout);
tc.setXid(xid);
tc.setTransactionType(TransactionContext.Scope.GLOBAL);
+ if (singleTM) {
+ tc.setTransaction(transactionManager.getTransaction());
+ assert tc.getTransaction() != null;
+ } else {
+ FutureWork<Transaction> work = new FutureWork<Transaction>(new
Callable<Transaction>() {
+ @Override
+ public Transaction call() throws Exception {
+ return transactionManager.getTransaction();
+ }
+ });
+ workManager.doWork(work, WorkManager.INDEFINITE, tc, null);
+ tc.setTransaction(work.getResult().get());
+ }
} catch (NotSupportedException e) {
- throw new XATransactionException(XAException.XAER_INVAL, e.getMessage());
+ throw new XATransactionException(e, XAException.XAER_INVAL);
+ } catch (WorkException e) {
+ throw new XATransactionException(e, XAException.XAER_INVAL);
+ } catch (InterruptedException e) {
+ throw new XATransactionException(e, XAException.XAER_INVAL);
+ } catch (ExecutionException e) {
+ throw new XATransactionException(e, XAException.XAER_INVAL);
+ } catch (SystemException e) {
+ throw new XATransactionException(e, XAException.XAER_INVAL);
}
break;
}
@@ -273,7 +300,7 @@
break;
}
case XAResource.TMFAIL: {
- tc.setRollbackOnly();
+ cancelTransactions(threadId, false);
break;
}
default:
@@ -417,12 +444,7 @@
*/
public void commit(String threadId) throws XATransactionException {
TransactionContext tc = checkLocalTransactionState(threadId, true);
-
- if (tc.shouldRollback()) {
- rollback(threadId);
- } else {
- commitDirect(tc);
- }
+ commitDirect(tc);
}
/**
@@ -460,8 +482,6 @@
return tc;
}
- Assertion.assertTrue(!tc.shouldRollback());
-
commitDirect(context);
return context;
}
@@ -483,27 +503,11 @@
return;
}
- if (tc.getTransactionType() == TransactionContext.Scope.GLOBAL) {
- tc.setRollbackOnly();
- return;
- }
-
try {
- try {
- transactionManager.resume(tc.getTransaction());
- transactionManager.setRollbackOnly();
- } catch (InvalidTransactionException e) {
- throw new XATransactionException(e);
- } catch (SystemException e) {
- throw new XATransactionException(e);
- }
- } finally {
- try {
- transactionManager.suspend();
- } catch (SystemException e) {
- throw new XATransactionException(e);
- }
- }
+ tc.getTransaction().setRollbackOnly();
+ } catch (SystemException e) {
+ throw new XATransactionException(e);
+ }
}
@Override
Modified: trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -758,7 +758,7 @@
@Test public void testQueryPhysical() {
ProcessorPlan plan = helpPlan("SELECT pm1.g1.e1, e2, pm1.g1.e3 as a, e4 as b FROM
pm1.g1", FakeMetadataFactory.example1Cached(), //$NON-NLS-1$
new String[] {"SELECT pm1.g1.e1, e2, pm1.g1.e3, e4 FROM pm1.g1"} );
//$NON-NLS-1$
-
+ assertTrue(!plan.requiresTransaction(true));
checkNodeTypes(plan, FULL_PUSHDOWN);
}
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/proc/TestProcedureProcessor.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/proc/TestProcedureProcessor.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/proc/TestProcedureProcessor.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -2135,6 +2135,8 @@
ProcessorPlan plan = getProcedurePlan(userUpdateStr, metadata);
helpTestProcess(plan, 1, dataMgr);
+
+ assertTrue(plan.requiresTransaction(false));
}
@Test public void testUpdateAssignmentNotExecutedVirtual() throws Exception {
@@ -2600,6 +2602,8 @@
Arrays.asList( new Object[] { "Third", null, new Integer(51),
null} ) //$NON-NLS-1$
};
helpTestProcess(plan, expected, dataMgr);
+
+ assertTrue(!plan.requiresTransaction(false));
}
/**
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeTransactionService.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeTransactionService.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeTransactionService.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -27,6 +27,7 @@
import org.teiid.dqp.internal.transaction.TransactionServerImpl;
+import com.metamatrix.common.queue.FakeWorkManager;
import com.metamatrix.core.util.SimpleMock;
public class FakeTransactionService extends TransactionServerImpl {
@@ -34,6 +35,7 @@
public FakeTransactionService() {
this.setTransactionManager(SimpleMock.createSimpleMock(TransactionManager.class));
this.setXaTerminator(SimpleMock.createSimpleMock(XATerminator.class));
+ this.setWorkManager(new FakeWorkManager());
}
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java 2010-03-22
14:47:51 UTC (rev 1983)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -31,6 +31,7 @@
import org.teiid.client.RequestMessage;
import org.teiid.client.RequestMessage.StatementType;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
+import org.teiid.dqp.internal.datamgr.impl.FakeTransactionService;
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.query.QueryParserException;
@@ -86,7 +87,7 @@
RequestMessage message = new RequestMessage();
DQPWorkContext workContext = FakeMetadataFactory.buildWorkContext(metadata,
FakeMetadataFactory.example1VDB());
- request.initialize(message, null, null,null,false, null, workContext, 101024,
repo, false);
+ request.initialize(message, null, null,new FakeTransactionService(),false, null,
workContext, 101024, repo, false);
request.initMetadata();
request.validateAccess(command);
}
@@ -143,7 +144,7 @@
Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(new
AutoGenDataService());
request.initialize(message, Mockito.mock(BufferManager.class),
- new FakeDataManager(), null, false, null, workContext,
+ new FakeDataManager(), new FakeTransactionService(), false, null, workContext,
101024, repo, false);
request.processRequest();
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -44,12 +44,6 @@
}
@Override
- public void run() {
- super.run();
- workCompleted(null);
- }
-
- @Override
protected boolean isDoneProcessing() {
return isDone;
}
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -35,12 +35,14 @@
import org.teiid.client.xa.XATransactionException;
import org.teiid.client.xa.XidImpl;
+import com.metamatrix.common.queue.FakeWorkManager;
import com.metamatrix.dqp.service.TransactionContext;
public class TestTransactionServer {
private TransactionServerImpl server;
private XATerminator xaTerminator;
private TransactionManager tm;
+ private javax.transaction.Transaction txn;
private static final String THREAD1 = "1"; //$NON-NLS-1$
private static final String THREAD2 = "2"; //$NON-NLS-1$
@@ -56,10 +58,11 @@
server = new TransactionServerImpl();
xaTerminator = Mockito.mock(XATerminator.class);
tm = Mockito.mock(TransactionManager.class);
- javax.transaction.Transaction txn =
Mockito.mock(javax.transaction.Transaction.class);
+ txn = Mockito.mock(javax.transaction.Transaction.class);
Mockito.stub(tm.getTransaction()).toReturn(txn);
server.setXaTerminator(xaTerminator);
server.setTransactionManager(tm);
+ server.setWorkManager(new FakeWorkManager());
}
/**
@@ -166,15 +169,6 @@
}
}
- @Test public void testLocalSetRollback() throws Exception {
- TransactionContext tc = server.begin(THREAD1);
- tc.setRollbackOnly();
-
- server.commit(THREAD1);
-
- Mockito.verify(tm).rollback();
- }
-
@Test public void testTwoPhaseCommit() throws Exception {
server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
server.end(THREAD1, XID1, XAResource.TMSUCCESS, false);
@@ -296,12 +290,7 @@
@Test public void testGlobalPrepareFail() throws Exception {
server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false);
server.end(THREAD1, XID1, XAResource.TMFAIL, false);
-
- try {
- server.prepare(THREAD1, XID1, false);
- fail("should have failed to prepare as end resulted in TMFAIL");
//$NON-NLS-1$
- } catch (XATransactionException e) {
- }
+ Mockito.verify(txn).setRollbackOnly();
}
@Test public void testGlobalOnePhaseCommit() throws Exception {
@@ -395,7 +384,7 @@
server.cancelTransactions(THREAD1, false);
- Mockito.verify(tm).setRollbackOnly();
+ Mockito.verify(txn).setRollbackOnly();
}
@Test public void testRequestCancel() throws Exception{
@@ -403,6 +392,6 @@
server.begin(tc);
server.cancelTransactions(THREAD1, true);
- Mockito.verify(tm).setRollbackOnly();
+ Mockito.verify(txn).setRollbackOnly();
}
}
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-03-22
14:47:51 UTC (rev 1983)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-03-22
18:41:48 UTC (rev 1984)
@@ -213,6 +213,7 @@
public void setWorkManager(WorkManager mgr) {
this.dqpCore.setWorkManager(mgr);
+ this.transactionServerImpl.setWorkManager(mgr);
}
public void setSessionService(SessionService service) {