Author: clebert.suconic(a)jboss.com
Date: 2010-06-15 15:39:48 -0400 (Tue, 15 Jun 2010)
New Revision: 9330
Modified:
trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java
trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
trunk/src/main/org/hornetq/ra/HornetQRASession.java
trunk/src/main/org/hornetq/ra/HornetQRAXAResource.java
Log:
fixing the RA
Modified: trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java 2010-06-15 17:45:45 UTC (rev 9329)
+++ trunk/src/main/org/hornetq/ra/HornetQRAConnectionRequestInfo.java 2010-06-15 19:39:48 UTC (rev 9330)
@@ -43,9 +43,6 @@
/** The client id */
private String clientID;
- /** Use XA */
- private boolean useXA;
-
/** The type */
private final int type;
@@ -70,7 +67,6 @@
userName = prop.getUserName();
password = prop.getPassword();
clientID = prop.getClientID();
- useXA = prop.isUseXA();
this.type = type;
transacted = true;
acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
@@ -138,7 +134,6 @@
{
clientID = prop.getClientID();
}
- useXA = prop.isUseXA();
}
/**
@@ -240,20 +235,6 @@
}
/**
- * Use XA communication
- * @return True if XA; otherwise false
- */
- public boolean isUseXA()
- {
- if (HornetQRAConnectionRequestInfo.trace)
- {
- HornetQRAConnectionRequestInfo.log.trace("isUseXA() " + useXA);
- }
-
- return useXA;
- }
-
- /**
* Use transactions
* @return True if transacted; otherwise false
*/
@@ -305,7 +286,6 @@
return Util.compare(userName, you.getUserName()) &&
Util.compare(password, you.getPassword()) &&
Util.compare(clientID, you.getClientID()) &&
type == you.getType() &&
- useXA == you.isUseXA() &&
transacted == you.isTransacted() &&
acknowledgeMode == you.getAcknowledgeMode();
}
@@ -332,7 +312,6 @@
hash += 31 * hash + (userName != null ? userName.hashCode() : 0);
hash += 31 * hash + (password != null ? password.hashCode() : 0);
hash += 31 * hash + Integer.valueOf(type).hashCode();
- hash += 31 * hash + (useXA ? 1 : 0);
hash += 31 * hash + (transacted ? 1 : 0);
hash += 31 * hash + Integer.valueOf(acknowledgeMode).hashCode();
@@ -343,7 +322,7 @@
public String toString()
{
return "HornetQRAConnectionRequestInfo[type=" + type +
- ", useXA=" + useXA + ", transacted=" + transacted + ", acknowledgeMode=" + acknowledgeMode +
+ ", transacted=" + transacted + ", acknowledgeMode=" + acknowledgeMode +
", clientID=" + clientID + ", userName=" + userName +
", password=" + password + "]";
}
}
Modified: trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2010-06-15 17:45:45 UTC (rev 9329)
+++ trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2010-06-15 19:39:48 UTC (rev 9330)
@@ -27,18 +27,13 @@
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
-import javax.jms.XAQueueSession;
import javax.jms.XASession;
import javax.jms.XATopicConnection;
-import javax.jms.XATopicSession;
-import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
@@ -99,20 +94,10 @@
// Physical JMS connection stuff
private Connection connection;
- private XAConnection xaConnection;
-
private Session session;
- private TopicSession topicSession;
-
- private QueueSession queueSession;
-
private XASession xaSession;
- private XATopicSession xaTopicSession;
-
- private XAQueueSession xaQueueSession;
-
private XAResource xaResource;
private final TransactionManager tm;
@@ -146,13 +131,8 @@
handles = Collections.synchronizedSet(new HashSet<HornetQRASession>());
connection = null;
- xaConnection = null;
session = null;
- topicSession = null;
- queueSession = null;
xaSession = null;
- xaTopicSession = null;
- xaQueueSession = null;
xaResource = null;
try
@@ -223,11 +203,6 @@
try
{
- if (xaConnection != null)
- {
- xaConnection.stop();
- }
-
if (connection != null)
{
connection.stop();
@@ -257,7 +232,7 @@
HornetQRAManagedConnection.log.trace("destroy()");
}
- if (isDestroyed.get() || xaConnection == null && connection == null)
+ if (isDestroyed.get() || connection == null)
{
return;
}
@@ -266,14 +241,7 @@
try
{
- if (xaConnection != null)
- {
- xaConnection.setExceptionListener(null);
- }
- else
- {
- connection.setExceptionListener(null);
- }
+ connection.setExceptionListener(null);
}
catch (JMSException e)
{
@@ -286,26 +254,6 @@
{
try
{
- if (topicSession != null)
- {
- topicSession.close();
- }
-
- if (xaTopicSession != null)
- {
- xaTopicSession.close();
- }
-
- if (queueSession != null)
- {
- queueSession.close();
- }
-
- if (xaQueueSession != null)
- {
- xaQueueSession.close();
- }
-
if (session != null)
{
session.close();
@@ -325,11 +273,6 @@
{
connection.close();
}
-
- if (xaConnection != null)
- {
- xaConnection.close();
- }
}
catch (Throwable e)
{
@@ -357,6 +300,8 @@
inManagedTx = false;
+ inManagedTx = false;
+
// I'm recreating the lock object when we return to the pool
// because it looks too nasty to expect the connection handle
// to unlock properly in certain race conditions
@@ -515,29 +460,13 @@
HornetQRAManagedConnection.log.trace("getXAResource()");
}
- if (xaConnection == null)
- {
- throw new NotSupportedException("Non XA transaction not supported");
- }
-
//
// Spec says a mc must allways return the same XA resource,
// so we cache it.
//
if (xaResource == null)
{
- if (xaTopicSession != null)
- {
- xaResource = xaTopicSession.getXAResource();
- }
- else if (xaQueueSession != null)
- {
- xaResource = xaQueueSession.getXAResource();
- }
- else
- {
xaResource = xaSession.getXAResource();
- }
}
if (HornetQRAManagedConnection.trace)
@@ -644,14 +573,7 @@
try
{
- if (xaConnection != null)
- {
- xaConnection.setExceptionListener(null);
- }
- else
- {
- connection.setExceptionListener(null);
- }
+ connection.setExceptionListener(null);
}
catch (JMSException e)
{
@@ -663,72 +585,28 @@
}
/**
- * Is managed connection running in XA mode
- * @return True if XA; otherwise false
+ * Get the session for this connection.
+ * @return The session
+ * @throws JMSException
*/
- protected boolean isXA()
+ protected Session getSession() throws JMSException
{
- if (HornetQRAManagedConnection.trace)
+ if (xaResource != null && inManagedTx)
{
- HornetQRAManagedConnection.log.trace("isXA()");
- }
-
- return xaConnection != null;
- }
-
- /**
- * Get the XA session for this connection.
- * @return The XA session
- */
- protected XASession getXASession()
- {
- if (HornetQRAManagedConnection.trace)
- {
- HornetQRAManagedConnection.log.trace("getXASession()");
- }
-
- if (isXA())
- {
- if (xaTopicSession != null)
+ if (HornetQRAManagedConnection.trace)
{
- return xaTopicSession;
+ HornetQRAManagedConnection.log.trace("getSession() -> XA session " + xaSession.getSession());
}
- else if (xaQueueSession != null)
- {
- return xaQueueSession;
- }
- else
- {
- return xaSession;
- }
- }
- else
- {
- return null;
- }
- }
- /**
- * Get the session for this connection.
- * @return The session
- */
- protected Session getSession()
- {
- if (HornetQRAManagedConnection.trace)
- {
- HornetQRAManagedConnection.log.trace("getSession()");
- }
-
- if (topicSession != null)
- {
- return topicSession;
- }
- else if (queueSession != null)
- {
- return queueSession;
- }
+ return xaSession.getSession();
+ }
else
{
+ if (HornetQRAManagedConnection.trace)
+ {
+ HornetQRAManagedConnection.log.trace("getSession() -> session " + xaSession.getSession());
+ }
+
return session;
}
}
@@ -836,11 +714,6 @@
{
connection.start();
}
-
- if (xaConnection != null)
- {
- xaConnection.start();
- }
}
/**
@@ -854,11 +727,6 @@
HornetQRAManagedConnection.log.trace("stop()");
}
- if (xaConnection != null)
- {
- xaConnection.stop();
- }
-
if (connection != null)
{
connection.stop();
@@ -893,109 +761,55 @@
try
{
boolean transacted = cri.isTransacted();
- int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
+ int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
if (cri.getType() == HornetQRAConnectionFactory.TOPIC_CONNECTION)
{
- if (cri.isUseXA())
+ if (userName != null && password != null)
{
- if (userName != null && password != null)
- {
- xaConnection = mcf.getHornetQConnectionFactory().createXATopicConnection(userName, password);
- }
- else
- {
- xaConnection = mcf.getHornetQConnectionFactory().createXATopicConnection();
- }
-
- xaConnection.setExceptionListener(this);
-
- xaTopicSession = ((XATopicConnection)xaConnection).createXATopicSession();
- topicSession = xaTopicSession.getTopicSession();
+ connection = mcf.getHornetQConnectionFactory().createXATopicConnection(userName, password);
}
else
{
- if (userName != null && password != null)
- {
- connection = mcf.getHornetQConnectionFactory().createTopicConnection(userName, password);
- }
- else
- {
- connection = mcf.getHornetQConnectionFactory().createTopicConnection();
- }
+ connection = mcf.getHornetQConnectionFactory().createXATopicConnection();
+ }
- connection.setExceptionListener(this);
+ connection.setExceptionListener(this);
- topicSession = ((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
- }
+ xaSession = ((XATopicConnection)connection).createXATopicSession();
+ session = ((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
}
else if (cri.getType() == HornetQRAConnectionFactory.QUEUE_CONNECTION)
{
- if (cri.isUseXA())
+ if (userName != null && password != null)
{
- if (userName != null && password != null)
- {
- xaConnection = mcf.getHornetQConnectionFactory().createXAQueueConnection(userName, password);
- }
- else
- {
- xaConnection = mcf.getHornetQConnectionFactory().createXAQueueConnection();
- }
-
- xaConnection.setExceptionListener(this);
-
- xaQueueSession = ((XAQueueConnection)xaConnection).createXAQueueSession();
- queueSession = xaQueueSession.getQueueSession();
+ connection = mcf.getHornetQConnectionFactory().createXAQueueConnection(userName, password);
}
else
{
- if (userName != null && password != null)
- {
- connection = mcf.getHornetQConnectionFactory().createQueueConnection(userName, password);
- }
- else
- {
- connection = mcf.getHornetQConnectionFactory().createQueueConnection();
- }
+ connection = mcf.getHornetQConnectionFactory().createXAQueueConnection();
+ }
- connection.setExceptionListener(this);
+ connection.setExceptionListener(this);
- queueSession = ((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
- }
+ xaSession = ((XAQueueConnection)connection).createXAQueueSession();
+ session = ((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
}
else
{
- if (cri.isUseXA())
+ if (userName != null && password != null)
{
- if (userName != null && password != null)
- {
- xaConnection = mcf.getHornetQConnectionFactory().createXAConnection(userName, password);
- }
- else
- {
- xaConnection = mcf.getHornetQConnectionFactory().createXAConnection();
- }
-
- xaConnection.setExceptionListener(this);
-
- xaSession = xaConnection.createXASession();
- session = xaSession.getSession();
+ connection = mcf.getHornetQConnectionFactory().createXAConnection(userName, password);
}
else
{
- if (userName != null && password != null)
- {
- connection = mcf.getHornetQConnectionFactory().createConnection(userName, password);
- }
- else
- {
- connection = mcf.getHornetQConnectionFactory().createConnection();
- }
+ connection = mcf.getHornetQConnectionFactory().createXAConnection();
+ }
- connection.setExceptionListener(this);
+ connection.setExceptionListener(this);
- session = connection.createSession(transacted, acknowledgeMode);
- }
+ xaSession = ((XAConnection)connection).createXASession();
+ session = connection.createSession(transacted, acknowledgeMode);
}
}
catch (JMSException je)
Modified: trunk/src/main/org/hornetq/ra/HornetQRASession.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRASession.java 2010-06-15 17:45:45 UTC (rev 9329)
+++ trunk/src/main/org/hornetq/ra/HornetQRASession.java 2010-06-15 19:39:48 UTC (rev 9330)
@@ -47,6 +47,7 @@
import javax.jms.XAQueueSession;
import javax.jms.XASession;
import javax.jms.XATopicSession;
+import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.transaction.xa.XAResource;
@@ -1214,9 +1215,7 @@
{
lock();
- XASession session = getXASessionInternal();
-
- return session.getXAResource();
+ return getXAResourceInternal();
}
catch (Throwable t)
{
@@ -1551,26 +1550,35 @@
}
/**
- * Get the XA session and ensure that it is open
- * @return The session
+ * Get the XA resource and ensure that it is open
+ * @return The XA Resource
* @exception JMSException Thrown if an error occurs
* @exception IllegalStateException The session is closed
*/
- XASession getXASessionInternal() throws JMSException
+ XAResource getXAResourceInternal() throws JMSException
{
if (mc == null)
{
throw new IllegalStateException("The session is closed");
}
- XASession session = mc.getXASession();
+ try
+ {
+ XAResource xares = mc.getXAResource();
- if (HornetQRASession.trace)
+ if (HornetQRASession.trace)
+ {
+ HornetQRASession.log.trace("getXAResourceInternal " + xares + " for " + this);
+ }
+
+ return xares;
+ }
+ catch (ResourceException e)
{
- HornetQRASession.log.trace("getXASessionInternal " + session + " for " + this);
+ JMSException jmse = new JMSException("Unable to get XA Resource");
+ jmse.initCause(e);
+ throw jmse;
}
-
- return session;
}
/**
Modified: trunk/src/main/org/hornetq/ra/HornetQRAXAResource.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAXAResource.java 2010-06-15 17:45:45 UTC (rev 9329)
+++ trunk/src/main/org/hornetq/ra/HornetQRAXAResource.java 2010-06-15 19:39:48 UTC (rev 9330)
@@ -172,6 +172,8 @@
}
finally
{
+ managedConnection.setInManagedTx(true);
+ managedConnection.setInManagedTx(false);
managedConnection.unlock();
}
}