[jboss-cvs] JBoss Messaging SVN: r2619 - in branches/Branch_1_0_1_SP: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 2 02:44:04 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-05-02 02:44:04 -0400 (Wed, 02 May 2007)
New Revision: 2619
Modified:
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
Reverting the changes made for http://jira.jboss.org/jira/browse/JBMESSAGING-946
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-05-02 04:36:53 UTC (rev 2618)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-05-02 06:44:04 UTC (rev 2619)
@@ -28,7 +28,6 @@
import javax.jms.IllegalStateException;
import javax.jms.Session;
-import javax.jms.ServerSession;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
@@ -38,13 +37,12 @@
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.tx.AckInfo;
-import org.jboss.jms.tx.LocalTx;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.Util;
/**
* This aspect handles JMS session related logic
- *
+ *
* This aspect is PER_VM
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
@@ -55,17 +53,17 @@
public class SessionAspect
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(SessionAspect.class);
-
+
// Attributes ----------------------------------------------------
-
+
private boolean trace = log.isTraceEnabled();
-
+
// Static --------------------------------------------------------
-
+
// Constructors --------------------------------------------------
-
+
// Public --------------------------------------------------------
public Object handleClosing(Invocation invocation) throws Throwable
@@ -73,7 +71,7 @@
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
+
int ackMode = state.getAcknowledgeMode();
// select eligible acknowledgments
@@ -85,7 +83,7 @@
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
- acks.add(ack);
+ acks.add(ack);
}
else
{
@@ -93,7 +91,7 @@
}
i.remove();
}
-
+
// On closing we acknowlege any AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE, since the session
// might have closed before the onMessage had finished executing.
// We cancel any client ack or transactional, we do this explicitly so we can pass the updated
@@ -114,7 +112,7 @@
public Object handleClose(Invocation invocation) throws Throwable
- {
+ {
Object res = invocation.invokeNext();
// We must explicitly shutdown the executor
@@ -124,73 +122,69 @@
return res;
}
-
+
public Object handlePreDeliver(Invocation invocation) throws Throwable
- {
+ {
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
-
+
int ackMode = state.getAcknowledgeMode();
-
+
if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
- state.isXA() && (state.getCurrentTxId() instanceof LocalTx) &&
- state.getDistinguishedListener() == null)
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
// We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
+
// We store the ack in a list for later acknowledgement or recovery
-
+
Object[] args = mi.getArguments();
MessageProxy mp = (MessageProxy)args[0];
int consumerID = ((Integer)args[1]).intValue();
AckInfo info = new AckInfo(mp, consumerID);
-
+
state.getToAck().add(info);
-
+
if (trace) { log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del); }
}
return invocation.invokeNext();
}
-
+
public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
- {
+ {
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
+
if (!state.getToAck().isEmpty())
- {
+ {
del.acknowledgeBatch(state.getToAck());
-
+
state.getToAck().clear();
}
-
+
return null;
}
-
+
public Object handlePostDeliver(Invocation invocation) throws Throwable
- {
+ {
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
-
+
int ackMode = state.getAcknowledgeMode();
-
+
boolean cancel = ((Boolean)mi.getArguments()[0]).booleanValue();
-
+
if (cancel && ackMode != Session.AUTO_ACKNOWLEDGE && ackMode != Session.DUPS_OK_ACKNOWLEDGE)
{
throw new IllegalStateException("Ack mode must be AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE");
}
-
+
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
- state.isXA() && (state.getCurrentTxId() instanceof LocalTx) &&
- state.getDistinguishedListener() == null)
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
// We acknowledge immediately on a non-transacted session that does not want to
// CLIENT_ACKNOWLEDGE
@@ -200,18 +194,18 @@
if (!state.isRecoverCalled())
{
if (trace) { log.trace("acknowledging NON-transactionally"); }
-
+
List acks = state.getToAck();
-
+
// Sanity check
if (acks.size() != 1)
{
throw new IllegalStateException("Should only be one entry in list. " +
"There are " + acks.size());
}
-
+
AckInfo ack = (AckInfo)acks.get(0);
-
+
try
{
if (cancel)
@@ -242,39 +236,39 @@
return null;
}
-
+
/*
* Called when session.recover is called
*/
public Object handleRecover(Invocation invocation) throws Throwable
{
if (trace) { log.trace("recover called"); }
-
+
MethodInvocation mi = (MethodInvocation)invocation;
-
+
SessionState state = getState(invocation);
-
+
int ackMode = state.getAcknowledgeMode();
-
+
if (ackMode == Session.SESSION_TRANSACTED)
{
throw new IllegalStateException("Cannot recover a transacted session");
}
-
+
if (trace) { log.trace("recovering the session"); }
-
+
//Call redeliver
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
+
del.redeliver(state.getToAck());
-
+
state.getToAck().clear();
state.setRecoverCalled(true);
-
- return null;
+
+ return null;
}
-
+
/**
* Redelivery occurs in two situations:
*
@@ -298,35 +292,35 @@
*
* So on rollback we do session recovery (local redelivery) in the same as if session.recover()
* was called.
- *
+ *
* There is a conflict here though. It seems a CTS test requires messages to be available to
* OTHER sessions on rollback - see CTSMiscellaneousTest.testContestedQueueOnRollback(), which
* seems in direct contradiction to the spec.
- *
+ *
* In order to satisfy the test, on session recovery, if there are no local consumers available
* to consume the message, we cancel the message back to the channel.
*/
public Object handleRedeliver(Invocation invocation) throws Throwable
{
if (trace) { log.trace("redeliver called"); }
-
+
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
-
+
// We put the messages back in the front of their appropriate consumer buffers and set
// JMSRedelivered to true.
-
+
List toRedeliver = (List)mi.getArguments()[0];
LinkedList toCancel = new LinkedList();
-
+
// Need to be recovered in reverse order.
for (int i = toRedeliver.size() - 1; i >= 0; i--)
{
AckInfo info = (AckInfo)toRedeliver.get(i);
- MessageProxy proxy = info.getMessage();
-
+ MessageProxy proxy = info.getMessage();
+
MessageCallbackHandler handler = state.getCallbackHandler(info.getConsumerID());
-
+
if (handler == null)
{
// This is ok. The original consumer has closed, this message wil get cancelled back
@@ -336,36 +330,36 @@
else
{
handler.addToFrontOfBuffer(proxy);
- }
+ }
}
-
+
if (!toCancel.isEmpty())
{
// Cancel the messages that can't be redelivered locally
-
+
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
del.cancelDeliveries(toCancel);
}
-
- return null;
+
+ return null;
}
-
+
public Object handleGetXAResource(Invocation invocation) throws Throwable
{
return getState(invocation).getXAResource();
}
-
+
public Object handleGetTransacted(Invocation invocation) throws Throwable
{
return getState(invocation).isTransacted() ? Boolean.TRUE : Boolean.FALSE;
}
-
+
public Object handleGetAcknowledgeMode(Invocation invocation) throws Throwable
{
return new Integer(getState(invocation).getAcknowledgeMode());
}
-
+
// Class YYY overrides -------------------------------------------
// Protected -----------------------------------------------------
@@ -373,13 +367,13 @@
// Package Private -----------------------------------------------
// Private -------------------------------------------------------
-
+
private SessionState getState(Invocation inv)
{
return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
}
// Inner Classes -------------------------------------------------
-
+
}
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java 2007-05-02 04:36:53 UTC (rev 2618)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java 2007-05-02 06:44:04 UTC (rev 2619)
@@ -152,7 +152,7 @@
SessionState state = (SessionState)getState(invocation);
Object txID = state.getCurrentTxId();
- if ((!state.isXA() && state.isTransacted()) || (state.isXA() && !(txID instanceof LocalTx)))
+ if (txID != null)
{
// the session is non-XA and transacted, or XA and enrolled in a global transaction, so
// we add the message to a transaction instead of sending it now. An XA session that has
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-05-02 04:36:53 UTC (rev 2618)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-05-02 06:44:04 UTC (rev 2619)
@@ -31,16 +31,12 @@
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XASession;
-import javax.jms.Queue;
-import javax.jms.ConnectionFactory;
import javax.naming.InitialContext;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import javax.transaction.UserTransaction;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import javax.management.ObjectName;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
@@ -62,15 +58,15 @@
// Constants ------------------------------------------------------------------------------------
// Static ---------------------------------------------------------------------------------------
-
+
// Attributes -----------------------------------------------------------------------------------
protected InitialContext initialContext;
-
+
protected JBossConnectionFactory cf;
protected Destination queue;
protected TransactionManager tm;
-
+
protected Transaction suspendedTx;
// Constructors ---------------------------------------------------------------------------------
@@ -86,10 +82,11 @@
{
super.setUp();
ServerManagement.start("all");
-
+ initialContext = new InitialContext();
+
initialContext = new InitialContext(ServerManagement.getJNDIEnvironment());
cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
-
+
if (!ServerManagement.isRemote())
{
tm = TransactionManagerLocator.getInstance().locate();
@@ -98,7 +95,7 @@
ServerManagement.undeployQueue("Queue");
ServerManagement.deployQueue("Queue");
queue = (Destination)initialContext.lookup("/queue/Queue");
-
+
this.drainDestination(cf, queue);
if (!ServerManagement.isRemote())
@@ -110,7 +107,7 @@
public void tearDown() throws Exception
{
ServerManagement.undeployQueue("Queue");
-
+
if (!ServerManagement.isRemote())
{
if (tm.getTransaction() != null)
@@ -119,7 +116,7 @@
tm.rollback();
}
}
-
+
if (suspendedTx != null)
{
tm.resume(suspendedTx);
@@ -127,242 +124,12 @@
super.tearDown();
}
-
-
- // Public ---------------------------------------------------------------------------------------
- /* If there is no global tx present the send must behave as non transacted
- * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
- * http://jira.jboss.com/jira/browse/JBMESSAGING-410
- * http://jira.jboss.com/jira/browse/JBMESSAGING-721
- * http://jira.jboss.org/jira/browse/JBMESSAGING-946
- */
- public void testSendNoGlobalTransaction() throws Exception
- {
- Transaction suspended = null;
- try
- {
- ServerManagement.deployQueue("MyQueue");
+ // Public ---------------------------------------------------------------------------------------
- // make sure there's no active JTA transaction
-
- suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
- // send a message to the queue, using a JCA wrapper
-
- Queue queue = (Queue)initialContext.lookup("queue/MyQueue");
-
- ConnectionFactory mcf =
- (ConnectionFactory)initialContext.lookup("java:/JCAConnectionFactory");
-
- Connection conn = mcf.createConnection();
-
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- Message m = s.createTextMessage("one");
-
- p.send(m);
-
- log.debug("message sent");
-
- conn.close();
-
- // receive the message
- ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- conn = cf.createConnection();
- conn.start();
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c = s.createConsumer(queue);
- TextMessage rm = (TextMessage)c.receive(1000);
-
- assertEquals("one", rm.getText());
-
- conn.close();
- }
- finally
- {
- ServerManagement.undeployQueue("MyQueue");
-
- if (suspended != null)
- {
- TransactionManagerLocator.getInstance().locate().resume(suspended);
- }
- }
- }
-
- /* If there is no global tx present the send must behave as non transacted
- * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
- * http://jira.jboss.com/jira/browse/JBMESSAGING-410
- * http://jira.jboss.com/jira/browse/JBMESSAGING-721
- * http://jira.jboss.org/jira/browse/JBMESSAGING-946
- */
- public void testSendNoGlobalTransaction2() throws Exception
- {
-
- Transaction suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
- try
- {
-
- ConnectionFactory mcf =
- (ConnectionFactory)initialContext.lookup("java:/JCAConnectionFactory");
- Connection conn = mcf.createConnection();
- conn.start();
-
- UserTransaction ut = ServerManagement.getUserTransaction();
-
- ut.begin();
-
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- Message m = s.createTextMessage("one");
-
- p.send(m);
-
- ut.commit();
-
- conn.close();
-
- ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("ConnectionFactory");
- conn = cf.createConnection();
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- conn.start();
-
- TextMessage rm = (TextMessage)s.createConsumer(queue).receive(500);
-
- assertEquals("one", rm.getText());
-
- conn.close();
-
- // make sure there's no active JTA transaction
-
- assertNull(TransactionManagerLocator.getInstance().locate().getTransaction());
-
- // send a message to the queue, using a JCA wrapper
-
- mcf = (ConnectionFactory)initialContext.lookup("java:/JCAConnectionFactory");
-
- conn = mcf.createConnection();
-
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- m = s.createTextMessage("one");
-
- p.send(m);
-
- conn.close();
-
- // receive the message
- cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- conn = cf.createConnection();
- conn.start();
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c = s.createConsumer(queue);
- rm = (TextMessage)c.receive(1000);
-
- assertEquals("one", rm.getText());
-
- conn.close();
- }
- finally
- {
- if (suspended != null)
- {
- TransactionManagerLocator.getInstance().locate().resume(suspended);
- }
- }
- }
-
-
- /*
- * If there is no global tx present messages consumed must consumed as if they were in a
- * local tx. Note this behaviour differs from messages sent
- * This is so we can support transacted delivery of messags in an MDB as mentioned
- *
- * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
- * http://jira.jboss.com/jira/browse/JBMESSAGING-410
- * http://jira.jboss.com/jira/browse/JBMESSAGING-721
- * http://jira.jboss.org/jira/browse/JBMESSAGING-946
- *
- * For transactional delivery the receipt of the message should be in a transaction but by the time
- * the mdb container is invoked the message has already been received it is too late - the message
- * has already been received and passed on (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration"
- * of Mark Little's book Java Transaction processing
- * for a discussion of how different app serves deal with this)
- * The way jboss messaging (and jboss mq) deals with this is to convert any work done
- * prior to when the xasession is enlisted in the tx, into work done in the xa tx
- *
- */
- public void testReceiveNoGlobalTransaction() throws Exception
- {
- try
- {
- ServerManagement.deployQueue("MyQueue2");
-
- // send a message to the queue
-
- ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
- Connection conn = cf.createConnection();
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- Message m = s.createTextMessage("one");
- p.send(m);
- conn.close();
-
- // make sure there's no active JTA transaction
-
- Transaction suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
- try
- {
- // using a JCA wrapper
-
- ConnectionFactory mcf =
- (ConnectionFactory)initialContext.lookup("java:/JCAConnectionFactory");
- conn = mcf.createConnection();
- conn.start();
-
- // no active JTA transaction here
-
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c = s.createConsumer(queue);
-
- // the messge should be store unacked in the local session
- TextMessage rm = (TextMessage)c.receive(1000);
-
- assertEquals("one", rm.getText());
-
- conn.close();
-
- // the messsage should still be in the queue
- ObjectName on = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
- Integer count = (Integer)ServerManagement.getAttribute(on, "MessageCount");
- assertEquals(1, count.intValue());
- }
- finally
- {
-
- if (suspended != null)
- {
- TransactionManagerLocator.getInstance().locate().resume(suspended);
- }
- }
- }
- finally
- {
- ServerManagement.undeployQueue("MyQueue2");
- }
- }
-
-
-
// See http://jira.jboss.com/jira/browse/JBMESSAGING-638
public void testResourceManagerMemoryLeakOnCommit() throws Exception
{
More information about the jboss-cvs-commits mailing list