[teiid-commits] teiid SVN: r1363 - in trunk: engine/src/main/java/org/teiid/dqp/internal/datamgr/impl and 6 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Sep 16 13:35:24 EDT 2009


Author: shawkins
Date: 2009-09-16 13:35:24 -0400 (Wed, 16 Sep 2009)
New Revision: 1363

Removed:
   trunk/test-integration/common/src/test/java/com/metamatrix/connector/jdbc/oracle/
Modified:
   trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionProvider.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPooledConnector.java
   trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java
   trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java
Log:
TEIID-833 initial changes to transaction handling to move transaction interactions primarily to the pooled connector

Modified: trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/com/metamatrix/dqp/service/TransactionService.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -28,6 +28,7 @@
 
 import javax.transaction.InvalidTransactionException;
 import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
@@ -50,12 +51,13 @@
     public static final String TXN_STATUS_PORT = "xa.txnstatus_port"; //$NON-NLS-1$
     public static final String TXN_ENABLE_RECOVERY = "xa.enable_recovery"; //$NON-NLS-1$
     
-    
     public static final String PROCESSNAME = DQPEmbeddedProperties.PROCESSNAME;
     
     public static final String DEFAULT_TXN_MGR_LOG_DIR = "txnlog"; //$NON-NLS-1$
     public static final String DEFAULT_TXN_TIMEOUT = "120"; //$NON-NLS-1$ //2 mins
     public static final String DEFAULT_TXN_STATUS_PORT = "0"; //$NON-NLS-1$
+    
+    TransactionManager getTransactionManager();
 
     // processor level methods
     TransactionContext start(TransactionContext context) throws XATransactionException, SystemException;
@@ -66,29 +68,16 @@
 
     TransactionContext getOrCreateTransactionContext(String threadId);
 
-    // local transaction
+    // local transaction methods
     TransactionContext begin(String threadId) throws XATransactionException, SystemException;
 
     void commit(String threadId) throws XATransactionException, SystemException;
 
     void rollback(String threadId) throws XATransactionException, SystemException;
 
-    // connector worker
-    TransactionContext delist(TransactionContext context,
-                              XAResource resource,
-                              int flags) throws XATransactionException;
-
-    TransactionContext enlist(TransactionContext context,
-                              XAResource resource) throws XATransactionException;
-    
     void cancelTransactions(String threadId, boolean requestOnly) throws InvalidTransactionException, SystemException;
 
-    
-    // recovery
-    void registerRecoverySource(String name, XAConnectionSource resource);
-    
-    void removeRecoverySource(String name);  
-    
+    // global transaction methods
     int prepare(final String threadId,
             MMXid xid) throws XATransactionException;
 
@@ -113,9 +102,19 @@
 	         MMXid xid,
 	         int flags) throws XATransactionException;
         
+	// management methods
     Collection<Transaction> getTransactions();
     
     void terminateTransaction(Xid transactionId) throws AdminException;
     
     void terminateTransaction(String transactionId, String sessionId) throws AdminException;
+    
+    // Teiid managed XA
+    TransactionContext enlist(TransactionContext context,
+                              XAResource resource) throws XATransactionException;
+    
+    void registerRecoverySource(String name, XAConnectionSource resource);
+    
+    void removeRecoverySource(String name);  
+
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -22,15 +22,9 @@
 
 package org.teiid.dqp.internal.datamgr.impl;
 
-import org.teiid.connector.api.Connector;
-import org.teiid.connector.api.ConnectorException;
-
-import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.comm.api.ResultsReceiver;
-import com.metamatrix.core.util.Assertion;
 import com.metamatrix.dqp.message.AtomicRequestMessage;
 import com.metamatrix.dqp.message.AtomicResultsMessage;
-import com.metamatrix.query.metadata.QueryMetadataInterface;
 
 public class AsynchConnectorWorkItem extends ConnectorWorkItem {
                 
@@ -39,14 +33,6 @@
     }
     
     @Override
-    protected void createConnection(Connector connector,
-    		QueryMetadataInterface queryMetadata) throws ConnectorException,
-    		MetaMatrixComponentException {
-    	super.createConnection(connector, queryMetadata);
-    	Assertion.assertTrue(!this.isTransactional, "Asynch work items are not suitable for transactions"); //$NON-NLS-1$
-    }
-    
-    @Override
     protected boolean dataNotAvailable(long delay) {
     	this.manager.scheduleTask(this, delay);
     	return false;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -30,7 +30,6 @@
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -38,8 +37,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-import javax.transaction.xa.XAResource;
-
 import org.teiid.connector.api.Connection;
 import org.teiid.connector.api.Connector;
 import org.teiid.connector.api.ConnectorCapabilities;
@@ -51,14 +48,12 @@
 import org.teiid.connector.api.ConnectorAnnotations.SynchronousWorkers;
 import org.teiid.connector.metadata.runtime.ConnectorMetadata;
 import org.teiid.connector.metadata.runtime.MetadataFactory;
-import org.teiid.connector.xa.api.XAConnection;
 import org.teiid.connector.xa.api.XAConnector;
 import org.teiid.dqp.internal.cache.DQPContextCache;
 import org.teiid.dqp.internal.cache.ResultSetCache;
 import org.teiid.dqp.internal.datamgr.CapabilitiesConverter;
 import org.teiid.dqp.internal.pooling.connector.PooledConnector;
 import org.teiid.dqp.internal.process.DQPWorkContext;
-import org.teiid.dqp.internal.transaction.TransactionProvider;
 
 import com.metamatrix.admin.objects.MMConnectionPool;
 import com.metamatrix.api.exception.MetaMatrixComponentException;
@@ -189,7 +184,7 @@
             	
             	context.setContextCache(getContextCache());
 
-            	conn = connector.getConnection(context);
+            	conn = connector.getConnection(context, null);
             	caps = conn.getCapabilities();
             	global = false;
             }
@@ -436,61 +431,18 @@
 			} catch (MetaMatrixCoreException e) {
 	            throw new ApplicationLifecycleException(e, DQPPlugin.Util.getString("failed_find_Connector_class", connectorClassName)); //$NON-NLS-1$
 			}
-			if (this.isXa) {
-	            if(!(c instanceof XAConnector)){
-	            	throw new ApplicationLifecycleException(DQPPlugin.Util.getString("non_xa_connector", connectorName)); //$NON-NLS-1$
-	            }
-                if (this.getTransactionService() == null) {                    
-                    throw new ApplicationLifecycleException(DQPPlugin.Util.getString("no_txn_manager", connectorName)); //$NON-NLS-1$
-                }
-			}
             if (this.synchWorkers) {
                 SynchronousWorkers synchWorkerAnnotation = c.getClass().getAnnotation(SynchronousWorkers.class);
             	if (synchWorkerAnnotation != null) {
             		this.synchWorkers = synchWorkerAnnotation.enabled();
             	}
+            	if (!this.synchWorkers) {
+            		LogManager.logDetail(LogConstants.CTX_CONNECTOR, "Changing asynch connector", getName(), "to non-XA.  Consider changing your connector binding to be non-XA."); //$NON-NLS-1$ //$NON-NLS-2$
+            		this.isXa = false;
+            	}
             }
-        	c = wrapPooledConnector(c, env);
-            if (c instanceof ConnectorWrapper) {
-            	this.connector = (ConnectorWrapper)c;
-            } else {
-            	this.connector = new ConnectorWrapper(c);
-            }
+        	this.connector = wrapConnector(c, env);
             this.connector.start(env);
-            if (this.isXa) {
-                if (this.connector.supportsSingleIdentity()) {
-                	// add this connector as the recovery source
-	                TransactionService ts = this.getTransactionService(); 
-	                ts.registerRecoverySource(connectorName, new TransactionProvider.XAConnectionSource() {
-	                	XAConnection conn = null;
-	                	
-	                	@Override
-	                	public XAResource getXAResource() throws SQLException {
-	                		if (conn == null) {
-	                			try {
-									conn = ((XAConnector)connector).getXAConnection(null, null);
-								} catch (ConnectorException e) {
-									throw new SQLException(e);
-								}
-	                		}
-	                		try {
-								return conn.getXAResource();
-							} catch (ConnectorException e) {
-								throw new SQLException(e);
-							}
-	                	}
-	                	
-	                	@Override
-	                	public void close() {
-	                		if (conn != null) {
-	                			conn.close();
-	                		}
-	                	}
-	                });
-                } else {
-                	LogManager.logWarning(LogConstants.CTX_CONNECTOR, DQPPlugin.Util.getString("ConnectorManager.cannot_add_to_recovery", this.getName())); //$NON-NLS-1$	                
-                }
-            }
         } catch (ConnectorException e) {
             throw new ApplicationLifecycleException(e, DQPPlugin.Util.getString("failed_start_Connector", new Object[] {this.getConnectorID(), e.getMessage()})); //$NON-NLS-1$
         } finally {
@@ -498,7 +450,7 @@
         }
     }
     
-    private Connector wrapPooledConnector(Connector c, ConnectorEnvironment connectorEnv) {
+    private ConnectorWrapper wrapConnector(Connector c, ConnectorEnvironment connectorEnv) throws ApplicationLifecycleException {
     	//the pooling annotation overrides the connector binding
         ConnectionPooling connectionPooling = c.getClass().getAnnotation(ConnectionPooling.class);
     	boolean connectionPoolPropertyEnabled = PropertiesUtils.getBooleanProperty(connectorEnv.getProperties(), ConnectorPropertyNames.CONNECTION_POOL_ENABLED, true);
@@ -509,14 +461,22 @@
         } else {
         	poolingEnabled = connectionPooling != null && connectionPooling.enabled();
         }
+		if (this.isXa) {
+            if(poolingEnabled && !(c instanceof XAConnector)){
+            	throw new ApplicationLifecycleException(DQPPlugin.Util.getString("non_xa_connector", connectorName)); //$NON-NLS-1$
+            }
+            if (this.getTransactionService() == null) {                    
+                throw new ApplicationLifecycleException(DQPPlugin.Util.getString("no_txn_manager", connectorName)); //$NON-NLS-1$
+            }
+		}
         if (poolingEnabled) {
            	LogManager.logDetail(LogConstants.CTX_CONNECTOR, "Automatic connection pooling was enabled for connector " + getName()); //$NON-NLS-1$
         	if (!this.synchWorkers) {
             	LogManager.logWarning(LogConstants.CTX_CONNECTOR, DQPPlugin.Util.getString("ConnectorManager.asynch_worker_warning", ConnectorPropertyNames.SYNCH_WORKERS)); //$NON-NLS-1$	                
         	}
-        	return new PooledConnector(c);
+        	return new PooledConnector(this.connectorName, c, isXa?this.getTransactionService():null);
         }         
-        return c;
+        return new ConnectorWrapper(c);
     }
     
     protected ResultSetCache createResultSetCache(Properties rsCacheProps) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -26,8 +26,10 @@
 import java.util.Arrays;
 import java.util.List;
 
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.SystemException;
+
 import org.teiid.connector.api.Connection;
-import org.teiid.connector.api.Connector;
 import org.teiid.connector.api.ConnectorException;
 import org.teiid.connector.api.DataNotAvailableException;
 import org.teiid.connector.api.Execution;
@@ -38,7 +40,6 @@
 import org.teiid.connector.language.IProcedure;
 import org.teiid.connector.language.IQueryCommand;
 import org.teiid.connector.metadata.runtime.RuntimeMetadata;
-import org.teiid.connector.xa.api.XAConnector;
 import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
 import org.teiid.dqp.internal.datamgr.metadata.RuntimeMetadataImpl;
 import org.teiid.dqp.internal.process.AbstractWorkItem;
@@ -123,23 +124,28 @@
         this.securityContext.setContextCache(manager.getContextCache());
     }
 
-    protected void createConnection(Connector connector, QueryMetadataInterface queryMetadata) throws ConnectorException, MetaMatrixComponentException {
+    protected void createConnection(ConnectorWrapper connector, QueryMetadataInterface queryMetadata) throws ConnectorException, MetaMatrixComponentException {
         LogManager.logTrace(LogConstants.CTX_CONNECTOR, new Object[] {id, "creating connection for atomic-request"});  //$NON-NLS-1$
          
         if (requestMsg.isTransactional()){
         	if (manager.isXa()) {
-	    		connection = ((XAConnector)connector).getXAConnection(this.securityContext, requestMsg.getTransactionContext());
 	    		this.securityContext.setTransactional(true);
 	    		this.isTransactional = true;
-	    		return;
-        	} 
-    	    if (!manager.isImmutable() && requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
+	    		try {
+					manager.getTransactionService().getTransactionManager().resume(requestMsg.getTransactionContext().getTransaction());
+				} catch (InvalidTransactionException e) {
+					throw new ConnectorException(e);
+				} catch (SystemException e) {
+					throw new ConnectorException(e);
+				}
+        	} else if (!manager.isImmutable() && requestMsg.getCommand().updatingModelCount(queryMetadata) > 0) {
     	        throw new ConnectorException(DQPPlugin.Util.getString("ConnectorWorker.transactionNotSupported")); //$NON-NLS-1$
     	    }
     	}
-    	connection = connector.getConnection(this.securityContext);
+        
+    	connection = connector.getConnection(this.securityContext, this.isTransactional?requestMsg.getTransactionContext():null);
     }
-            
+    
     protected void process() {
     	DQPWorkContext.setWorkContext(this.requestMsg.getWorkContext());
         ClassLoader contextloader = Thread.currentThread().getContextClassLoader();
@@ -269,7 +275,14 @@
         }        
     }
 
-	protected void sendClose() {
+	private void sendClose() {
+		if (this.isTransactional) {
+			try {
+				manager.getTransactionService().getTransactionManager().suspend();
+			} catch (SystemException e) {
+				LogManager.logWarning(LogConstants.CTX_CONNECTOR, e, e.getMessage());
+			}
+		}
 		AtomicResultsMessage response = new AtomicResultsMessage(this.requestMsg);
 		response.setRequestClosed(true);
 		this.resultsReceiver.receiveResults(response);
@@ -284,7 +297,7 @@
     }
 
 	protected void createExecution() throws MetaMatrixComponentException,
-			ConnectorException, MetaMatrixProcessingException {
+			ConnectorException {
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.requestMsg.getAtomicRequestID(), "Processing NEW request:", this.requestMsg.getCommand()}); //$NON-NLS-1$                                     
 
 		QueryMetadataInterface queryMetadata = new TempMetadataAdapter(manager.getMetadataService().lookupMetadata(this.requestMsg.getWorkContext().getVdbName(), this.requestMsg.getWorkContext().getVdbVersion()), new TempMetadataStore());

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -44,10 +44,9 @@
 /**
  * ConnectorWrapper adds default behavior to the wrapped connector.
  */
-public class ConnectorWrapper implements XAConnector, MetadataProvider {
+public class ConnectorWrapper implements MetadataProvider {
 	
 	private Connector actualConnector;
-	private String name;
 	private volatile ConnectorStatus status = ConnectorStatus.UNABLE_TO_CHECK;
 	
 	public ConnectorWrapper(Connector actualConnector){
@@ -55,7 +54,6 @@
 	}
 
 	public void start(ConnectorEnvironment environment) throws ConnectorException {
-		name = environment.getConnectorName();
 		actualConnector.start(environment);
 		int interval = PropertiesUtils.getIntProperty(environment.getProperties(), ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL, ConnectionPool.DEFAULT_SOURCE_CONNECTION_TEST_INTERVAL);
 		if (interval > 0 && isConnectionTestable()) {
@@ -76,24 +74,22 @@
 		actualConnector.stop();
 	}
 	
-	@Override
-	public final Connection getConnection(ExecutionContext context)
-			throws ConnectorException {
-    	setIdentity(context);
-		return getConnectionDirect(context);
+	public final Connection getConnection(ExecutionContext context, TransactionContext transactionContext)
+	throws ConnectorException {
+		if (context instanceof ExecutionContextImpl && context.getConnectorIdentity() == null) {
+    		((ExecutionContextImpl)context).setConnectorIdentity(createIdentity(context));
+    	}
+		if (transactionContext == null) {
+			return getConnectionDirect(context);
+		}
+		return getXAConnectionDirect(context, transactionContext);
 	}
-
+	
 	protected Connection getConnectionDirect(ExecutionContext context)
 			throws ConnectorException {
 		return actualConnector.getConnection(context);
 	}
 	
-	@Override
-    public final XAConnection getXAConnection( ExecutionContext executionContext, TransactionContext transactionContext) throws ConnectorException {
-    	setIdentity(executionContext);
-		return getXAConnectionDirect(executionContext, transactionContext);
-    }
-
 	protected XAConnection getXAConnectionDirect(ExecutionContext executionContext,
 			TransactionContext transactionContext) throws ConnectorException {
 		if (actualConnector instanceof XAConnector) {
@@ -102,13 +98,6 @@
     	return null;
 	}
 
-	private void setIdentity(ExecutionContext executionContext)
-			throws ConnectorException {
-		if (executionContext instanceof ExecutionContextImpl && executionContext.getConnectorIdentity() == null) {
-    		((ExecutionContextImpl)executionContext).setConnectorIdentity(createIdentity(executionContext));
-    	}
-	}
-	
 	public ConnectorCapabilities getCapabilities() {
 	    return actualConnector.getCapabilities();
 	}
@@ -125,7 +114,7 @@
 		if (supportsSingleIdentity()) {
 			Connection conn = null;
 			try {
-				conn = this.getConnection(null);
+				conn = this.getConnectionDirect(null);
 				return conn.isAlive()?ConnectorStatus.OPEN:ConnectorStatus.DATA_SOURCE_UNAVAILABLE;
 			} catch (ConnectorException e) {
 				return ConnectorStatus.DATA_SOURCE_UNAVAILABLE;
@@ -142,7 +131,6 @@
 		return actualConnector;
 	}
 	
-	@Override
 	public ConnectorIdentity createIdentity(ExecutionContext context)
 			throws ConnectorException {
 		return actualConnector.createIdentity(context);
@@ -156,10 +144,6 @@
 		}
 	}
 	
-	public String getConnectorBindingName() {
-		return this.name;
-	}
-	
 	@Override
 	public void getConnectorMetadata(MetadataFactory metadataFactory)
 			throws ConnectorException {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -26,19 +26,10 @@
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 
-import javax.transaction.xa.XAResource;
-
-import org.teiid.connector.api.ConnectorException;
-import org.teiid.connector.xa.api.XAConnection;
-
-import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.common.comm.api.ResultsReceiver;
 import com.metamatrix.common.log.LogManager;
-import com.metamatrix.common.xa.XATransactionException;
 import com.metamatrix.dqp.message.AtomicRequestMessage;
 import com.metamatrix.dqp.message.AtomicResultsMessage;
-import com.metamatrix.dqp.service.TransactionService;
 import com.metamatrix.dqp.util.LogConstants;
 
 public class SynchConnectorWorkItem extends ConnectorWorkItem {
@@ -94,45 +85,6 @@
 		this.notify();
 	}
 	
-	@Override
-	protected void createExecution() throws MetaMatrixComponentException,
-			ConnectorException, MetaMatrixProcessingException {
-		super.createExecution();
-		enlistResource();
-	}
-	
-	@Override
-	protected void sendClose() {
-		delistResource();
-		super.sendClose();
-	}
-
-	private void enlistResource() throws ConnectorException,
-			XATransactionException {
-		if (!this.isTransactional || this.connection == null) {
-			return;
-		}
-		LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {
-								"AtomicRequest", id, "enlist(" + requestMsg.getTransactionContext() + ")" }); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ 
-		XAResource xaRes = ((XAConnection) connection).getXAResource();
-		getTransactionServer().enlist(requestMsg.getTransactionContext(), xaRes);
-	}
-
-	private void delistResource() {
-		if (!this.isTransactional || this.connection == null) {
-			return;
-		}
-		try {
-			LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {
-									"AtomicRequest", id, "delist(" + requestMsg.getTransactionContext() + ")" }); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ 
-			XAResource xaRes = ((XAConnection) connection).getXAResource();
-			getTransactionServer().delist(requestMsg.getTransactionContext(),
-					xaRes, XAResource.TMSUCCESS);
-		} catch (Throwable e) {
-			LogManager.logWarning(LogConstants.CTX_CONNECTOR, e.getMessage());
-		}
-	}
-
 	private void acquireTransactionLock() throws InterruptedException {
 		if (!this.isTransactional) {
 			return;
@@ -167,10 +119,6 @@
 		this.lock = null;
 	}
 	
-    private TransactionService getTransactionServer() {
-        return manager.getTransactionService();
-    }
-    
     @Override
     protected boolean dataNotAvailable(long delay) {
 		LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -22,6 +22,7 @@
 
 package org.teiid.dqp.internal.pooling.connector;
 
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +32,7 @@
 import javax.transaction.RollbackException;
 import javax.transaction.Synchronization;
 import javax.transaction.SystemException;
+import javax.transaction.xa.XAResource;
 
 import org.teiid.connector.api.Connection;
 import org.teiid.connector.api.Connector;
@@ -41,9 +43,15 @@
 import org.teiid.connector.xa.api.XAConnection;
 import org.teiid.connector.xa.api.XAConnector;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorWrapper;
+import org.teiid.dqp.internal.transaction.TransactionProvider;
 
 import com.metamatrix.admin.objects.MMConnectionPool;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.common.xa.XATransactionException;
+import com.metamatrix.dqp.DQPPlugin;
 import com.metamatrix.dqp.service.ConnectorStatus;
+import com.metamatrix.dqp.service.TransactionService;
+import com.metamatrix.dqp.util.LogConstants;
 
 
 /**
@@ -93,14 +101,17 @@
 	
 	private Map<String, ConnectionWrapper> idToConnections = Collections.synchronizedMap(new HashMap<String, ConnectionWrapper>());
 	private ConnectorEnvironment environment;
+	private TransactionService transactionService;
+	private String name;
 	
-	public PooledConnector(Connector actualConnector) {
+	public PooledConnector(String name, Connector actualConnector, TransactionService transactionService) {
 		super(actualConnector);
 		pool = new ConnectionPool(this);
-		
+		this.transactionService = transactionService;
 		if (actualConnector instanceof XAConnector) {
 			xaPool = new ConnectionPool(this);
 		}
+		this.name = name;
 	}
 
 	@Override
@@ -112,6 +123,39 @@
 			xaPool.initialize(environment);
 		}
 		super.start(environment);
+		if (this.transactionService != null) {
+            if (this.supportsSingleIdentity()) {
+            	// add this connector as the recovery source
+                transactionService.registerRecoverySource(this.name, new TransactionProvider.XAConnectionSource() {
+                	XAConnection conn = null;
+                	
+                	@Override
+                	public XAResource getXAResource() throws SQLException {
+                		if (conn == null) {
+                			try {
+								conn = getXAConnectionDirect(null, null);
+							} catch (ConnectorException e) {
+								throw new SQLException(e);
+							}
+                		}
+                		try {
+							return conn.getXAResource();
+						} catch (ConnectorException e) {
+							throw new SQLException(e);
+						}
+                	}
+                	
+                	@Override
+                	public void close() {
+                		if (conn != null) {
+                			conn.close();
+                		}
+                	}
+                });
+            } else {
+            	LogManager.logWarning(LogConstants.CTX_CONNECTOR, DQPPlugin.Util.getString("ConnectorManager.cannot_add_to_recovery", this.name)); //$NON-NLS-1$	                
+            }
+        }
 	}
 	
 	@Override
@@ -153,6 +197,13 @@
         	if (environment.getLogger().isTraceEnabled()) {
         		environment.getLogger().logTrace("Obtained new connection for transaction " + transactionContext.getTxnID()); //$NON-NLS-1$
         	}
+    		XAResource xaRes = conn.getXAResource();
+    		try {
+				transactionService.enlist(transactionContext, xaRes);
+			} catch (XATransactionException e) {
+				conn.close();
+                throw new ConnectorException(e);
+			}
             
             try { //add a synchronization to remove the map entry
                 transactionContext.getTransaction().registerSynchronization(new RemovalCallback(transactionContext, conn));

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionProvider.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionProvider.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionProvider.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -26,6 +26,7 @@
 import java.util.Properties;
 
 import javax.resource.spi.XATerminator;
+import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAException;
@@ -51,7 +52,7 @@
 
     TransactionManager getTransactionManager();
 
-    Transaction importTransaction(MMXid xid, int timeout) throws XAException;
+    Transaction importTransaction(MMXid xid, int timeout) throws XAException, SystemException;
     
     String getTransactionID(Transaction tx);
     

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -249,7 +249,9 @@
                     tx = this.provider.importTransaction(xid, timeout);
                 } catch (XAException err) {
                     throw new XATransactionException(err);
-                } 
+                } catch (SystemException err) {
+                	throw new XATransactionException(err);
+                }
                 
                 try {
                     tx.registerSynchronization(new Synchronization() {
@@ -444,7 +446,7 @@
         }
     }
 
-    private TransactionManager getTransactionManager() {
+    public TransactionManager getTransactionManager() {
         return provider.getTransactionManager();
     }
 
@@ -520,53 +522,20 @@
         return context;
     }
 
-    public TransactionContext delist(TransactionContext context,
-                                     XAResource resource,
-                                     int flags) throws XATransactionException {
-        TransactionManager tm = getTransactionManager();
-        TransactionContextImpl tc = (TransactionContextImpl)context;
-        
-        try {
-            Transaction tx = tm.getTransaction();
-            if (!tx.equals(context.getTransaction())) {
-                throw new XATransactionException(context.getTransaction() + " != " + tx); //$NON-NLS-1$
-            }
-    
-            // intermediate suspend/success is not necessary because we hold the connector connection
-            // for the duration of the transaction. However, we want to suspend because 
-            // ConnectorWorker thread needs to be disassociated.
-        } catch (SystemException err) {
-            throw new XATransactionException(err);
-        } catch (IllegalStateException err) {
-            throw new XATransactionException(err);
-        } finally {
-            try {
-                tm.suspend();
-            } catch (SystemException err) {
-                throw new XATransactionException(err);
-            }
-        }
-        return tc;
-    }
-
     public TransactionContext enlist(TransactionContext context,
                                      XAResource resource) throws XATransactionException {
         TransactionManager tm = getTransactionManager();
         TransactionContextImpl tc = (TransactionContextImpl)context;
         
         try {
-            if (tc.getTransactionTimeout() > 0) {
-                if (tc.getTransactionTimeout() != resource.getTransactionTimeout()) {
-                    resource.setTransactionTimeout(tc.getTransactionTimeout());
-                }
+            if (tc.getTransactionTimeout() > 0 && tc.getTransactionTimeout() != resource.getTransactionTimeout()) {
+                resource.setTransactionTimeout(tc.getTransactionTimeout());
             }
             Transaction tx = tm.getTransaction();
             if (tx == null) {
                 tm.resume(context.getTransaction());
-            } else {
-                if (!tx.equals(context.getTransaction())) {
-                    throw new XATransactionException(context.getTransaction() + " != " + tx); //$NON-NLS-1$
-                }
+            } else if (!tx.equals(context.getTransaction())) {
+                throw new XATransactionException(context.getTransaction() + " != " + tx); //$NON-NLS-1$
             }
 
             if (!context.getTransaction().enlistResource(resource)) {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPooledConnector.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPooledConnector.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPooledConnector.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -31,28 +31,29 @@
 
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.teiid.connector.api.Connection;
 import org.teiid.connector.api.ConnectorLogger;
 import org.teiid.connector.api.ExecutionContext;
 import org.teiid.connector.xa.api.TransactionContext;
-import org.teiid.connector.xa.api.XAConnection;
 import org.teiid.connector.xa.api.XAConnector;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorEnvironmentImpl;
 
 import com.metamatrix.admin.objects.MMConnectionPool;
 import com.metamatrix.common.application.ApplicationEnvironment;
+import com.metamatrix.dqp.service.TransactionService;
 
 public class TestPooledConnector {
 	
 	@Test public void testGetXAConnection() throws Exception {
 		XAConnector connector = Mockito.mock(XAConnector.class);
-		PooledConnector pc = new PooledConnector(connector);
+		PooledConnector pc = new PooledConnector("foo", connector, Mockito.mock(TransactionService.class)); //$NON-NLS-1$
 		pc.start(new ConnectorEnvironmentImpl(new Properties(), Mockito.mock(ConnectorLogger.class), new ApplicationEnvironment()));
 		TransactionContext tc = Mockito.mock(TransactionContext.class);
 		Mockito.stub(tc.getTransaction()).toReturn(Mockito.mock(Transaction.class));
 		Mockito.stub(tc.getTxnID()).toReturn("1"); //$NON-NLS-1$
-		XAConnection conn = pc.getXAConnection(Mockito.mock(ExecutionContext.class), tc);
+		Connection conn = pc.getConnection(Mockito.mock(ExecutionContext.class), tc);
 		conn.close();
-		XAConnection conn1 = pc.getXAConnection(Mockito.mock(ExecutionContext.class), tc);
+		Connection conn1 = pc.getConnection(Mockito.mock(ExecutionContext.class), tc);
 		assertSame(conn, conn1);
 		
 		List<MMConnectionPool> stats = pc.getConnectionPoolStats();

Modified: trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java
===================================================================
--- trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -58,7 +58,7 @@
  */
 
 /*
- * See JBTM-456
+ * To be removed after 4.5 See JBTM-456
  */
 public class XATerminatorImple implements javax.resource.spi.XATerminator
 {

Modified: trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java
===================================================================
--- trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java	2009-09-16 17:09:12 UTC (rev 1362)
+++ trunk/txn-jbossts/src/main/java/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java	2009-09-16 17:35:24 UTC (rev 1363)
@@ -42,7 +42,7 @@
  */
 
 /*
- * see JBTM-457
+ * To be removed after 4.5 see JBTM-457
  */
 public class SubordinateAtomicAction extends
         com.arjuna.ats.internal.jta.transaction.arjunacore.AtomicAction



More information about the teiid-commits mailing list