Author: shawkins
Date: 2012-09-13 14:12:44 -0400 (Thu, 13 Sep 2012)
New Revision: 4435
Added:
trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java
trunk/test-integration/common/src/test/resources/txn-vdb.xml
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
trunk/engine/src/test/java/org/teiid/query/processor/proc/TestProcedureProcessor.java
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
trunk/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java
trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestDynamicViewDefinition.java
Log:
TEIID-2205 fix for procedure/block/local connection txn issues
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -254,7 +254,6 @@
this.context.setRequestId(this.requestId);
this.context.setDQPWorkContext(this.workContext);
this.context.setTransactionService(this.transactionService);
- this.context.setTransactionContext(this.transactionContext);
this.context.setVDBClassLoader(workContext.getVDB().getAttachment(ClassLoader.class));
}
@@ -366,6 +365,7 @@
tc.setIsolationLevel(requestMsg.getTransactionIsolation());
this.transactionContext = tc;
+ this.context.setTransactionContext(tc);
this.processor = new QueryProcessor(processPlan, context, bufferManager,
processorDataManager);
this.processor.setContinuous(this.requestMsg.getRequestOptions().isContinuous());
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -356,13 +356,13 @@
}
private void resume() throws XATransactionException {
- if (this.transactionState == TransactionState.ACTIVE && isSuspendable()) {
+ if (this.transactionState == TransactionState.ACTIVE) {
this.transactionService.resume(this.transactionContext);
}
}
private boolean isSuspendable() {
- return !this.useCallingThread && this.transactionContext.getTransaction() != null;
+ return this.transactionContext.getTransaction() != null && !(this.useCallingThread & this.transactionContext.getTransactionType() == Scope.GLOBAL);
}
private void suspend() {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java 2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -59,7 +59,10 @@
import org.teiid.dqp.service.TransactionContext.Scope;
import org.teiid.query.QueryPlugin;
-
+/**
+ * Note that the begin methods do not leave the transaction associated with the
+ * calling thread. This is by design and requires explicit resumes for association.
+ */
public class TransactionServerImpl implements TransactionService {
protected static class TransactionMapping {
@@ -423,11 +426,20 @@
public void resume(TransactionContext context) throws XATransactionException {
try {
+ //if we're already associated, just return
+ if (this.transactionManager.getTransaction() == context.getTransaction()) {
+ return;
+ }
+ } catch (SystemException e) {
+ }
+ try {
this.transactionManager.resume(context.getTransaction());
+ } catch (IllegalStateException e) {
+ throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
} catch (InvalidTransactionException e) {
- throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
+ throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
} catch (SystemException e) {
- throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
+ throw new XATransactionException(QueryPlugin.Event.TEIID30538, e);
}
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java 2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -134,7 +134,6 @@
private boolean requiresTransaction = true;
private TransactionContext blockContext;
- private boolean inTxn;
/**
* Resources cannot be held open across the txn boundary. This list is a hack at ensuring the resources are closed.
*/
@@ -208,7 +207,6 @@
beginBatch = 1;
batchRows = null;
lastBatch = false;
- inTxn = false;
//reset program stack
programs.clear();
LogManager.logTrace(org.teiid.logging.LogConstants.CTX_DQP, "ProcedurePlan reset"); //$NON-NLS-1$
@@ -270,15 +268,13 @@
@Override
public TupleBatch nextBatch() throws BlockedException,
TeiidComponentException, TeiidProcessingException {
- if (blockContext != null && !this.inTxn) {
+ if (blockContext != null) {
this.getContext().getTransactionServer().resume(blockContext);
- this.inTxn = true;
}
try {
return nextBatchDirect();
} finally {
if (blockContext != null) {
- this.inTxn = false;
this.getContext().getTransactionServer().suspend(blockContext);
}
}
@@ -580,9 +576,7 @@
TransactionContext tc = this.blockContext;
this.blockContext = null;
try {
- if (!inTxn) {
- this.getContext().getTransactionServer().resume(tc);
- }
+ this.getContext().getTransactionServer().resume(tc);
for (WeakReference<DataTierTupleSource> ref : txnTupleSources) {
DataTierTupleSource dtts = ref.get();
if (dtts != null) {
@@ -616,9 +610,8 @@
if (tc != null && tc.getTransactionType() == Scope.NONE) {
//start a transaction
this.getContext().getTransactionServer().begin(tc);
- this.inTxn = true;
this.blockContext = tc;
- this.peek().setStartedTxn(true);
+ program.setStartedTxn(true);
}
}
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java 2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -115,6 +115,7 @@
Request request = helpProcessMessage(message, null, workContext);
assertEquals("1", request.context.getConnectionId()); //$NON-NLS-1$
+ assertNotNull(request.context.getTransactionContext());
}
private Request helpProcessMessage(RequestMessage message,
SessionAwareCache<PreparedPlan> cache, DQPWorkContext workContext) throws
TeiidComponentException,
Modified: trunk/engine/src/test/java/org/teiid/query/processor/proc/TestProcedureProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/proc/TestProcedureProcessor.java 2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/engine/src/test/java/org/teiid/query/processor/proc/TestProcedureProcessor.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -2001,7 +2001,7 @@
}
Mockito.verify(ts).begin(tc);
- Mockito.verify(ts).resume(tc);
+ Mockito.verify(ts, Mockito.times(2)).resume(tc);
Mockito.verify(ts, Mockito.times(0)).commit(tc);
Mockito.verify(ts).rollback(tc);
}
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -48,8 +48,8 @@
import org.teiid.client.security.ILogon;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBufferCache;
+import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.BundleUtil.Event;
-import org.teiid.core.TeiidRuntimeException;
import org.teiid.deployers.CompositeVDB;
import org.teiid.deployers.UDFMetaData;
import org.teiid.deployers.VDBLifeCycleListener;
@@ -166,7 +166,7 @@
}
});
tc.setTransaction(tx);
- tc.setTransactionType(Scope.LOCAL);
+ tc.setTransactionType(Scope.GLOBAL);
}
} catch (SystemException e) {
} catch (IllegalStateException e) {
Modified: trunk/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java 2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -27,12 +27,24 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.teiid.adminapi.Model.Type;
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.deployers.VirtualDatabaseException;
@@ -54,6 +66,85 @@
@SuppressWarnings("nls")
public class TestEmbeddedServer {
+ private final class MockTransactionManager implements TransactionManager {
+ ThreadLocal<Transaction> txns = new ThreadLocal<Transaction>();
+ List<Transaction> txnHistory = new ArrayList<Transaction>();
+
+ @Override
+ public Transaction suspend() throws SystemException {
+ Transaction result = txns.get();
+ txns.remove();
+ return result;
+ }
+
+ @Override
+ public void setTransactionTimeout(int seconds) throws SystemException {
+ }
+
+ @Override
+ public void setRollbackOnly() throws IllegalStateException, SystemException {
+ Transaction result = txns.get();
+ if (result == null) {
+ throw new IllegalStateException();
+ }
+ result.setRollbackOnly();
+ }
+
+ @Override
+ public void rollback() throws IllegalStateException, SecurityException,
+ SystemException {
+ Transaction t = checkNull(false);
+ txns.remove();
+ t.rollback();
+ }
+
+ @Override
+ public void resume(Transaction tobj) throws InvalidTransactionException,
+ IllegalStateException, SystemException {
+ checkNull(true);
+ txns.set(tobj);
+ }
+
+ private Transaction checkNull(boolean isNull) {
+ Transaction t = txns.get();
+ if ((!isNull && t == null) || (isNull && t != null)) {
+ throw new IllegalStateException();
+ }
+ return t;
+ }
+
+ @Override
+ public Transaction getTransaction() throws SystemException {
+ return txns.get();
+ }
+
+ @Override
+ public int getStatus() throws SystemException {
+ Transaction t = txns.get();
+ if (t == null) {
+ return Status.STATUS_NO_TRANSACTION;
+ }
+ return t.getStatus();
+ }
+
+ @Override
+ public void commit() throws RollbackException, HeuristicMixedException,
+ HeuristicRollbackException, SecurityException,
+ IllegalStateException, SystemException {
+ Transaction t = checkNull(false);
+ txns.remove();
+ t.commit();
+ }
+
+ @Override
+ public void begin() throws NotSupportedException, SystemException {
+ checkNull(true);
+ Transaction t = Mockito.mock(Transaction.class);
+ txnHistory.add(t);
+ txns.set(t);
+ }
+ }
+
EmbeddedServer es;
@Before public void setup() {
@@ -227,5 +318,48 @@
}
}
+
+ @Test public void testTransactions() throws Exception {
+ EmbeddedConfiguration ec = new EmbeddedConfiguration();
+ MockTransactionManager tm = new MockTransactionManager();
+ ec.setTransactionManager(tm);
+ ec.setUseDisk(false);
+ es.start(ec);
+
+ ModelMetaData mmd1 = new ModelMetaData();
+ mmd1.setName("b");
+ mmd1.setModelType(Type.VIRTUAL);
+ mmd1.setSchemaSourceType("ddl");
+ mmd1.setSchemaText("create view v as select 1; " +
+ "create virtual procedure proc () options (updatecount 2) as begin select * from v; end; " +
+ "create virtual procedure proc1 () as begin atomic select * from v; end; ");
+ es.deployVDB("test", mmd1);
+
+ TeiidDriver td = es.getDriver();
+ Connection c = td.connect("jdbc:teiid:test", null);
+ //local txn
+ c.setAutoCommit(false);
+ Statement s = c.createStatement();
+ s.execute("select 1");
+ c.setAutoCommit(true);
+ assertEquals(1, tm.txnHistory.size());
+ Transaction txn = tm.txnHistory.remove(0);
+ Mockito.verify(txn).commit();
+
+ //should be an auto-commit txn (could also force with autoCommitTxn=true)
+ s.execute("call proc ()");
+
+ assertEquals(1, tm.txnHistory.size());
+ txn = tm.txnHistory.remove(0);
+ Mockito.verify(txn).commit();
+
+ //block txn
+ s.execute("call proc1()");
+
+ assertEquals(1, tm.txnHistory.size());
+ txn = tm.txnHistory.remove(0);
+ Mockito.verify(txn).commit();
+ }
+
}
Modified: trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestDynamicViewDefinition.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestDynamicViewDefinition.java 2012-09-13 11:16:46 UTC (rev 4434)
+++ trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestDynamicViewDefinition.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -73,6 +73,7 @@
this.internalConnection = TeiidDriver.getInstance().connect("jdbc:teiid:dynamic@mm://localhost:31000;user=user;password=user", null);
execute("SELECT * FROM Sys.Columns WHERE tablename='stock'"); //$NON-NLS-1$
+ assertRowCount(2);
}
}
Added: trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java (rev 0)
+++ trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java 2012-09-13 18:12:44 UTC (rev 4435)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.arquillian;
+
+import static org.junit.Assert.*;
+
+import java.io.FileInputStream;
+
+import org.jboss.arquillian.junit.Arquillian;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.teiid.adminapi.Admin;
+import org.teiid.adminapi.AdminException;
+import org.teiid.adminapi.AdminFactory;
+import org.teiid.core.util.UnitTestUtil;
+import org.teiid.jdbc.AbstractMMQueryTestCase;
+import org.teiid.jdbc.TeiidDriver;
+
+@RunWith(Arquillian.class)
+@SuppressWarnings("nls")
+public class IntegrationTestTransactions extends AbstractMMQueryTestCase {
+
+ private Admin admin;
+
+ @Before
+ public void setup() throws Exception {
+ admin = AdminFactory.getInstance().createAdmin("localhost", 9999, "admin", "admin".toCharArray());
+ }
+
+ @After
+ public void teardown() throws AdminException {
+ AdminUtil.cleanUp(admin);
+ admin.close();
+ }
+
+ @Test
+ public void testViewDefinition() throws Exception {
+
+ admin.deploy("txn-vdb.xml",new FileInputStream(UnitTestUtil.getTestDataFile("txn-vdb.xml")));
+
+ assertTrue(AdminUtil.waitForVDBLoad(admin, "txn", 1, 30));
+
+ this.internalConnection = TeiidDriver.getInstance().connect("jdbc:teiid:txn@mm://localhost:31000;user=user;password=user", null);
+
+ execute("create local temporary table temp (x integer)"); //$NON-NLS-1$
+ execute("call proc()");
+ execute("start transaction"); //$NON-NLS-1$
+ execute("call proc()");
+ execute("insert into temp (x) values (1)"); //$NON-NLS-1$
+ execute("select * from temp");
+ assertRowCount(1);
+ execute("rollback");
+ execute("select * from temp");
+ assertRowCount(0);
+ }
+
+}
Property changes on: trunk/test-integration/common/src/test/java/org/teiid/arquillian/IntegrationTestTransactions.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/test-integration/common/src/test/resources/txn-vdb.xml
===================================================================
--- trunk/test-integration/common/src/test/resources/txn-vdb.xml (rev 0)
+++ trunk/test-integration/common/src/test/resources/txn-vdb.xml 2012-09-13 18:12:44 UTC (rev 4435)
@@ -0,0 +1,8 @@
+<vdb name="txn" version= "1">
+ <model visible = "true" type = "VIRTUAL" name = "x">
+ <metadata type = "DDL"><![CDATA[
+ CREATE VIRTUAL PROCEDURE PROC() AS BEGIN ATOMIC declare integer x = 1; END;
+ ]]>
+ </metadata>
+ </model>
+</vdb>
\ No newline at end of file
Property changes on: trunk/test-integration/common/src/test/resources/txn-vdb.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain