[teiid-commits] teiid SVN: r1987 - in trunk/engine/src: main/java/org/teiid/dqp/internal/transaction and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Mar 22 17:05:24 EDT 2010


Author: rareddy
Date: 2010-03-22 17:05:23 -0400 (Mon, 22 Mar 2010)
New Revision: 1987

Modified:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
Log:
TEIID-833, TEIID-862: Fixes related transactions.

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-03-22 21:01:44 UTC (rev 1986)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-03-22 21:05:23 UTC (rev 1987)
@@ -147,7 +147,7 @@
     		if (exception.getCause() instanceof MetaMatrixProcessingException) {
     			throw (MetaMatrixProcessingException)exception.getCause();
     		}
-    		throw new MetaMatrixComponentException(exception);
+    		throw new MetaMatrixProcessingException(exception);
 		}	
 	}
 

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-03-22 21:01:44 UTC (rev 1986)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-03-22 21:05:23 UTC (rev 1987)
@@ -76,7 +76,7 @@
 	private enum ProcessingState {NEW, PROCESSING, CLOSE}
 	private ProcessingState state = ProcessingState.NEW;
     
-	private enum TransactionState {NONE, ACTIVE, END, DONE}
+	private enum TransactionState {NONE, ACTIVE, DONE}
 	private TransactionState transactionState = TransactionState.NONE;
 	
 	/*
@@ -166,10 +166,6 @@
 	protected void process() {
         LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "with state", state); //$NON-NLS-1$ //$NON-NLS-2$
         try {
-        	if (this.transactionState == TransactionState.ACTIVE && this.transactionContext.getTransaction() != null) {
-        		//there's no need to do this for xa transactions, as that is done by the workmanager
-        		this.transactionService.resume(this.transactionContext);
-            }
             if (this.state == ProcessingState.NEW) {
                 state = ProcessingState.PROCESSING;
         		processNew();
@@ -178,6 +174,9 @@
                     state = ProcessingState.CLOSE;
                 } 
         	}
+        	
+            resume();
+        	
             if (this.state == ProcessingState.PROCESSING) {
             	processMore();
             	if (this.closeRequested) {
@@ -226,16 +225,24 @@
         		}
         		sendError();
         	} 
-        	if (this.transactionState == TransactionState.ACTIVE && this.transactionContext.getTransaction() != null) {
-        		try {
-					this.transactionService.suspend(this.transactionContext);
-				} catch (XATransactionException e) {
-					LogManager.logDetail(LogConstants.CTX_DQP, e, "Error suspending active transaction"); //$NON-NLS-1$
-				}
-            }
         }
     }
 
+	private void resume() throws XATransactionException {
+		if (this.transactionState == TransactionState.ACTIVE && this.transactionContext.getTransaction() != null) {
+			//there's no need to do this for xa transactions, as that is done by the workmanager
+			this.transactionService.resume(this.transactionContext);
+		}
+	}
+
+	private void suspend() {
+		try {
+			this.transactionService.suspend(this.transactionContext);
+		} catch (XATransactionException e) {
+			LogManager.logDetail(LogConstants.CTX_DQP, e, "Error suspending active transaction"); //$NON-NLS-1$
+		}
+	}
+
 	protected void processMore() throws BlockedException, MetaMatrixCoreException {
 		if (this.processor != null) {
 			this.processor.getContext().setTimeSliceEnd(System.currentTimeMillis() + this.processorTimeslice);
@@ -247,7 +254,6 @@
 		}
 		if (doneProducingBatches) {
 			if (this.transactionState == TransactionState.ACTIVE) {
-				boolean endState = true;
 				/*
 				 * TEIID-14 if we are done producing batches, then proactively close transactional 
 				 * executions even ones that were intentionally kept alive. this may 
@@ -257,17 +263,15 @@
 	        	for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
 	        		if (connectorRequest.isTransactional()) {
 	        			connectorRequest.fullyCloseSource();
-	        			endState = false;
 	        		}
 	            }
-				if (endState) {
-					this.transactionState = TransactionState.END;
+				this.transactionState = TransactionState.DONE;
+				if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
+					this.transactionService.commit(transactionContext);
+				} else {
+					suspend();
 				}
 			}
-			if (this.transactionState == TransactionState.END && transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
-				this.transactionService.commit(transactionContext);
-				this.transactionState = TransactionState.DONE;
-			}
 			sendResultsIfNeeded(null);
 		} else {
 			moreWork(false); // If the timeslice expired, then the processor can probably produce more batches.
@@ -317,20 +321,17 @@
 			}
 		}
 
-		if (this.transactionState == TransactionState.ACTIVE) {
-			if (!this.connectorInfo.isEmpty()) {
-				return; //wait for pending connector work
+		if (this.transactionState == TransactionState.ACTIVE) { 
+			this.transactionState = TransactionState.DONE;
+            if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
+				try {
+	        		this.transactionService.rollback(transactionContext);
+	            } catch (XATransactionException e1) {
+	                LogManager.logWarning(LogConstants.CTX_DQP, e1, DQPPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$           
+	            } 
+			} else {
+	        	suspend();
 			}
-			this.transactionState = TransactionState.END;
-		} 
-		
-		if (this.transactionState == TransactionState.END && transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
-			this.transactionState = TransactionState.DONE;
-            try {
-        		this.transactionService.rollback(transactionContext);
-            } catch (XATransactionException e1) {
-                LogManager.logWarning(LogConstants.CTX_DQP, e1, DQPPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$           
-            } 
 		}
 		
 		isClosed = true;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java	2010-03-22 21:01:44 UTC (rev 1986)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java	2010-03-22 21:05:23 UTC (rev 1987)
@@ -354,19 +354,22 @@
 	            if (!transactionExpected) {
 	            	throw new InvalidTransactionException(DQPPlugin.Util.getString("TransactionServer.existing_transaction")); //$NON-NLS-1$
 	            }
+	            transactionManager.resume(tc.getTransaction());
 	        } else if (transactionExpected) {
 	        	throw new InvalidTransactionException(DQPPlugin.Util.getString("TransactionServer.no_transaction", threadId)); //$NON-NLS-1$
 	        }
         } catch (InvalidTransactionException e) {
         	throw new XATransactionException(e);
-        }
+		} catch (SystemException e) {
+        	throw new XATransactionException(e);
+		}
         return tc;
     }
     
     private void beginDirect(TransactionContext tc) throws XATransactionException {
 		try {
 			transactionManager.begin();
-			Transaction tx = transactionManager.getTransaction();
+			Transaction tx = transactionManager.suspend();
 			tc.setTransaction(tx);
 			tc.setCreationTime(System.currentTimeMillis());
         } catch (javax.transaction.NotSupportedException err) {
@@ -403,9 +406,9 @@
 			throw new XATransactionException(e);
 		} catch (SystemException e) {
 			throw new XATransactionException(e);
-        } finally {
-			transactions.removeTransactionContext(tc);
-		}
+		} finally {
+            transactions.removeTransactionContext(tc);
+        }
 	}
 	
 	public void suspend(TransactionContext context) throws XATransactionException {
@@ -473,12 +476,6 @@
      */    
     public TransactionContext commit(TransactionContext context) throws XATransactionException {
         Assertion.assertTrue(context.getTransactionType() == TransactionContext.Scope.REQUEST);
-        
-        TransactionContext tc = transactions.getTransactionContext(context.getThreadId());
-        if (tc == null || tc.getTransactionType() == TransactionContext.Scope.NONE) {
-            return tc;
-        }
-        
         commitDirect(context);
         return context;
     }

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java	2010-03-22 21:01:44 UTC (rev 1986)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/transaction/TestTransactionServer.java	2010-03-22 21:05:23 UTC (rev 1987)
@@ -60,6 +60,7 @@
         tm = Mockito.mock(TransactionManager.class);
         txn = Mockito.mock(javax.transaction.Transaction.class);
         Mockito.stub(tm.getTransaction()).toReturn(txn);
+        Mockito.stub(tm.suspend()).toReturn(txn);
         server.setXaTerminator(xaTerminator);
         server.setTransactionManager(tm);
         server.setWorkManager(new FakeWorkManager());



More information about the teiid-commits mailing list