Author: clebert.suconic(a)jboss.com
Date: 2010-06-15 13:43:54 -0400 (Tue, 15 Jun 2010)
New Revision: 9328
Modified:
trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java
trunk/src/main/org/hornetq/ra/HornetQRALocalTransaction.java
trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
trunk/src/main/org/hornetq/ra/HornetQRASession.java
trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
Reverting RA to use a single Session
Modified: trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java 2010-06-15 16:37:03
UTC (rev 9327)
+++ trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java 2010-06-15 17:43:54
UTC (rev 9328)
@@ -43,6 +43,9 @@
/** The client id */
private String clientID;
+ /** Use XA */
+ private boolean useXA;
+
/** The type */
private final int type;
@@ -67,6 +70,7 @@
userName = prop.getUserName();
password = prop.getPassword();
clientID = prop.getClientID();
+ useXA = prop.isUseXA();
this.type = type;
transacted = true;
acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
@@ -134,6 +138,7 @@
{
clientID = prop.getClientID();
}
+ useXA = prop.isUseXA();
}
/**
@@ -235,6 +240,20 @@
}
/**
+ * Use XA communication
+ * @return True if XA; otherwise false
+ */
+ public boolean isUseXA()
+ {
+ if (HornetQRAConnectionRequestInfo.trace)
+ {
+ HornetQRAConnectionRequestInfo.log.trace("isUseXA() " + useXA);
+ }
+
+ return useXA;
+ }
+
+ /**
* Use transactions
* @return True if transacted; otherwise false
*/
@@ -286,6 +305,7 @@
return Util.compare(userName, you.getUserName()) &&
Util.compare(password, you.getPassword()) &&
Util.compare(clientID, you.getClientID()) &&
type == you.getType() &&
+ useXA == you.isUseXA() &&
transacted == you.isTransacted() &&
acknowledgeMode == you.getAcknowledgeMode();
}
@@ -312,6 +332,7 @@
hash += 31 * hash + (userName != null ? userName.hashCode() : 0);
hash += 31 * hash + (password != null ? password.hashCode() : 0);
hash += 31 * hash + Integer.valueOf(type).hashCode();
+ hash += 31 * hash + (useXA ? 1 : 0);
hash += 31 * hash + (transacted ? 1 : 0);
hash += 31 * hash + Integer.valueOf(acknowledgeMode).hashCode();
@@ -322,7 +343,7 @@
public String toString()
{
return "HornetQRAConnectionRequestInfo[type=" + type +
- ", transacted=" + transacted + ", acknowledgeMode=" +
acknowledgeMode +
+ ", useXA=" + useXA + ", transacted=" + transacted + ",
acknowledgeMode=" + acknowledgeMode +
", clientID=" + clientID + ", userName=" + userName +
", password=" + password + "]";
}
}
Modified: trunk/src/main/org/hornetq/ra/HornetQRALocalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRALocalTransaction.java 2010-06-15 16:37:03 UTC
(rev 9327)
+++ trunk/src/main/org/hornetq/ra/HornetQRALocalTransaction.java 2010-06-15 17:43:54 UTC
(rev 9328)
@@ -62,7 +62,7 @@
HornetQRALocalTransaction.log.trace("begin()");
}
- mc.setInManagedTx(true);
+ // mc.setInManagedTx(true);
}
/**
@@ -90,7 +90,7 @@
}
finally
{
- mc.setInManagedTx(false);
+ //mc.setInManagedTx(false);
mc.unlock();
}
}
@@ -120,7 +120,7 @@
}
finally
{
- mc.setInManagedTx(false);
+ //mc.setInManagedTx(false);
mc.unlock();
}
}
Modified: trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2010-06-15 16:37:03 UTC
(rev 9327)
+++ trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2010-06-15 17:43:54 UTC
(rev 9328)
@@ -26,12 +26,19 @@
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
+import javax.jms.XAQueueSession;
import javax.jms.XASession;
import javax.jms.XATopicConnection;
+import javax.jms.XATopicSession;
+import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
@@ -92,11 +99,20 @@
// Physical JMS connection stuff
private Connection connection;
- // auto-commit session, used outside XA or Local transaction
+ private XAConnection xaConnection;
+
private Session session;
+ private TopicSession topicSession;
+
+ private QueueSession queueSession;
+
private XASession xaSession;
+ private XATopicSession xaTopicSession;
+
+ private XAQueueSession xaQueueSession;
+
private XAResource xaResource;
private final TransactionManager tm;
@@ -130,8 +146,13 @@
handles = Collections.synchronizedSet(new HashSet<HornetQRASession>());
connection = null;
+ xaConnection = null;
session = null;
+ topicSession = null;
+ queueSession = null;
xaSession = null;
+ xaTopicSession = null;
+ xaQueueSession = null;
xaResource = null;
try
@@ -202,6 +223,11 @@
try
{
+ if (xaConnection != null)
+ {
+ xaConnection.stop();
+ }
+
if (connection != null)
{
connection.stop();
@@ -231,7 +257,7 @@
HornetQRAManagedConnection.log.trace("destroy()");
}
- if (isDestroyed.get() || connection == null)
+ if (isDestroyed.get() || xaConnection == null && connection == null)
{
return;
}
@@ -240,7 +266,14 @@
try
{
- connection.setExceptionListener(null);
+ if (xaConnection != null)
+ {
+ xaConnection.setExceptionListener(null);
+ }
+ else
+ {
+ connection.setExceptionListener(null);
+ }
}
catch (JMSException e)
{
@@ -253,6 +286,26 @@
{
try
{
+ if (topicSession != null)
+ {
+ topicSession.close();
+ }
+
+ if (xaTopicSession != null)
+ {
+ xaTopicSession.close();
+ }
+
+ if (queueSession != null)
+ {
+ queueSession.close();
+ }
+
+ if (xaQueueSession != null)
+ {
+ xaQueueSession.close();
+ }
+
if (session != null)
{
session.close();
@@ -272,10 +325,15 @@
{
connection.close();
}
+
+ if (xaConnection != null)
+ {
+ xaConnection.close();
+ }
}
catch (Throwable e)
{
- throw new ResourceException("Could not properly close the transactedSession
and connection", e);
+ throw new ResourceException("Could not properly close the session and
connection", e);
}
}
@@ -330,7 +388,7 @@
throw new IllegalStateException("ManagedConnection in an illegal
state");
}
}
-
+
public void checkTransactionActive() throws JMSException
{
// don't bother looking at the transaction if there's an active XID
@@ -360,6 +418,7 @@
}
}
+
/**
* Aqquire a lock on the managed connection
*/
@@ -456,13 +515,29 @@
HornetQRAManagedConnection.log.trace("getXAResource()");
}
+ if (xaConnection == null)
+ {
+ throw new NotSupportedException("Non XA transaction not supported");
+ }
+
//
// Spec says a mc must allways return the same XA resource,
// so we cache it.
//
if (xaResource == null)
{
- xaResource = xaSession.getXAResource();
+ if (xaTopicSession != null)
+ {
+ xaResource = xaTopicSession.getXAResource();
+ }
+ else if (xaQueueSession != null)
+ {
+ xaResource = xaQueueSession.getXAResource();
+ }
+ else
+ {
+ xaResource = xaSession.getXAResource();
+ }
}
if (HornetQRAManagedConnection.trace)
@@ -569,7 +644,14 @@
try
{
- connection.setExceptionListener(null);
+ if (xaConnection != null)
+ {
+ xaConnection.setExceptionListener(null);
+ }
+ else
+ {
+ connection.setExceptionListener(null);
+ }
}
catch (JMSException e)
{
@@ -581,13 +663,74 @@
}
/**
+ * Is managed connection running in XA mode
+ * @return True if XA; otherwise false
+ */
+ protected boolean isXA()
+ {
+ if (HornetQRAManagedConnection.trace)
+ {
+ HornetQRAManagedConnection.log.trace("isXA()");
+ }
+
+ return xaConnection != null;
+ }
+
+ /**
+ * Get the XA session for this connection.
+ * @return The XA session
+ */
+ protected XASession getXASession()
+ {
+ if (HornetQRAManagedConnection.trace)
+ {
+ HornetQRAManagedConnection.log.trace("getXASession()");
+ }
+
+ if (isXA())
+ {
+ if (xaTopicSession != null)
+ {
+ return xaTopicSession;
+ }
+ else if (xaQueueSession != null)
+ {
+ return xaQueueSession;
+ }
+ else
+ {
+ return xaSession;
+ }
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
* Get the session for this connection.
* @return The session
- * @throws JMSException
*/
- protected Session getSession() throws JMSException
+ protected Session getSession()
{
- return session;
+ if (HornetQRAManagedConnection.trace)
+ {
+ HornetQRAManagedConnection.log.trace("getSession()");
+ }
+
+ if (topicSession != null)
+ {
+ return topicSession;
+ }
+ else if (queueSession != null)
+ {
+ return queueSession;
+ }
+ else
+ {
+ return session;
+ }
}
/**
@@ -693,6 +836,11 @@
{
connection.start();
}
+
+ if (xaConnection != null)
+ {
+ xaConnection.start();
+ }
}
/**
@@ -706,6 +854,11 @@
HornetQRAManagedConnection.log.trace("stop()");
}
+ if (xaConnection != null)
+ {
+ xaConnection.stop();
+ }
+
if (connection != null)
{
connection.stop();
@@ -744,51 +897,105 @@
if (cri.getType() == HornetQRAConnectionFactory.TOPIC_CONNECTION)
{
- if (userName != null && password != null)
+ if (cri.isUseXA())
{
- connection =
mcf.getHornetQConnectionFactory().createXATopicConnection(userName, password);
+ if (userName != null && password != null)
+ {
+ xaConnection =
mcf.getHornetQConnectionFactory().createXATopicConnection(userName, password);
+ }
+ else
+ {
+ xaConnection =
mcf.getHornetQConnectionFactory().createXATopicConnection();
+ }
+
+ xaConnection.setExceptionListener(this);
+
+ xaTopicSession =
((XATopicConnection)xaConnection).createXATopicSession();
+ topicSession = xaTopicSession.getTopicSession();
}
else
{
- connection = mcf.getHornetQConnectionFactory().createXATopicConnection();
- }
+ if (userName != null && password != null)
+ {
+ connection =
mcf.getHornetQConnectionFactory().createTopicConnection(userName, password);
+ }
+ else
+ {
+ connection =
mcf.getHornetQConnectionFactory().createTopicConnection();
+ }
- connection.setExceptionListener(this);
+ connection.setExceptionListener(this);
- xaSession = ((XATopicConnection)connection).createXATopicSession();
- session = xaSession.getSession();
+ topicSession =
((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
+ }
}
else if (cri.getType() == HornetQRAConnectionFactory.QUEUE_CONNECTION)
{
- if (userName != null && password != null)
+ if (cri.isUseXA())
{
- connection =
mcf.getHornetQConnectionFactory().createXAQueueConnection(userName, password);
+ if (userName != null && password != null)
+ {
+ xaConnection =
mcf.getHornetQConnectionFactory().createXAQueueConnection(userName, password);
+ }
+ else
+ {
+ xaConnection =
mcf.getHornetQConnectionFactory().createXAQueueConnection();
+ }
+
+ xaConnection.setExceptionListener(this);
+
+ xaQueueSession =
((XAQueueConnection)xaConnection).createXAQueueSession();
+ queueSession = xaQueueSession.getQueueSession();
}
else
{
- connection = mcf.getHornetQConnectionFactory().createXAQueueConnection();
- }
+ if (userName != null && password != null)
+ {
+ connection =
mcf.getHornetQConnectionFactory().createQueueConnection(userName, password);
+ }
+ else
+ {
+ connection =
mcf.getHornetQConnectionFactory().createQueueConnection();
+ }
- connection.setExceptionListener(this);
+ connection.setExceptionListener(this);
- xaSession = ((XAQueueConnection)connection).createXAQueueSession();
- session = xaSession.getSession();
+ queueSession =
((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
+ }
}
else
{
- if (userName != null && password != null)
+ if (cri.isUseXA())
{
- connection =
mcf.getHornetQConnectionFactory().createXAConnection(userName, password);
+ if (userName != null && password != null)
+ {
+ xaConnection =
mcf.getHornetQConnectionFactory().createXAConnection(userName, password);
+ }
+ else
+ {
+ xaConnection = mcf.getHornetQConnectionFactory().createXAConnection();
+ }
+
+ xaConnection.setExceptionListener(this);
+
+ xaSession = xaConnection.createXASession();
+ session = xaSession.getSession();
}
else
{
- connection = mcf.getHornetQConnectionFactory().createXAConnection();
- }
+ if (userName != null && password != null)
+ {
+ connection =
mcf.getHornetQConnectionFactory().createConnection(userName, password);
+ }
+ else
+ {
+ connection = mcf.getHornetQConnectionFactory().createConnection();
+ }
- connection.setExceptionListener(this);
+ connection.setExceptionListener(this);
- xaSession = ((XAConnection)connection).createXASession();
- session = xaSession.getSession();
+ session = connection.createSession(transacted, acknowledgeMode);
+ }
}
}
catch (JMSException je)
@@ -796,31 +1003,10 @@
throw new ResourceException(je.getMessage(), je);
}
}
-
- private boolean isManagedTx()
- {
- return inManagedTx || isXA();
- }
-
- /**
- * @return
- * @throws SystemException
- */
- private boolean isXA()
- {
- try
- {
- return (tm != null && tm.getTransaction() != null);
- }
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- return false;
- }
- }
-
+
protected void setInManagedTx(boolean inManagedTx)
{
this.inManagedTx = inManagedTx;
}
+
}
Modified: trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2010-06-15 16:37:03 UTC (rev
9327)
+++ trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2010-06-15 17:43:54 UTC (rev
9328)
@@ -41,6 +41,9 @@
/** The password */
private String password;
+ /** Use XA */
+ private Boolean useXA;
+
/** Use Local TX instead of XA */
private Boolean localTx = false;
@@ -169,11 +172,53 @@
}
+
+ /**
+ * Get the use XA flag
+ * @return The value
+ */
+ public Boolean getUseXA()
+ {
+ if (HornetQRAProperties.trace)
+ {
+ HornetQRAProperties.log.trace("getUseXA()");
+ }
+
+ return useXA;
+ }
+
+ /**
+ * Set the use XA flag
+ * @param xa The value
+ */
+ public void setUseXA(final Boolean xa)
+ {
+ if (HornetQRAProperties.trace)
+ {
+ HornetQRAProperties.log.trace("setUseXA(" + xa + ")");
+ }
+
+ useXA = xa;
+ }
+
+ /**
+ * Use XA for communication
+ * @return The value
+ */
+ public boolean isUseXA()
+ {
+ if (HornetQRAProperties.trace)
+ {
+ HornetQRAProperties.log.trace("isUseXA()");
+ }
+
+ return useXA != null && useXA;
+ }
@Override
public String toString()
{
- return "HornetQRAProperties[localTx=" + localTx +
+ return "HornetQRAProperties[useXA=" + useXA + ", localTx=" +
localTx +
", userName=" + userName + ", password=" + password +
"]";
}
}
Modified: trunk/src/main/org/hornetq/ra/HornetQRASession.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRASession.java 2010-06-15 16:37:03 UTC (rev
9327)
+++ trunk/src/main/org/hornetq/ra/HornetQRASession.java 2010-06-15 17:43:54 UTC (rev
9328)
@@ -47,10 +47,7 @@
import javax.jms.XAQueueSession;
import javax.jms.XASession;
import javax.jms.XATopicSession;
-import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
-import javax.transaction.RollbackException;
-import javax.transaction.SystemException;
import javax.transaction.xa.XAResource;
import org.hornetq.core.logging.Logger;
@@ -1217,7 +1214,9 @@
{
lock();
- return getXAResourceInternal();
+ XASession session = getXASessionInternal();
+
+ return session.getXAResource();
}
catch (Throwable t)
{
@@ -1552,35 +1551,26 @@
}
/**
- * Get the XA resource and ensure that it is open
- * @return The XA Resource
+ * Get the XA session and ensure that it is open
+ * @return The session
* @exception JMSException Thrown if an error occurs
* @exception IllegalStateException The session is closed
*/
- XAResource getXAResourceInternal() throws JMSException
+ XASession getXASessionInternal() throws JMSException
{
if (mc == null)
{
throw new IllegalStateException("The session is closed");
}
- try
- {
- XAResource xares = mc.getXAResource();
+ XASession session = mc.getXASession();
- if (HornetQRASession.trace)
- {
- HornetQRASession.log.trace("getXAResourceInternal " + xares +
" for " + this);
- }
-
- return xares;
- }
- catch (ResourceException e)
+ if (HornetQRASession.trace)
{
- JMSException jmse = new JMSException("Unable to get XA Resource");
- jmse.initCause(e);
- throw jmse;
+ HornetQRASession.log.trace("getXASessionInternal " + session + "
for " + this);
}
+
+ return session;
}
/**
Modified: trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-06-15 16:37:03 UTC (rev
9327)
+++ trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-06-15 17:43:54 UTC (rev
9328)
@@ -1196,8 +1196,37 @@
raProperties.setUseLocalTx(localTx);
}
+ /**
+ * Get the use XA flag
+ *
+ * @return The value
+ */
+ public Boolean getUseXA()
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("getUseXA()");
+ }
+ return raProperties.getUseXA();
+ }
+
/**
+ * Set the use XA flag
+ *
+ * @param xa The value
+ */
+ public void setUseXA(final Boolean xa)
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("setUseXA(" + xa + ")");
+ }
+
+ raProperties.setUseXA(xa);
+ }
+
+ /**
* Indicates whether some other object is "equal to" this one.
*
* @param obj Object with which to compare
@@ -1339,13 +1368,10 @@
*/
protected void setup() throws HornetQException
{
-
-
defaultHornetQConnectionFactory = createHornetQConnectionFactory(raProperties);
sessionFactory = defaultHornetQConnectionFactory.getCoreFactory();
}
-
public HornetQConnectionFactory getDefaultHornetQConnectionFactory() throws
ResourceException
{
if (!configured.getAndSet(true))
@@ -1384,13 +1410,13 @@
: new
TransportConfiguration(backUpCOnnectorClassname,
backupConnectionParams);
- cf = HornetQJMSClient.createConnectionFactory(transportConf, backup);
+ cf = (HornetQConnectionFactory)
HornetQJMSClient.createConnectionFactory(transportConf, backup);
}
else if (discoveryAddress != null)
{
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ?
overrideProperties.getDiscoveryPort()
:
getDiscoveryPort();
- cf = HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
+ cf = (HornetQConnectionFactory)
HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
}
else
{
@@ -1399,7 +1425,6 @@
setParams(cf, overrideProperties);
return cf;
}
-
public Map<String, Object> overrideConnectionParameters(final Map<String,
Object> connectionParams,
final Map<String,
Object> overrideConnectionParams)
{