Author: clebert.suconic(a)jboss.com
Date: 2010-06-15 00:33:35 -0400 (Tue, 15 Jun 2010)
New Revision: 9314
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
trunk/src/main/org/hornetq/ra/HornetQRAMessageConsumer.java
trunk/src/main/org/hornetq/ra/HornetQRAMessageProducer.java
trunk/src/main/org/hornetq/ra/HornetQRASession.java
trunk/src/main/org/hornetq/ra/Util.java
trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
Log:
Removing cached sessions on our Resource adapter
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-06-14
04:52:14 UTC (rev 9313)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-06-15
04:33:35 UTC (rev 9314)
@@ -519,7 +519,7 @@
closed);
}
- if (autoCommitAcks)
+ if (autoCommitAcks || tx == null)
{
ref.getQueue().acknowledge(ref);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-06-14 04:52:14
UTC (rev 9313)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-06-15 04:33:35
UTC (rev 9314)
@@ -510,11 +510,6 @@
{
ServerConsumer consumer = consumers.get(consumerID);
- if (this.xa && tx == null)
- {
- throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction
state");
- }
-
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
@@ -1169,11 +1164,6 @@
throw e;
}
- if (this.xa && tx == null)
- {
- throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction
state");
- }
-
if (tx == null || autoCommitSends)
{
}
Modified: trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2010-06-14 04:52:14 UTC
(rev 9313)
+++ trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2010-06-15 04:33:35 UTC
(rev 9314)
@@ -26,10 +26,8 @@
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import javax.jms.QueueConnection;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
-import javax.jms.TopicConnection;
import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XASession;
@@ -44,7 +42,9 @@
import javax.resource.spi.ManagedConnectionMetaData;
import javax.resource.spi.SecurityException;
import javax.security.auth.Subject;
+import javax.transaction.Status;
import javax.transaction.SystemException;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
@@ -95,12 +95,10 @@
// auto-commit session, used outside XA or Local transaction
private Session session;
- private Session transactedSession;
-
private XASession xaSession;
private XAResource xaResource;
-
+
private final TransactionManager tm;
private boolean inManagedTx;
@@ -133,7 +131,6 @@
connection = null;
session = null;
- transactedSession = null;
xaSession = null;
xaResource = null;
@@ -234,7 +231,7 @@
HornetQRAManagedConnection.log.trace("destroy()");
}
- if (isDestroyed.get() || connection == null)
+ if (isDestroyed.get() || connection == null)
{
return;
}
@@ -261,11 +258,6 @@
session.close();
}
- if (transactedSession != null)
- {
- transactedSession.close();
- }
-
if (xaSession != null)
{
xaSession.close();
@@ -306,7 +298,7 @@
destroyHandles();
inManagedTx = false;
-
+
// I'm recreating the lock object when we return to the pool
// because it looks too nasty to expect the connection handle
// to unlock properly in certain race conditions
@@ -339,6 +331,35 @@
}
}
+ public void checkTransactionActive() throws JMSException
+ {
+ // don't bother looking at the transaction if there's an active XID
+ if (!inManagedTx && tm != null)
+ {
+ try
+ {
+ Transaction tx = tm.getTransaction();
+ if (tx != null)
+ {
+ int status = tx.getStatus();
+ // Only allow states that will actually succeed
+ if (status != Status.STATUS_ACTIVE && status !=
Status.STATUS_PREPARING &&
+ status != Status.STATUS_PREPARED &&
+ status != Status.STATUS_COMMITTING)
+ {
+ throw new javax.jms.IllegalStateException("Transaction " + tx
+ " not active");
+ }
+ }
+ }
+ catch (SystemException e)
+ {
+ JMSException jmsE = new javax.jms.IllegalStateException("Unexpected
exception on the Transaction ManagerTransaction");
+ jmsE.initCause(e);
+ throw jmsE;
+ }
+ }
+ }
+
/**
* Aqquire a lock on the managed connection
*/
@@ -441,7 +462,7 @@
//
if (xaResource == null)
{
- xaResource = xaSession.getXAResource();
+ xaResource = xaSession.getXAResource();
}
if (HornetQRAManagedConnection.trace)
@@ -566,36 +587,7 @@
*/
protected Session getSession() throws JMSException
{
- if (xaResource != null && isManagedTx())
- {
- if (HornetQRAManagedConnection.trace)
- {
- HornetQRAManagedConnection.log.trace("getSession() -> XA session
" + xaSession.getSession());
- }
-
- return xaSession.getSession();
- }
- else
- {
- if (isManagedTx())
- {
- if (HornetQRAManagedConnection.trace)
- {
- HornetQRAManagedConnection.log.trace("getSession() ->
transactedSession " + transactedSession);
- }
-
- return transactedSession;
- }
- else
- {
- if (HornetQRAManagedConnection.trace)
- {
- HornetQRAManagedConnection.log.trace("getSession() -> session
" + session);
- }
-
- return session;
- }
- }
+ return session;
}
/**
@@ -748,8 +740,8 @@
try
{
boolean transacted = cri.isTransacted();
- int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
+ int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
if (cri.getType() == HornetQRAConnectionFactory.TOPIC_CONNECTION)
{
if (userName != null && password != null)
@@ -764,8 +756,7 @@
connection.setExceptionListener(this);
xaSession = ((XATopicConnection)connection).createXATopicSession();
- transactedSession =
((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
- session = ((TopicConnection)connection).createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ session = xaSession.getSession();
}
else if (cri.getType() == HornetQRAConnectionFactory.QUEUE_CONNECTION)
{
@@ -781,8 +772,7 @@
connection.setExceptionListener(this);
xaSession = ((XAQueueConnection)connection).createXAQueueSession();
- transactedSession =
((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
- session = ((QueueConnection)connection).createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
+ session = xaSession.getSession();
}
else
{
@@ -798,8 +788,7 @@
connection.setExceptionListener(this);
xaSession = ((XAConnection)connection).createXASession();
- transactedSession = connection.createSession(transacted, acknowledgeMode);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session = xaSession.getSession();
}
}
catch (JMSException je)
@@ -807,7 +796,7 @@
throw new ResourceException(je.getMessage(), je);
}
}
-
+
private boolean isManagedTx()
{
return inManagedTx || isXA();
Modified: trunk/src/main/org/hornetq/ra/HornetQRAMessageConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAMessageConsumer.java 2010-06-14 04:52:14 UTC
(rev 9313)
+++ trunk/src/main/org/hornetq/ra/HornetQRAMessageConsumer.java 2010-06-15 04:33:35 UTC
(rev 9314)
@@ -96,6 +96,7 @@
{
HornetQRAMessageConsumer.log.trace("checkState()");
}
+ session.checkState();
}
/**
Modified: trunk/src/main/org/hornetq/ra/HornetQRAMessageProducer.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAMessageProducer.java 2010-06-14 04:52:14 UTC
(rev 9313)
+++ trunk/src/main/org/hornetq/ra/HornetQRAMessageProducer.java 2010-06-15 04:33:35 UTC
(rev 9314)
@@ -402,6 +402,7 @@
*/
void checkState() throws JMSException
{
+ session.checkState();
}
/**
Modified: trunk/src/main/org/hornetq/ra/HornetQRASession.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRASession.java 2010-06-14 04:52:14 UTC (rev
9313)
+++ trunk/src/main/org/hornetq/ra/HornetQRASession.java 2010-06-15 04:33:35 UTC (rev
9314)
@@ -49,6 +49,8 @@
import javax.jms.XATopicSession;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
import javax.transaction.xa.XAResource;
import org.hornetq.core.logging.Logger;
@@ -1612,4 +1614,14 @@
}
return (TopicSession)s;
}
+
+ /**
+ * @throws SystemException
+ * @throws RollbackException
+ *
+ */
+ public void checkState() throws JMSException
+ {
+ mc.checkTransactionActive();
+ }
}
Modified: trunk/src/main/org/hornetq/ra/Util.java
===================================================================
--- trunk/src/main/org/hornetq/ra/Util.java 2010-06-14 04:52:14 UTC (rev 9313)
+++ trunk/src/main/org/hornetq/ra/Util.java 2010-06-15 04:33:35 UTC (rev 9314)
@@ -197,7 +197,13 @@
/** The Resource adapter can't depend on any provider's specific library.
Because of that we use reflection to locate the
- * transaction manager during startup. */
+ * transaction manager during startup.
+ *
+ *
+ * TODO:
https://jira.jboss.org/browse/HORNETQ-417
+ * We should use a proper SPI instead of reflection
+ * We would need to define a proper SPI package for this.
+ * */
public static TransactionManager locateTM(final String locatorClass, final String
locatorMethod)
{
try
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2010-06-14 04:52:14
UTC (rev 9313)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2010-06-15 04:33:35
UTC (rev 9314)
@@ -138,7 +138,7 @@
ClientConsumer cons = session.createConsumer("Test");
- assertNull("Send went through an invalid XA Session",
cons.receiveImmediate());
+ assertNotNull("Send went through an invalid XA Session",
cons.receiveImmediate());
}
finally
{
@@ -194,9 +194,7 @@
msg = cons.receiveImmediate();
- assertNotNull("Acknowledge went through invalid XA Session", msg);
-
- assertNull(cons.receiveImmediate());
+ assertNull("Acknowledge went through invalid XA Session", msg);