[teiid-commits] teiid SVN: r1987 - in trunk/engine/src: main/java/org/teiid/dqp/internal/transaction and 1 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Mon Mar 22 17:05:24 EDT 2010
Author: rareddy
Date: 2010-03-22 17:05:23 -0400 (Mon, 22 Mar 2010)
New Revision: 1987
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
Log:
TEIID-833, TEIID-862: Fixes related transactions.
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-03-22 21:01:44 UTC (rev 1986)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-03-22 21:05:23 UTC (rev 1987)
@@ -147,7 +147,7 @@
if (exception.getCause() instanceof MetaMatrixProcessingException) {
throw (MetaMatrixProcessingException)exception.getCause();
}
- throw new MetaMatrixComponentException(exception);
+ throw new MetaMatrixProcessingException(exception);
}
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-03-22 21:01:44 UTC (rev 1986)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-03-22 21:05:23 UTC (rev 1987)
@@ -76,7 +76,7 @@
private enum ProcessingState {NEW, PROCESSING, CLOSE}
private ProcessingState state = ProcessingState.NEW;
- private enum TransactionState {NONE, ACTIVE, END, DONE}
+ private enum TransactionState {NONE, ACTIVE, DONE}
private TransactionState transactionState = TransactionState.NONE;
/*
@@ -166,10 +166,6 @@
protected void process() {
LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "with state", state); //$NON-NLS-1$ //$NON-NLS-2$
try {
- if (this.transactionState == TransactionState.ACTIVE && this.transactionContext.getTransaction() != null) {
- //there's no need to do this for xa transactions, as that is done by the workmanager
- this.transactionService.resume(this.transactionContext);
- }
if (this.state == ProcessingState.NEW) {
state = ProcessingState.PROCESSING;
processNew();
@@ -178,6 +174,9 @@
state = ProcessingState.CLOSE;
}
}
+
+ resume();
+
if (this.state == ProcessingState.PROCESSING) {
processMore();
if (this.closeRequested) {
@@ -226,16 +225,24 @@
}
sendError();
}
- if (this.transactionState == TransactionState.ACTIVE && this.transactionContext.getTransaction() != null) {
- try {
- this.transactionService.suspend(this.transactionContext);
- } catch (XATransactionException e) {
- LogManager.logDetail(LogConstants.CTX_DQP, e, "Error suspending active transaction"); //$NON-NLS-1$
- }
- }
}
}
+ private void resume() throws XATransactionException {
+ if (this.transactionState == TransactionState.ACTIVE && this.transactionContext.getTransaction() != null) {
+ //there's no need to do this for xa transactions, as that is done by the workmanager
+ this.transactionService.resume(this.transactionContext);
+ }
+ }
+
+ private void suspend() {
+ try {
+ this.transactionService.suspend(this.transactionContext);
+ } catch (XATransactionException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e, "Error suspending active transaction"); //$NON-NLS-1$
+ }
+ }
+
protected void processMore() throws BlockedException, MetaMatrixCoreException {
if (this.processor != null) {
this.processor.getContext().setTimeSliceEnd(System.currentTimeMillis() + this.processorTimeslice);
@@ -247,7 +254,6 @@
}
if (doneProducingBatches) {
if (this.transactionState == TransactionState.ACTIVE) {
- boolean endState = true;
/*
* TEIID-14 if we are done producing batches, then proactively close transactional
* executions even ones that were intentionally kept alive. this may
@@ -257,17 +263,15 @@
for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
if (connectorRequest.isTransactional()) {
connectorRequest.fullyCloseSource();
- endState = false;
}
}
- if (endState) {
- this.transactionState = TransactionState.END;
+ this.transactionState = TransactionState.DONE;
+ if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
+ this.transactionService.commit(transactionContext);
+ } else {
+ suspend();
}
}
- if (this.transactionState == TransactionState.END && transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
- this.transactionService.commit(transactionContext);
- this.transactionState = TransactionState.DONE;
- }
sendResultsIfNeeded(null);
} else {
moreWork(false); // If the timeslice expired, then the processor can probably produce more batches.
@@ -317,20 +321,17 @@
}
}
- if (this.transactionState == TransactionState.ACTIVE) {
- if (!this.connectorInfo.isEmpty()) {
- return; //wait for pending connector work
+ if (this.transactionState == TransactionState.ACTIVE) {
+ this.transactionState = TransactionState.DONE;
+ if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
+ try {
+ this.transactionService.rollback(transactionContext);
+ } catch (XATransactionException e1) {
+ LogManager.logWarning(LogConstants.CTX_DQP, e1, DQPPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$
+ }
+ } else {
+ suspend();
}
- this.transactionState = TransactionState.END;
- }
-
- if (this.transactionState == TransactionState.END && transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
- this.transactionState = TransactionState.DONE;
- try {
- this.transactionService.rollback(transactionContext);
- } catch (XATransactionException e1) {
- LogManager.logWarning(LogConstants.CTX_DQP, e1, DQPPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$
- }
}
isClosed = true;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-03-22 21:01:44 UTC (rev 1986)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-03-22 21:05:23 UTC (rev 1987)
@@ -354,19 +354,22 @@
if (!transactionExpected) {
throw new InvalidTransactionException(DQPPlugin.Util.getString("TransactionServer.existing_transaction")); //$NON-NLS-1$
}
+ transactionManager.resume(tc.getTransaction());
} else if (transactionExpected) {
throw new InvalidTransactionException(DQPPlugin.Util.getString("TransactionServer.no_transaction", threadId)); //$NON-NLS-1$
}
} catch (InvalidTransactionException e) {
throw new XATransactionException(e);
- }
+ } catch (SystemException e) {
+ throw new XATransactionException(e);
+ }
return tc;
}
private void beginDirect(TransactionContext tc) throws XATransactionException {
try {
transactionManager.begin();
- Transaction tx = transactionManager.getTransaction();
+ Transaction tx = transactionManager.suspend();
tc.setTransaction(tx);
tc.setCreationTime(System.currentTimeMillis());
} catch (javax.transaction.NotSupportedException err) {
@@ -403,9 +406,9 @@
throw new XATransactionException(e);
} catch (SystemException e) {
throw new XATransactionException(e);
- } finally {
- transactions.removeTransactionContext(tc);
- }
+ } finally {
+ transactions.removeTransactionContext(tc);
+ }
}
public void suspend(TransactionContext context) throws XATransactionException {
@@ -473,12 +476,6 @@
*/
public TransactionContext commit(TransactionContext context) throws XATransactionException {
Assertion.assertTrue(context.getTransactionType() == TransactionContext.Scope.REQUEST);
-
- TransactionContext tc = transactions.getTransactionContext(context.getThreadId());
- if (tc == null || tc.getTransactionType() == TransactionContext.Scope.NONE) {
- return tc;
- }
-
commitDirect(context);
return context;
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java 2010-03-22 21:01:44 UTC (rev 1986)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java 2010-03-22 21:05:23 UTC (rev 1987)
@@ -60,6 +60,7 @@
tm = Mockito.mock(TransactionManager.class);
txn = Mockito.mock(javax.transaction.Transaction.class);
Mockito.stub(tm.getTransaction()).toReturn(txn);
+ Mockito.stub(tm.suspend()).toReturn(txn);
server.setXaTerminator(xaTerminator);
server.setTransactionManager(tm);
server.setWorkManager(new FakeWorkManager());
More information about the teiid-commits
mailing list