Author: ghelblin
Date: 2009-03-02 11:25:59 -0500 (Mon, 02 Mar 2009)
New Revision: 522
Modified:
trunk/engine/src/main/java/com/metamatrix/dqp/internal/datamgr/impl/ConnectorManager.java
trunk/engine/src/main/java/com/metamatrix/dqp/internal/datamgr/impl/ConnectorWorkItem.java
trunk/engine/src/test/java/com/metamatrix/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
Log:
TEIID-154 - Add connector property to allow bypass of logic that puts PROCs in txns
[reviewed by Steve H]
Modified:
trunk/engine/src/main/java/com/metamatrix/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/dqp/internal/datamgr/impl/ConnectorManager.java 2009-03-02
16:25:56 UTC (rev 521)
+++
trunk/engine/src/main/java/com/metamatrix/dqp/internal/datamgr/impl/ConnectorManager.java 2009-03-02
16:25:59 UTC (rev 522)
@@ -129,6 +129,13 @@
this.props = props;
}
+ public boolean isImmutable() {
+ if ( this.props == null ) {
+ this.props = new Properties();
+ }
+ return PropertiesUtils.getBooleanProperty(props,
ConnectorPropertyNames.IS_IMMUTABLE, false);
+ }
+
public ClassLoader getClassloader() {
return classloader;
}
Modified:
trunk/engine/src/main/java/com/metamatrix/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2009-03-02
16:25:56 UTC (rev 521)
+++
trunk/engine/src/main/java/com/metamatrix/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2009-03-02
16:25:59 UTC (rev 522)
@@ -101,12 +101,11 @@
ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager,
ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
this.id = message.getAtomicRequestID();
this.requestMsg = message;
- this.isTransactional = manager.isXa() && message.isTransactional();
this.manager = manager;
this.resultsReceiver = resultsReceiver;
}
- private void createConnection(Connector connector, QueryMetadataInterface
queryMetadata) throws ConnectorException, MetaMatrixComponentException {
+ protected void createConnection(Connector connector, QueryMetadataInterface
queryMetadata) throws ConnectorException, MetaMatrixComponentException {
LogManager.logTrace(LogConstants.CTX_CONNECTOR, new Object[] {id, "creating
connection for atomic-request"}); //$NON-NLS-1$
AtomicRequestID requestID = this.requestMsg.getAtomicRequestID();
this.securityContext = new
ExecutionContextImpl(requestMsg.getWorkContext().getVdbName(),
@@ -123,15 +122,19 @@
&&
(requestMsg.getCommand()).areResultsCachable()
);
this.securityContext.setBatchSize(this.requestMsg.getFetchSize());
- if (isTransactional){
- connection = ((XAConnector)connector).getXAConnection(this.securityContext,
requestMsg.getTransactionContext());
- this.securityContext.setTransactional(true);
- } else {
- if (requestMsg.isTransactional() &&
requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
+
+ if (requestMsg.isTransactional()){
+ if (manager.isXa()) {
+ connection = ((XAConnector)connector).getXAConnection(this.securityContext,
requestMsg.getTransactionContext());
+ this.securityContext.setTransactional(true);
+ this.isTransactional = true;
+ return;
+ }
+ if (!manager.isImmutable() &&
requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
throw new
ConnectorException(DQPPlugin.Util.getString("ConnectorWorker.transactionNotSupported"));
//$NON-NLS-1$
}
- connection = connector.getConnection(this.securityContext);
}
+ connection = connector.getConnection(this.securityContext);
}
protected void process() {
Modified:
trunk/engine/src/test/java/com/metamatrix/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2009-03-02
16:25:56 UTC (rev 521)
+++
trunk/engine/src/test/java/com/metamatrix/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2009-03-02
16:25:59 UTC (rev 522)
@@ -24,16 +24,22 @@
import java.util.Arrays;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import javax.transaction.Transaction;
+
+import junit.framework.TestCase;
+
import org.teiid.connector.api.ConnectorException;
import org.teiid.connector.api.ProcedureExecution;
+import org.teiid.connector.internal.ConnectorPropertyNames;
import org.teiid.connector.language.IProcedure;
+import org.teiid.connector.xa.api.TransactionContext;
-import junit.framework.TestCase;
-
import com.metamatrix.common.comm.api.ResultsReceiver;
+import com.metamatrix.common.log.LogManager;
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.dqp.internal.datamgr.ConnectorID;
import com.metamatrix.dqp.internal.datamgr.language.LanguageBridgeFactory;
@@ -43,6 +49,7 @@
import com.metamatrix.dqp.message.RequestID;
import com.metamatrix.dqp.message.RequestMessage;
import com.metamatrix.dqp.service.FakeMetadataService;
+import com.metamatrix.dqp.util.LogConstants;
import com.metamatrix.query.metadata.QueryMetadataInterface;
import com.metamatrix.query.parser.QueryParser;
import com.metamatrix.query.resolver.QueryResolver;
@@ -330,6 +337,132 @@
assertEquals(1, results.getWarnings().size());
}
+
+ public void testIsImmutablePropertySucceeds() throws Exception {
+ /*
+ * Setup:
+ * 1. requestMsg.isTransactional() must be TRUE
+ * 2. manager.isXa() must be FALSE ()
+ * 3. command must NOT be a SELECT
+ * 4. Then, set isImmutable to TRUE, we should SUCCEED
+ */
+ ConnectorManager cm = getConnectorManager();
+ Properties props = new Properties();
+
+ // to create an XA ConnectorManager
+ props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS,
FakeConnector.class.getName());
+
+ // to set IS_IMMUTABLE to true
+ props.setProperty(ConnectorPropertyNames.IS_IMMUTABLE, "true");
//$NON-NLS-1$ //$NON-NLS-2$
+ cm.initialize(props);
+
+ // command must not be a SELECT
+ Command command = helpGetCommand("update bqt1.smalla set stringkey = 1 where
stringkey = 2", EXAMPLE_BQT); //$NON-NLS-1$
+ AtomicRequestMessage requestMsg = createNewAtomicRequestMessage(1, 1);
+ requestMsg.setCommand(command);
+
+ // To make the AtomicRequestMessage transactional, construct your own
+ requestMsg.setTransactionContext( new TransactionContext(){
+
+ @Override
+ public Transaction getTransaction() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Scope getTransactionType() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getTxnID() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isInTransaction() {
+ // TODO Auto-generated method stub
+ return true;
+ }} );
+
+ QueueResultsReceiver receiver = new QueueResultsReceiver();
+
+ SynchConnectorWorkItem synchConnectorWorkItem = new SynchConnectorWorkItem(requestMsg,
cm, receiver);
+
+ // This is the test
+ try {
+ synchConnectorWorkItem.run();
+ assertNotNull("Connection should not be null when IsImmutable is true",
synchConnectorWorkItem.connection); //$NON-NLS-1$
+ } catch ( Exception e ) {
+ LogManager.logWarning(LogConstants.CTX_CONNECTOR, e.getMessage());
+ }
+ }
+
+ public void testIsImmutablePropertyFails() throws Exception {
+ /*
+ * Setup:
+ * 1. requestMsg.isTransactional() must be TRUE
+ * 2. manager.isXa() must be FALSE ()
+ * 3. command must NOT be a SELECT
+ * 4. Then, set isImmutable to FALSE, and we should FAIL
+ */
+ ConnectorManager cm = getConnectorManager();
+ Properties props = new Properties();
+
+ // to create an XA ConnectorManager
+ props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS,
FakeConnector.class.getName());
+
+ // to set IS_IMMUTABLE to false
+ props.setProperty(ConnectorPropertyNames.IS_IMMUTABLE, "false");
//$NON-NLS-1$ //$NON-NLS-2$
+ cm.initialize(props);
+
+ // command must not be a SELECT
+ Command command = helpGetCommand("update bqt1.smalla set stringkey = 1 where
stringkey = 2", EXAMPLE_BQT); //$NON-NLS-1$
+ AtomicRequestMessage requestMsg = createNewAtomicRequestMessage(1, 1);
+ requestMsg.setCommand(command);
+
+ // To make the AtomicRequestMessage transactional, construct your own
+ requestMsg.setTransactionContext( new TransactionContext(){
+
+ @Override
+ public Transaction getTransaction() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Scope getTransactionType() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getTxnID() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isInTransaction() {
+ // TODO Auto-generated method stub
+ return true;
+ }} );
+
+ QueueResultsReceiver receiver = new QueueResultsReceiver();
+ SynchConnectorWorkItem synchConnectorWorkItem = new SynchConnectorWorkItem(requestMsg,
cm, receiver);
+
+ // This is the test
+ try {
+ synchConnectorWorkItem.run();
+ this.assertNull("Connection should be null when IsImmutable is false",
synchConnectorWorkItem.connection); //$NON-NLS-1$
+ } catch ( Exception e ) {
+ LogManager.logWarning(LogConstants.CTX_CONNECTOR, e.getMessage());
+ }
+ }
+
private static class FakeQueuingAsynchConnectorWorkItem extends
AsynchConnectorWorkItem {
int resumeCount;