[teiid-commits] teiid SVN: r4435 - in trunk: engine/src/main/java/org/teiid/query/processor/proc and 6 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Sep 13 14:12:44 EDT 2012


Author: shawkins
Date: 2012-09-13 14:12:44 -0400 (Thu, 13 Sep 2012)
New Revision: 4435

Added:
   trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java
   trunk/test-integration/common/src/test/resources/txn-vdb.xml
Modified:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
   trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
   trunk/engine/src/test/java/org/teiid/query/processor/proc/TestProcedureProcessor.java
   trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
   trunk/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java
   trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestDynamicViewDefinition.java
Log:
TEIID-2205 fix for procedure/block/local connection txn issues

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -254,7 +254,6 @@
         this.context.setRequestId(this.requestId);
         this.context.setDQPWorkContext(this.workContext);
         this.context.setTransactionService(this.transactionService);
-        this.context.setTransactionContext(this.transactionContext);
         this.context.setVDBClassLoader(workContext.getVDB().getAttachment(ClassLoader.class));
     }
     
@@ -366,6 +365,7 @@
         
         tc.setIsolationLevel(requestMsg.getTransactionIsolation());
         this.transactionContext = tc;
+        this.context.setTransactionContext(tc);
         this.processor = new QueryProcessor(processPlan, context, bufferManager, processorDataManager);
     	this.processor.setContinuous(this.requestMsg.getRequestOptions().isContinuous());
     }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -356,13 +356,13 @@
 	}
 
 	private void resume() throws XATransactionException {
-		if (this.transactionState == TransactionState.ACTIVE && isSuspendable()) {
+		if (this.transactionState == TransactionState.ACTIVE) {
 			this.transactionService.resume(this.transactionContext);
 		}
 	}
 
 	private boolean isSuspendable() {
-		return !this.useCallingThread && this.transactionContext.getTransaction() != null;
+		return this.transactionContext.getTransaction() != null && !(this.useCallingThread & this.transactionContext.getTransactionType() == Scope.GLOBAL);
 	}
 
 	private void suspend() {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java	2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -59,7 +59,10 @@
 import org.teiid.dqp.service.TransactionContext.Scope;
 import org.teiid.query.QueryPlugin;
 
-
+/**
+ * Note that the begin methods do not leave the transaction associated with the
+ * calling thread.  This is by design and requires explicit resumes for association.
+ */
 public class TransactionServerImpl implements TransactionService {
 
     protected static class TransactionMapping {
@@ -423,11 +426,20 @@
 	
 	public void resume(TransactionContext context) throws XATransactionException {
 		try {
+			//if we're already associated, just return
+			if (this.transactionManager.getTransaction() == context.getTransaction()) {
+				return;
+			}
+		} catch (SystemException e) {
+		}
+		try {
 			this.transactionManager.resume(context.getTransaction());
+		} catch (IllegalStateException e) {
+			throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
 		} catch (InvalidTransactionException e) {
-			 throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
+			throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
 		} catch (SystemException e) {
-			 throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
+			throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
 		}
 	}
 

Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java	2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -134,7 +134,6 @@
     private boolean requiresTransaction = true;
     
     private TransactionContext blockContext;
-    private boolean inTxn;
     /**
      * Resources cannot be held open across the txn boundary.  This list is a hack at ensuring the resources are closed.
      */
@@ -208,7 +207,6 @@
         beginBatch = 1;
         batchRows = null;
         lastBatch = false;
-        inTxn = false;
         //reset program stack
         programs.clear();
 		LogManager.logTrace(org.teiid.logging.LogConstants.CTX_DQP, "ProcedurePlan reset"); //$NON-NLS-1$
@@ -270,15 +268,13 @@
 	@Override
 	public TupleBatch nextBatch() throws BlockedException,
 			TeiidComponentException, TeiidProcessingException {
-		if (blockContext != null && !this.inTxn) {
+		if (blockContext != null) {
 			this.getContext().getTransactionServer().resume(blockContext);
-			this.inTxn = true;
 		} 
 		try {
 			return nextBatchDirect();
 		} finally {
 			if (blockContext != null) {
-				this.inTxn = false;
 				this.getContext().getTransactionServer().suspend(blockContext);
 			}
 		}
@@ -580,9 +576,7 @@
     		TransactionContext tc = this.blockContext;
     		this.blockContext = null;
     		try {
-	    		if (!inTxn) {
-	    			this.getContext().getTransactionServer().resume(tc);
-	    		}
+    			this.getContext().getTransactionServer().resume(tc);
 	    		for (WeakReference<DataTierTupleSource> ref : txnTupleSources) {
 	    			DataTierTupleSource dtts = ref.get();
 	    			if (dtts != null) {
@@ -616,9 +610,8 @@
         	if (tc != null && tc.getTransactionType() == Scope.NONE) {
         		//start a transaction
         		this.getContext().getTransactionServer().begin(tc);
-        		this.inTxn = true;
         		this.blockContext = tc;
-        		this.peek().setStartedTxn(true);
+        		program.setStartedTxn(true);
         	}
         }
     }

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java	2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -115,6 +115,7 @@
         
         Request request = helpProcessMessage(message, null, workContext);
         assertEquals("1", request.context.getConnectionId()); //$NON-NLS-1$
+        assertNotNull(request.context.getTransactionContext());
     }
 
     private Request helpProcessMessage(RequestMessage message, SessionAwareCache<PreparedPlan> cache, DQPWorkContext workContext) throws TeiidComponentException,

Modified: trunk/engine/src/test/java/org/teiid/query/processor/proc/TestProcedureProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/proc/TestProcedureProcessor.java	2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/test/java/org/teiid/query/processor/proc/TestProcedureProcessor.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -2001,7 +2001,7 @@
         	
         }
     	Mockito.verify(ts).begin(tc);
-    	Mockito.verify(ts).resume(tc);
+    	Mockito.verify(ts, Mockito.times(2)).resume(tc);
     	Mockito.verify(ts, Mockito.times(0)).commit(tc);
     	Mockito.verify(ts).rollback(tc);
     }

Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java	2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -48,8 +48,8 @@
 import org.teiid.client.security.ILogon;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBufferCache;
+import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.BundleUtil.Event;
-import org.teiid.core.TeiidRuntimeException;
 import org.teiid.deployers.CompositeVDB;
 import org.teiid.deployers.UDFMetaData;
 import org.teiid.deployers.VDBLifeCycleListener;
@@ -166,7 +166,7 @@
 							}
 						});
 						tc.setTransaction(tx);
-						tc.setTransactionType(Scope.LOCAL);
+						tc.setTransactionType(Scope.GLOBAL);
 					}
 				} catch (SystemException e) {
 				} catch (IllegalStateException e) {

Modified: trunk/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java	2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -27,12 +27,24 @@
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.teiid.adminapi.Model.Type;
 import org.teiid.adminapi.impl.ModelMetaData;
 import org.teiid.deployers.VirtualDatabaseException;
@@ -54,6 +66,85 @@
 
 @SuppressWarnings("nls")
 public class TestEmbeddedServer {
+	private final class MockTransactionManager implements TransactionManager {
+		ThreadLocal<Transaction> txns = new ThreadLocal<Transaction>();
+		List<Transaction> txnHistory = new ArrayList<Transaction>();
+
+		@Override
+		public Transaction suspend() throws SystemException {
+			Transaction result = txns.get();
+			txns.remove();
+			return result;
+		}
+
+		@Override
+		public void setTransactionTimeout(int seconds) throws SystemException {
+		}
+
+		@Override
+		public void setRollbackOnly() throws IllegalStateException, SystemException {
+			Transaction result = txns.get();
+			if (result == null) {
+				throw new IllegalStateException();
+			}
+			result.setRollbackOnly();
+		}
+
+		@Override
+		public void rollback() throws IllegalStateException, SecurityException,
+				SystemException {
+			Transaction t = checkNull(false);
+			txns.remove();
+			t.rollback();
+		}
+
+		@Override
+		public void resume(Transaction tobj) throws InvalidTransactionException,
+				IllegalStateException, SystemException {
+			checkNull(true);
+			txns.set(tobj);
+		}
+
+		private Transaction checkNull(boolean isNull) {
+			Transaction t = txns.get();
+			if ((!isNull && t == null) || (isNull && t != null)) {
+				throw new IllegalStateException();
+			}
+			return t;
+		}
+
+		@Override
+		public Transaction getTransaction() throws SystemException {
+			return txns.get();
+		}
+
+		@Override
+		public int getStatus() throws SystemException {
+			Transaction t = txns.get();
+			if (t == null) {
+				return Status.STATUS_NO_TRANSACTION;
+			}
+			return t.getStatus();
+		}
+
+		@Override
+		public void commit() throws RollbackException, HeuristicMixedException,
+				HeuristicRollbackException, SecurityException,
+				IllegalStateException, SystemException {
+			Transaction t = checkNull(false);
+			txns.remove();
+			t.commit();
+		}
+
+		@Override
+		public void begin() throws NotSupportedException, SystemException {
+			checkNull(true);
+			Transaction t = Mockito.mock(Transaction.class);
+			txnHistory.add(t);
+			txns.set(t);
+		}
+	}
+
 	EmbeddedServer es;
 	
 	@Before public void setup() {
@@ -227,5 +318,48 @@
 			
 		}
 	}
+	
+	@Test public void testTransactions() throws Exception {
+		EmbeddedConfiguration ec = new EmbeddedConfiguration();
+		MockTransactionManager tm = new MockTransactionManager();
+		ec.setTransactionManager(tm);
+		ec.setUseDisk(false);
+		es.start(ec);
+		
+		ModelMetaData mmd1 = new ModelMetaData();
+		mmd1.setName("b");
+		mmd1.setModelType(Type.VIRTUAL);
+		mmd1.setSchemaSourceType("ddl");
+		mmd1.setSchemaText("create view v as select 1; " +
+				"create virtual procedure proc () options (updatecount 2) as begin select * from v; end; " +
+				"create virtual procedure proc1 () as begin atomic select * from v; end; ");
 
+		es.deployVDB("test", mmd1);
+		
+		TeiidDriver td = es.getDriver();
+		Connection c = td.connect("jdbc:teiid:test", null);
+		//local txn
+		c.setAutoCommit(false);
+		Statement s = c.createStatement();
+		s.execute("select 1");
+		c.setAutoCommit(true);
+		assertEquals(1, tm.txnHistory.size());
+		Transaction txn = tm.txnHistory.remove(0);
+		Mockito.verify(txn).commit();
+		
+		//should be an auto-commit txn (could also force with autoCommitTxn=true)
+		s.execute("call proc ()");
+		
+		assertEquals(1, tm.txnHistory.size());
+		txn = tm.txnHistory.remove(0);
+		Mockito.verify(txn).commit();
+		
+		//block txn
+		s.execute("call proc1()");
+		
+		assertEquals(1, tm.txnHistory.size());
+		txn = tm.txnHistory.remove(0);
+		Mockito.verify(txn).commit();
+	}
+
 }

Modified: trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestDynamicViewDefinition.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestDynamicViewDefinition.java	2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestDynamicViewDefinition.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -73,6 +73,7 @@
 		this.internalConnection =  TeiidDriver.getInstance().connect("jdbc:teiid:dynamic at mm://localhost:31000;user=user;password=user", null);
 		
 		execute("SELECT * FROM Sys.Columns WHERE tablename='stock'"); //$NON-NLS-1$
+		assertRowCount(2);
     }
 
 }

Added: trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java	                        (rev 0)
+++ trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java	2012-09-13 18:12:44 UTC (rev 4435)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.arquillian;
+
+import static org.junit.Assert.*;
+
+import java.io.FileInputStream;
+
+import org.jboss.arquillian.junit.Arquillian;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.teiid.adminapi.Admin;
+import org.teiid.adminapi.AdminException;
+import org.teiid.adminapi.AdminFactory;
+import org.teiid.core.util.UnitTestUtil;
+import org.teiid.jdbc.AbstractMMQueryTestCase;
+import org.teiid.jdbc.TeiidDriver;
+
+ at RunWith(Arquillian.class)
+ at SuppressWarnings("nls")
+public class IntegrationTestTransactions extends AbstractMMQueryTestCase {
+
+	private Admin admin;
+	
+	@Before
+	public void setup() throws Exception {
+		admin = AdminFactory.getInstance().createAdmin("localhost", 9999,	"admin", "admin".toCharArray());
+	}
+	
+	@After
+	public void teardown() throws AdminException {
+		AdminUtil.cleanUp(admin);
+		admin.close();
+	}
+
+	@Test
+    public void testViewDefinition() throws Exception {
+				
+		admin.deploy("txn-vdb.xml",new FileInputStream(UnitTestUtil.getTestDataFile("txn-vdb.xml")));
+		
+		assertTrue(AdminUtil.waitForVDBLoad(admin, "txn", 1, 30));
+		
+		this.internalConnection =  TeiidDriver.getInstance().connect("jdbc:teiid:txn at mm://localhost:31000;user=user;password=user", null);
+		
+		execute("create local temporary table temp (x integer)"); //$NON-NLS-1$
+		execute("call proc()");
+		execute("start transaction"); //$NON-NLS-1$
+		execute("call proc()");
+		execute("insert into temp (x) values (1)"); //$NON-NLS-1$
+		execute("select * from temp");
+		assertRowCount(1);
+		execute("rollback");
+		execute("select * from temp");
+		assertRowCount(0);
+    }
+
+}


Property changes on: trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/test-integration/common/src/test/resources/txn-vdb.xml
===================================================================
--- trunk/test-integration/common/src/test/resources/txn-vdb.xml	                        (rev 0)
+++ trunk/test-integration/common/src/test/resources/txn-vdb.xml	2012-09-13 18:12:44 UTC (rev 4435)
@@ -0,0 +1,8 @@
+<vdb name="txn" version= "1">
+    <model visible = "true" type = "VIRTUAL" name = "x">
+         <metadata type = "DDL"><![CDATA[
+              CREATE VIRTUAL PROCEDURE PROC() AS BEGIN ATOMIC declare integer x = 1; END;
+         ]]>
+         </metadata>
+    </model>
+</vdb>
\ No newline at end of file


Property changes on: trunk/test-integration/common/src/test/resources/txn-vdb.xml
___________________________________________________________________
Added: svn:mime-type
   + text/plain



More information about the teiid-commits mailing list