Author: shawkins
Date: 2009-09-16 13:35:24 -0400 (Wed, 16 Sep 2009)
New Revision: 1363
Removed:
trunk/test-integration/common/src/test/java/com/metamatrix/connector/jdbc/oracle/
Modified:
trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionProvider.java
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPooledConnector.java
trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java
trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java
Log:
TEIID-833: initial changes to transaction handling, moving transaction interactions
primarily into the pooled connector
Modified: trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -28,6 +28,7 @@
import javax.transaction.InvalidTransactionException;
import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -50,12 +51,13 @@
public static final String TXN_STATUS_PORT = "xa.txnstatus_port";
//$NON-NLS-1$
public static final String TXN_ENABLE_RECOVERY = "xa.enable_recovery";
//$NON-NLS-1$
-
public static final String PROCESSNAME = DQPEmbeddedProperties.PROCESSNAME;
public static final String DEFAULT_TXN_MGR_LOG_DIR = "txnlog";
//$NON-NLS-1$
public static final String DEFAULT_TXN_TIMEOUT = "120"; //$NON-NLS-1$ //2
mins
public static final String DEFAULT_TXN_STATUS_PORT = "0"; //$NON-NLS-1$
+
+ TransactionManager getTransactionManager();
// processor level methods
TransactionContext start(TransactionContext context) throws XATransactionException,
SystemException;
@@ -66,29 +68,16 @@
TransactionContext getOrCreateTransactionContext(String threadId);
- // local transaction
+ // local transaction methods
TransactionContext begin(String threadId) throws XATransactionException,
SystemException;
void commit(String threadId) throws XATransactionException, SystemException;
void rollback(String threadId) throws XATransactionException, SystemException;
- // connector worker
- TransactionContext delist(TransactionContext context,
- XAResource resource,
- int flags) throws XATransactionException;
-
- TransactionContext enlist(TransactionContext context,
- XAResource resource) throws XATransactionException;
-
void cancelTransactions(String threadId, boolean requestOnly) throws
InvalidTransactionException, SystemException;
-
- // recovery
- void registerRecoverySource(String name, XAConnectionSource resource);
-
- void removeRecoverySource(String name);
-
+ // global transaction methods
int prepare(final String threadId,
MMXid xid) throws XATransactionException;
@@ -113,9 +102,19 @@
MMXid xid,
int flags) throws XATransactionException;
+ // management methods
Collection<Transaction> getTransactions();
void terminateTransaction(Xid transactionId) throws AdminException;
void terminateTransaction(String transactionId, String sessionId) throws
AdminException;
+
+ // Teiid managed XA
+ TransactionContext enlist(TransactionContext context,
+ XAResource resource) throws XATransactionException;
+
+ void registerRecoverySource(String name, XAConnectionSource resource);
+
+ void removeRecoverySource(String name);
+
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -22,15 +22,9 @@
package org.teiid.dqp.internal.datamgr.impl;
-import org.teiid.connector.api.Connector;
-import org.teiid.connector.api.ConnectorException;
-
-import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.comm.api.ResultsReceiver;
-import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.message.AtomicRequestMessage;
import com.metamatrix.dqp.message.AtomicResultsMessage;
-import com.metamatrix.query.metadata.QueryMetadataInterface;
public class AsynchConnectorWorkItem extends ConnectorWorkItem {
@@ -39,14 +33,6 @@
}
@Override
- protected void createConnection(Connector connector,
- QueryMetadataInterface queryMetadata) throws ConnectorException,
- MetaMatrixComponentException {
- super.createConnection(connector, queryMetadata);
- Assertion.assertTrue(!this.isTransactional, "Asynch work items are not suitable
for transactions"); //$NON-NLS-1$
- }
-
- @Override
protected boolean dataNotAvailable(long delay) {
this.manager.scheduleTask(this, delay);
return false;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -30,7 +30,6 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -38,8 +37,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import javax.transaction.xa.XAResource;
-
import org.teiid.connector.api.Connection;
import org.teiid.connector.api.Connector;
import org.teiid.connector.api.ConnectorCapabilities;
@@ -51,14 +48,12 @@
import org.teiid.connector.api.ConnectorAnnotations.SynchronousWorkers;
import org.teiid.connector.metadata.runtime.ConnectorMetadata;
import org.teiid.connector.metadata.runtime.MetadataFactory;
-import org.teiid.connector.xa.api.XAConnection;
import org.teiid.connector.xa.api.XAConnector;
import org.teiid.dqp.internal.cache.DQPContextCache;
import org.teiid.dqp.internal.cache.ResultSetCache;
import org.teiid.dqp.internal.datamgr.CapabilitiesConverter;
import org.teiid.dqp.internal.pooling.connector.PooledConnector;
import org.teiid.dqp.internal.process.DQPWorkContext;
-import org.teiid.dqp.internal.transaction.TransactionProvider;
import com.metamatrix.admin.objects.MMConnectionPool;
import com.metamatrix.api.exception.MetaMatrixComponentException;
@@ -189,7 +184,7 @@
context.setContextCache(getContextCache());
- conn = connector.getConnection(context);
+ conn = connector.getConnection(context, null);
caps = conn.getCapabilities();
global = false;
}
@@ -436,61 +431,18 @@
} catch (MetaMatrixCoreException e) {
throw new ApplicationLifecycleException(e,
DQPPlugin.Util.getString("failed_find_Connector_class", connectorClassName));
//$NON-NLS-1$
}
- if (this.isXa) {
- if(!(c instanceof XAConnector)){
- throw new
ApplicationLifecycleException(DQPPlugin.Util.getString("non_xa_connector",
connectorName)); //$NON-NLS-1$
- }
- if (this.getTransactionService() == null) {
- throw new
ApplicationLifecycleException(DQPPlugin.Util.getString("no_txn_manager",
connectorName)); //$NON-NLS-1$
- }
- }
if (this.synchWorkers) {
SynchronousWorkers synchWorkerAnnotation =
c.getClass().getAnnotation(SynchronousWorkers.class);
if (synchWorkerAnnotation != null) {
this.synchWorkers = synchWorkerAnnotation.enabled();
}
+ if (!this.synchWorkers) {
+ LogManager.logDetail(LogConstants.CTX_CONNECTOR, "Changing asynch
connector", getName(), "to non-XA. Consider changing you're connector
binding to be non-XA."); //$NON-NLS-1$ //$NON-NLS-2$
+ this.isXa = false;
+ }
}
- c = wrapPooledConnector(c, env);
- if (c instanceof ConnectorWrapper) {
- this.connector = (ConnectorWrapper)c;
- } else {
- this.connector = new ConnectorWrapper(c);
- }
+ this.connector = wrapConnector(c, env);
this.connector.start(env);
- if (this.isXa) {
- if (this.connector.supportsSingleIdentity()) {
- // add this connector as the recovery source
- TransactionService ts = this.getTransactionService();
- ts.registerRecoverySource(connectorName, new
TransactionProvider.XAConnectionSource() {
- XAConnection conn = null;
-
- @Override
- public XAResource getXAResource() throws SQLException {
- if (conn == null) {
- try {
- conn = ((XAConnector)connector).getXAConnection(null, null);
- } catch (ConnectorException e) {
- throw new SQLException(e);
- }
- }
- try {
- return conn.getXAResource();
- } catch (ConnectorException e) {
- throw new SQLException(e);
- }
- }
-
- @Override
- public void close() {
- if (conn != null) {
- conn.close();
- }
- }
- });
- } else {
- LogManager.logWarning(LogConstants.CTX_CONNECTOR,
DQPPlugin.Util.getString("ConnectorManager.cannot_add_to_recovery",
this.getName())); //$NON-NLS-1$
- }
- }
} catch (ConnectorException e) {
throw new ApplicationLifecycleException(e,
DQPPlugin.Util.getString("failed_start_Connector", new Object[]
{this.getConnectorID(), e.getMessage()})); //$NON-NLS-1$
} finally {
@@ -498,7 +450,7 @@
}
}
- private Connector wrapPooledConnector(Connector c, ConnectorEnvironment connectorEnv)
{
+ private ConnectorWrapper wrapConnector(Connector c, ConnectorEnvironment
connectorEnv) throws ApplicationLifecycleException {
//the pooling annotation overrides the connector binding
ConnectionPooling connectionPooling =
c.getClass().getAnnotation(ConnectionPooling.class);
boolean connectionPoolPropertyEnabled =
PropertiesUtils.getBooleanProperty(connectorEnv.getProperties(),
ConnectorPropertyNames.CONNECTION_POOL_ENABLED, true);
@@ -509,14 +461,22 @@
} else {
poolingEnabled = connectionPooling != null &&
connectionPooling.enabled();
}
+ if (this.isXa) {
+ if(poolingEnabled && !(c instanceof XAConnector)){
+ throw new
ApplicationLifecycleException(DQPPlugin.Util.getString("non_xa_connector",
connectorName)); //$NON-NLS-1$
+ }
+ if (this.getTransactionService() == null) {
+ throw new
ApplicationLifecycleException(DQPPlugin.Util.getString("no_txn_manager",
connectorName)); //$NON-NLS-1$
+ }
+ }
if (poolingEnabled) {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, "Automatic connection
pooling was enabled for connector " + getName()); //$NON-NLS-1$
if (!this.synchWorkers) {
LogManager.logWarning(LogConstants.CTX_CONNECTOR,
DQPPlugin.Util.getString("ConnectorManager.asynch_worker_warning",
ConnectorPropertyNames.SYNCH_WORKERS)); //$NON-NLS-1$
}
- return new PooledConnector(c);
+ return new PooledConnector(this.connectorName, c,
isXa?this.getTransactionService():null);
}
- return c;
+ return new ConnectorWrapper(c);
}
protected ResultSetCache createResultSetCache(Properties rsCacheProps) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -26,8 +26,10 @@
import java.util.Arrays;
import java.util.List;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.SystemException;
+
import org.teiid.connector.api.Connection;
-import org.teiid.connector.api.Connector;
import org.teiid.connector.api.ConnectorException;
import org.teiid.connector.api.DataNotAvailableException;
import org.teiid.connector.api.Execution;
@@ -38,7 +40,6 @@
import org.teiid.connector.language.IProcedure;
import org.teiid.connector.language.IQueryCommand;
import org.teiid.connector.metadata.runtime.RuntimeMetadata;
-import org.teiid.connector.xa.api.XAConnector;
import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
import org.teiid.dqp.internal.datamgr.metadata.RuntimeMetadataImpl;
import org.teiid.dqp.internal.process.AbstractWorkItem;
@@ -123,23 +124,28 @@
this.securityContext.setContextCache(manager.getContextCache());
}
- protected void createConnection(Connector connector, QueryMetadataInterface
queryMetadata) throws ConnectorException, MetaMatrixComponentException {
+ protected void createConnection(ConnectorWrapper connector, QueryMetadataInterface
queryMetadata) throws ConnectorException, MetaMatrixComponentException {
LogManager.logTrace(LogConstants.CTX_CONNECTOR, new Object[] {id, "creating
connection for atomic-request"}); //$NON-NLS-1$
if (requestMsg.isTransactional()){
if (manager.isXa()) {
- connection = ((XAConnector)connector).getXAConnection(this.securityContext,
requestMsg.getTransactionContext());
this.securityContext.setTransactional(true);
this.isTransactional = true;
- return;
- }
- if (!manager.isImmutable() &&
requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
+ try {
+ manager.getTransactionService().getTransactionManager().resume(requestMsg.getTransactionContext().getTransaction());
+ } catch (InvalidTransactionException e) {
+ throw new ConnectorException(e);
+ } catch (SystemException e) {
+ throw new ConnectorException(e);
+ }
+ } else if (!manager.isImmutable() &&
requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
throw new
ConnectorException(DQPPlugin.Util.getString("ConnectorWorker.transactionNotSupported"));
//$NON-NLS-1$
}
}
- connection = connector.getConnection(this.securityContext);
+
+ connection = connector.getConnection(this.securityContext,
this.isTransactional?requestMsg.getTransactionContext():null);
}
-
+
protected void process() {
DQPWorkContext.setWorkContext(this.requestMsg.getWorkContext());
ClassLoader contextloader = Thread.currentThread().getContextClassLoader();
@@ -269,7 +275,14 @@
}
}
- protected void sendClose() {
+ private void sendClose() {
+ if (this.isTransactional) {
+ try {
+ manager.getTransactionService().getTransactionManager().suspend();
+ } catch (SystemException e) {
+ LogManager.logWarning(LogConstants.CTX_CONNECTOR, e, e.getMessage());
+ }
+ }
AtomicResultsMessage response = new AtomicResultsMessage(this.requestMsg);
response.setRequestClosed(true);
this.resultsReceiver.receiveResults(response);
@@ -284,7 +297,7 @@
}
protected void createExecution() throws MetaMatrixComponentException,
- ConnectorException, MetaMatrixProcessingException {
+ ConnectorException {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[]
{this.requestMsg.getAtomicRequestID(), "Processing NEW request:",
this.requestMsg.getCommand()}); //$NON-NLS-1$
QueryMetadataInterface queryMetadata = new
TempMetadataAdapter(manager.getMetadataService().lookupMetadata(this.requestMsg.getWorkContext().getVdbName(),
this.requestMsg.getWorkContext().getVdbVersion()), new TempMetadataStore());
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -44,10 +44,9 @@
/**
* ConnectorWrapper adds default behavior to the wrapped connector.
*/
-public class ConnectorWrapper implements XAConnector, MetadataProvider {
+public class ConnectorWrapper implements MetadataProvider {
private Connector actualConnector;
- private String name;
private volatile ConnectorStatus status = ConnectorStatus.UNABLE_TO_CHECK;
public ConnectorWrapper(Connector actualConnector){
@@ -55,7 +54,6 @@
}
public void start(ConnectorEnvironment environment) throws ConnectorException {
- name = environment.getConnectorName();
actualConnector.start(environment);
int interval = PropertiesUtils.getIntProperty(environment.getProperties(),
ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL,
ConnectionPool.DEFAULT_SOURCE_CONNECTION_TEST_INTERVAL);
if (interval > 0 && isConnectionTestable()) {
@@ -76,24 +74,22 @@
actualConnector.stop();
}
- @Override
- public final Connection getConnection(ExecutionContext context)
- throws ConnectorException {
- setIdentity(context);
- return getConnectionDirect(context);
+ public final Connection getConnection(ExecutionContext context, TransactionContext
transactionContext)
+ throws ConnectorException {
+ if (context instanceof ExecutionContextImpl && context.getConnectorIdentity()
== null) {
+ ((ExecutionContextImpl)context).setConnectorIdentity(createIdentity(context));
+ }
+ if (transactionContext == null) {
+ return getConnectionDirect(context);
+ }
+ return getXAConnectionDirect(context, transactionContext);
}
-
+
protected Connection getConnectionDirect(ExecutionContext context)
throws ConnectorException {
return actualConnector.getConnection(context);
}
- @Override
- public final XAConnection getXAConnection( ExecutionContext executionContext,
TransactionContext transactionContext) throws ConnectorException {
- setIdentity(executionContext);
- return getXAConnectionDirect(executionContext, transactionContext);
- }
-
protected XAConnection getXAConnectionDirect(ExecutionContext executionContext,
TransactionContext transactionContext) throws ConnectorException {
if (actualConnector instanceof XAConnector) {
@@ -102,13 +98,6 @@
return null;
}
- private void setIdentity(ExecutionContext executionContext)
- throws ConnectorException {
- if (executionContext instanceof ExecutionContextImpl &&
executionContext.getConnectorIdentity() == null) {
-
((ExecutionContextImpl)executionContext).setConnectorIdentity(createIdentity(executionContext));
- }
- }
-
public ConnectorCapabilities getCapabilities() {
return actualConnector.getCapabilities();
}
@@ -125,7 +114,7 @@
if (supportsSingleIdentity()) {
Connection conn = null;
try {
- conn = this.getConnection(null);
+ conn = this.getConnectionDirect(null);
return conn.isAlive()?ConnectorStatus.OPEN:ConnectorStatus.DATA_SOURCE_UNAVAILABLE;
} catch (ConnectorException e) {
return ConnectorStatus.DATA_SOURCE_UNAVAILABLE;
@@ -142,7 +131,6 @@
return actualConnector;
}
- @Override
public ConnectorIdentity createIdentity(ExecutionContext context)
throws ConnectorException {
return actualConnector.createIdentity(context);
@@ -156,10 +144,6 @@
}
}
- public String getConnectorBindingName() {
- return this.name;
- }
-
@Override
public void getConnectorMetadata(MetadataFactory metadataFactory)
throws ConnectorException {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -26,19 +26,10 @@
import java.util.Map;
import java.util.concurrent.Semaphore;
-import javax.transaction.xa.XAResource;
-
-import org.teiid.connector.api.ConnectorException;
-import org.teiid.connector.xa.api.XAConnection;
-
-import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.comm.api.ResultsReceiver;
import com.metamatrix.common.log.LogManager;
-import com.metamatrix.common.xa.XATransactionException;
import com.metamatrix.dqp.message.AtomicRequestMessage;
import com.metamatrix.dqp.message.AtomicResultsMessage;
-import com.metamatrix.dqp.service.TransactionService;
import com.metamatrix.dqp.util.LogConstants;
public class SynchConnectorWorkItem extends ConnectorWorkItem {
@@ -94,45 +85,6 @@
this.notify();
}
- @Override
- protected void createExecution() throws MetaMatrixComponentException,
- ConnectorException, MetaMatrixProcessingException {
- super.createExecution();
- enlistResource();
- }
-
- @Override
- protected void sendClose() {
- delistResource();
- super.sendClose();
- }
-
- private void enlistResource() throws ConnectorException,
- XATransactionException {
- if (!this.isTransactional || this.connection == null) {
- return;
- }
- LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {
- "AtomicRequest", id, "enlist(" +
requestMsg.getTransactionContext() + ")" }); //$NON-NLS-1$ //$NON-NLS-2$
//$NON-NLS-3$
- XAResource xaRes = ((XAConnection) connection).getXAResource();
- getTransactionServer().enlist(requestMsg.getTransactionContext(), xaRes);
- }
-
- private void delistResource() {
- if (!this.isTransactional || this.connection == null) {
- return;
- }
- try {
- LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {
- "AtomicRequest", id, "delist(" +
requestMsg.getTransactionContext() + ")" }); //$NON-NLS-1$ //$NON-NLS-2$
//$NON-NLS-3$
- XAResource xaRes = ((XAConnection) connection).getXAResource();
- getTransactionServer().delist(requestMsg.getTransactionContext(),
- xaRes, XAResource.TMSUCCESS);
- } catch (Throwable e) {
- LogManager.logWarning(LogConstants.CTX_CONNECTOR, e.getMessage());
- }
- }
-
private void acquireTransactionLock() throws InterruptedException {
if (!this.isTransactional) {
return;
@@ -167,10 +119,6 @@
this.lock = null;
}
- private TransactionService getTransactionServer() {
- return manager.getTransactionService();
- }
-
@Override
protected boolean dataNotAvailable(long delay) {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -22,6 +22,7 @@
package org.teiid.dqp.internal.pooling.connector;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -31,6 +32,7 @@
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
+import javax.transaction.xa.XAResource;
import org.teiid.connector.api.Connection;
import org.teiid.connector.api.Connector;
@@ -41,9 +43,15 @@
import org.teiid.connector.xa.api.XAConnection;
import org.teiid.connector.xa.api.XAConnector;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWrapper;
+import org.teiid.dqp.internal.transaction.TransactionProvider;
import com.metamatrix.admin.objects.MMConnectionPool;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.common.xa.XATransactionException;
+import com.metamatrix.dqp.DQPPlugin;
import com.metamatrix.dqp.service.ConnectorStatus;
+import com.metamatrix.dqp.service.TransactionService;
+import com.metamatrix.dqp.util.LogConstants;
/**
@@ -93,14 +101,17 @@
private Map<String, ConnectionWrapper> idToConnections =
Collections.synchronizedMap(new HashMap<String, ConnectionWrapper>());
private ConnectorEnvironment environment;
+ private TransactionService transactionService;
+ private String name;
- public PooledConnector(Connector actualConnector) {
+ public PooledConnector(String name, Connector actualConnector, TransactionService
transactionService) {
super(actualConnector);
pool = new ConnectionPool(this);
-
+ this.transactionService = transactionService;
if (actualConnector instanceof XAConnector) {
xaPool = new ConnectionPool(this);
}
+ this.name = name;
}
@Override
@@ -112,6 +123,39 @@
xaPool.initialize(environment);
}
super.start(environment);
+ if (this.transactionService != null) {
+ if (this.supportsSingleIdentity()) {
+ // add this connector as the recovery source
+ transactionService.registerRecoverySource(this.name, new
TransactionProvider.XAConnectionSource() {
+ XAConnection conn = null;
+
+ @Override
+ public XAResource getXAResource() throws SQLException {
+ if (conn == null) {
+ try {
+ conn = getXAConnectionDirect(null, null);
+ } catch (ConnectorException e) {
+ throw new SQLException(e);
+ }
+ }
+ try {
+ return conn.getXAResource();
+ } catch (ConnectorException e) {
+ throw new SQLException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ });
+ } else {
+ LogManager.logWarning(LogConstants.CTX_CONNECTOR,
DQPPlugin.Util.getString("ConnectorManager.cannot_add_to_recovery", this.name));
//$NON-NLS-1$
+ }
+ }
}
@Override
@@ -153,6 +197,13 @@
if (environment.getLogger().isTraceEnabled()) {
environment.getLogger().logTrace("Obtained new connection for transaction
" + transactionContext.getTxnID()); //$NON-NLS-1$
}
+ XAResource xaRes = conn.getXAResource();
+ try {
+ transactionService.enlist(transactionContext, xaRes);
+ } catch (XATransactionException e) {
+ conn.close();
+ throw new ConnectorException(e);
+ }
try { //add a synchronization to remove the map entry
transactionContext.getTransaction().registerSynchronization(new
RemovalCallback(transactionContext, conn));
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionProvider.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionProvider.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionProvider.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -26,6 +26,7 @@
import java.util.Properties;
import javax.resource.spi.XATerminator;
+import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAException;
@@ -51,7 +52,7 @@
TransactionManager getTransactionManager();
- Transaction importTransaction(MMXid xid, int timeout) throws XAException;
+ Transaction importTransaction(MMXid xid, int timeout) throws XAException,
SystemException;
String getTransactionID(Transaction tx);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -249,7 +249,9 @@
tx = this.provider.importTransaction(xid, timeout);
} catch (XAException err) {
throw new XATransactionException(err);
- }
+ } catch (SystemException err) {
+ throw new XATransactionException(err);
+ }
try {
tx.registerSynchronization(new Synchronization() {
@@ -444,7 +446,7 @@
}
}
- private TransactionManager getTransactionManager() {
+ public TransactionManager getTransactionManager() {
return provider.getTransactionManager();
}
@@ -520,53 +522,20 @@
return context;
}
- public TransactionContext delist(TransactionContext context,
- XAResource resource,
- int flags) throws XATransactionException {
- TransactionManager tm = getTransactionManager();
- TransactionContextImpl tc = (TransactionContextImpl)context;
-
- try {
- Transaction tx = tm.getTransaction();
- if (!tx.equals(context.getTransaction())) {
- throw new XATransactionException(context.getTransaction() + " !=
" + tx); //$NON-NLS-1$
- }
-
- // intermediate suspend/success is not necessary because we hold the
connector connection
- // for the duration of the transaction. However, we want to suspend because
- // ConnectorWorker thread needs to be disassociated.
- } catch (SystemException err) {
- throw new XATransactionException(err);
- } catch (IllegalStateException err) {
- throw new XATransactionException(err);
- } finally {
- try {
- tm.suspend();
- } catch (SystemException err) {
- throw new XATransactionException(err);
- }
- }
- return tc;
- }
-
public TransactionContext enlist(TransactionContext context,
XAResource resource) throws XATransactionException
{
TransactionManager tm = getTransactionManager();
TransactionContextImpl tc = (TransactionContextImpl)context;
try {
- if (tc.getTransactionTimeout() > 0) {
- if (tc.getTransactionTimeout() != resource.getTransactionTimeout()) {
- resource.setTransactionTimeout(tc.getTransactionTimeout());
- }
+ if (tc.getTransactionTimeout() > 0 && tc.getTransactionTimeout()
!= resource.getTransactionTimeout()) {
+ resource.setTransactionTimeout(tc.getTransactionTimeout());
}
Transaction tx = tm.getTransaction();
if (tx == null) {
tm.resume(context.getTransaction());
- } else {
- if (!tx.equals(context.getTransaction())) {
- throw new XATransactionException(context.getTransaction() + " !=
" + tx); //$NON-NLS-1$
- }
+ } else if (!tx.equals(context.getTransaction())) {
+ throw new XATransactionException(context.getTransaction() + " !=
" + tx); //$NON-NLS-1$
}
if (!context.getTransaction().enlistResource(resource)) {
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPooledConnector.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPooledConnector.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPooledConnector.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -31,28 +31,29 @@
import org.junit.Test;
import org.mockito.Mockito;
+import org.teiid.connector.api.Connection;
import org.teiid.connector.api.ConnectorLogger;
import org.teiid.connector.api.ExecutionContext;
import org.teiid.connector.xa.api.TransactionContext;
-import org.teiid.connector.xa.api.XAConnection;
import org.teiid.connector.xa.api.XAConnector;
import org.teiid.dqp.internal.datamgr.impl.ConnectorEnvironmentImpl;
import com.metamatrix.admin.objects.MMConnectionPool;
import com.metamatrix.common.application.ApplicationEnvironment;
+import com.metamatrix.dqp.service.TransactionService;
public class TestPooledConnector {
@Test public void testGetXAConnection() throws Exception {
XAConnector connector = Mockito.mock(XAConnector.class);
- PooledConnector pc = new PooledConnector(connector);
+ PooledConnector pc = new PooledConnector("foo", connector,
Mockito.mock(TransactionService.class)); //$NON-NLS-1$
pc.start(new ConnectorEnvironmentImpl(new Properties(),
Mockito.mock(ConnectorLogger.class), new ApplicationEnvironment()));
TransactionContext tc = Mockito.mock(TransactionContext.class);
Mockito.stub(tc.getTransaction()).toReturn(Mockito.mock(Transaction.class));
Mockito.stub(tc.getTxnID()).toReturn("1"); //$NON-NLS-1$
- XAConnection conn = pc.getXAConnection(Mockito.mock(ExecutionContext.class), tc);
+ Connection conn = pc.getConnection(Mockito.mock(ExecutionContext.class), tc);
conn.close();
- XAConnection conn1 = pc.getXAConnection(Mockito.mock(ExecutionContext.class), tc);
+ Connection conn1 = pc.getConnection(Mockito.mock(ExecutionContext.class), tc);
assertSame(conn, conn1);
List<MMConnectionPool> stats = pc.getConnectionPoolStats();
Modified: trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java
===================================================================
--- trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -58,7 +58,7 @@
*/
/*
- * See JBTM-456
+ * To be removed after 4.5 See JBTM-456
*/
public class XATerminatorImple implements javax.resource.spi.XATerminator
{
Modified: trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java
===================================================================
--- trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java 2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java 2009-09-16 17:35:24 UTC (rev 1363)
@@ -42,7 +42,7 @@
*/
/*
- * see JBTM-457
+ * To be removed after 4.5 see JBTM-457
*/
public class SubordinateAtomicAction extends
com.arjuna.ats.internal.jta.transaction.arjunacore.AtomicAction