[jboss-cvs] JBoss Messaging SVN: r2619 - in branches/Branch_1_0_1_SP: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 2 02:44:04 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-05-02 02:44:04 -0400 (Wed, 02 May 2007)
New Revision: 2619

Modified:
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
   branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
Reverting the changes made for http://jira.jboss.org/jira/browse/JBMESSAGING-946

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-05-02 04:36:53 UTC (rev 2618)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-05-02 06:44:04 UTC (rev 2619)
@@ -28,7 +28,6 @@
 
 import javax.jms.IllegalStateException;
 import javax.jms.Session;
-import javax.jms.ServerSession;
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
@@ -38,13 +37,12 @@
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.tx.AckInfo;
-import org.jboss.jms.tx.LocalTx;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.Util;
 
 /**
  * This aspect handles JMS session related logic
- * 
+ *
  * This aspect is PER_VM
  *
  * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
@@ -55,17 +53,17 @@
 public class SessionAspect
 {
    // Constants -----------------------------------------------------
-   
+
    private static final Logger log = Logger.getLogger(SessionAspect.class);
-   
+
    // Attributes ----------------------------------------------------
-   
+
    private boolean trace = log.isTraceEnabled();
-   
+
    // Static --------------------------------------------------------
-   
+
    // Constructors --------------------------------------------------
-   
+
    // Public --------------------------------------------------------
 
    public Object handleClosing(Invocation invocation) throws Throwable
@@ -73,7 +71,7 @@
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
       SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-      
+
       int ackMode = state.getAcknowledgeMode();
 
       // select eligible acknowledgments
@@ -85,7 +83,7 @@
          if (ackMode == Session.AUTO_ACKNOWLEDGE ||
              ackMode == Session.DUPS_OK_ACKNOWLEDGE)
          {
-            acks.add(ack);            
+            acks.add(ack);
          }
          else
          {
@@ -93,7 +91,7 @@
          }
          i.remove();
       }
-      
+
       // On closing we acknowlege any AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE, since the session
       // might have closed before the onMessage had finished executing.
       // We cancel any client ack or transactional, we do this explicitly so we can pass the updated
@@ -114,7 +112,7 @@
 
 
    public Object handleClose(Invocation invocation) throws Throwable
-   {      
+   {
       Object res = invocation.invokeNext();
 
       // We must explicitly shutdown the executor
@@ -124,73 +122,69 @@
 
       return res;
    }
-   
+
    public Object handlePreDeliver(Invocation invocation) throws Throwable
-   { 
+   {
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
-      
+
       int ackMode = state.getAcknowledgeMode();
-      
+
       if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
           ackMode == Session.AUTO_ACKNOWLEDGE ||
-          ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
-          state.isXA() && (state.getCurrentTxId() instanceof LocalTx) &&
-             state.getDistinguishedListener() == null)
+          ackMode == Session.DUPS_OK_ACKNOWLEDGE)
       {
          // We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK
 
          SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-         
+
          // We store the ack in a list for later acknowledgement or recovery
-    
+
          Object[] args = mi.getArguments();
          MessageProxy mp = (MessageProxy)args[0];
          int consumerID = ((Integer)args[1]).intValue();
          AckInfo info = new AckInfo(mp, consumerID);
-         
+
          state.getToAck().add(info);
-         
+
          if (trace) { log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del); }
       }
 
       return invocation.invokeNext();
    }
-   
+
    public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
-   {    
+   {
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
       SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-    
+
       if (!state.getToAck().isEmpty())
-      {                  
+      {
          del.acknowledgeBatch(state.getToAck());
-      
+
          state.getToAck().clear();
       }
-        
+
       return null;
    }
-   
+
    public Object handlePostDeliver(Invocation invocation) throws Throwable
-   { 
+   {
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
-      
+
       int ackMode = state.getAcknowledgeMode();
-      
+
       boolean cancel = ((Boolean)mi.getArguments()[0]).booleanValue();
-      
+
       if (cancel && ackMode != Session.AUTO_ACKNOWLEDGE && ackMode != Session.DUPS_OK_ACKNOWLEDGE)
       {
          throw new IllegalStateException("Ack mode must be AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE");
       }
-      
+
       if (ackMode == Session.AUTO_ACKNOWLEDGE ||
-          ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
-          state.isXA() && (state.getCurrentTxId() instanceof LocalTx) &&
-             state.getDistinguishedListener() == null)
+          ackMode == Session.DUPS_OK_ACKNOWLEDGE)
       {
          // We acknowledge immediately on a non-transacted session that does not want to
          // CLIENT_ACKNOWLEDGE
@@ -200,18 +194,18 @@
          if (!state.isRecoverCalled())
          {
             if (trace) { log.trace("acknowledging NON-transactionally"); }
-                        
+
             List acks = state.getToAck();
-            
+
             // Sanity check
             if (acks.size() != 1)
             {
                throw new IllegalStateException("Should only be one entry in list. " +
                                                "There are " + acks.size());
             }
-            
+
             AckInfo ack = (AckInfo)acks.get(0);
-            
+
             try
             {
                if (cancel)
@@ -242,39 +236,39 @@
 
       return null;
    }
-                  
+
    /*
     * Called when session.recover is called
     */
    public Object handleRecover(Invocation invocation) throws Throwable
    {
       if (trace) { log.trace("recover called"); }
-      
+
       MethodInvocation mi = (MethodInvocation)invocation;
-            
+
       SessionState state = getState(invocation);
-      
+
       int ackMode = state.getAcknowledgeMode();
-         
+
       if (ackMode == Session.SESSION_TRANSACTED)
       {
          throw new IllegalStateException("Cannot recover a transacted session");
       }
-      
+
       if (trace) { log.trace("recovering the session"); }
-       
+
       //Call redeliver
       SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-      
+
       del.redeliver(state.getToAck());
-            
+
       state.getToAck().clear();
 
       state.setRecoverCalled(true);
-      
-      return null;  
+
+      return null;
    }
-   
+
    /**
     * Redelivery occurs in two situations:
     *
@@ -298,35 +292,35 @@
     *
     * So on rollback we do session recovery (local redelivery) in the same as if session.recover()
     * was called.
-    * 
+    *
     * There is a conflict here though. It seems a CTS test requires messages to be available to
     * OTHER sessions on rollback - see CTSMiscellaneousTest.testContestedQueueOnRollback(), which
     * seems in direct contradiction to the spec.
-    * 
+    *
     * In order to satisfy the test, on session recovery, if there are no local consumers available
     * to consume the message, we cancel the message back to the channel.
     */
    public Object handleRedeliver(Invocation invocation) throws Throwable
    {
       if (trace) { log.trace("redeliver called"); }
-      
+
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
-            
+
       // We put the messages back in the front of their appropriate consumer buffers and set
       // JMSRedelivered to true.
-      
+
       List toRedeliver = (List)mi.getArguments()[0];
       LinkedList toCancel = new LinkedList();
-      
+
       // Need to be recovered in reverse order.
       for (int i = toRedeliver.size() - 1; i >= 0; i--)
       {
          AckInfo info = (AckInfo)toRedeliver.get(i);
-         MessageProxy proxy = info.getMessage();        
-         
+         MessageProxy proxy = info.getMessage();
+
          MessageCallbackHandler handler = state.getCallbackHandler(info.getConsumerID());
-              
+
          if (handler == null)
          {
             // This is ok. The original consumer has closed, this message wil get cancelled back
@@ -336,36 +330,36 @@
          else
          {
             handler.addToFrontOfBuffer(proxy);
-         }                                    
+         }
       }
-      
+
       if (!toCancel.isEmpty())
       {
          // Cancel the messages that can't be redelivered locally
-         
+
          SessionDelegate del = (SessionDelegate)mi.getTargetObject();
          del.cancelDeliveries(toCancel);
       }
-            
-      return null;  
+
+      return null;
    }
-   
+
    public Object handleGetXAResource(Invocation invocation) throws Throwable
    {
       return getState(invocation).getXAResource();
    }
-   
+
    public Object handleGetTransacted(Invocation invocation) throws Throwable
    {
       return getState(invocation).isTransacted() ? Boolean.TRUE : Boolean.FALSE;
    }
-   
+
    public Object handleGetAcknowledgeMode(Invocation invocation) throws Throwable
    {
       return new Integer(getState(invocation).getAcknowledgeMode());
    }
-   
 
+
    // Class YYY overrides -------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -373,13 +367,13 @@
    // Package Private -----------------------------------------------
 
    // Private -------------------------------------------------------
-   
+
    private SessionState getState(Invocation inv)
    {
       return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
    }
 
    // Inner Classes -------------------------------------------------
-   
+
 }
 

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java	2007-05-02 04:36:53 UTC (rev 2618)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java	2007-05-02 06:44:04 UTC (rev 2619)
@@ -152,7 +152,7 @@
       SessionState state = (SessionState)getState(invocation);
       Object txID = state.getCurrentTxId();
 
-      if ((!state.isXA() && state.isTransacted()) || (state.isXA() && !(txID instanceof LocalTx)))
+      if (txID != null)
       {
          // the session is non-XA and transacted, or XA and enrolled in a global transaction, so
          // we add the message to a transaction instead of sending it now. An XA session that has

Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java	2007-05-02 04:36:53 UTC (rev 2618)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java	2007-05-02 06:44:04 UTC (rev 2619)
@@ -31,16 +31,12 @@
 import javax.jms.TextMessage;
 import javax.jms.XAConnection;
 import javax.jms.XASession;
-import javax.jms.Queue;
-import javax.jms.ConnectionFactory;
 import javax.naming.InitialContext;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-import javax.transaction.UserTransaction;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
-import javax.management.ObjectName;
 
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
@@ -62,15 +58,15 @@
    // Constants ------------------------------------------------------------------------------------
 
    // Static ---------------------------------------------------------------------------------------
-   
+
    // Attributes -----------------------------------------------------------------------------------
 
    protected InitialContext initialContext;
-   
+
    protected JBossConnectionFactory cf;
    protected Destination queue;
    protected TransactionManager tm;
-   
+
    protected Transaction suspendedTx;
 
    // Constructors ---------------------------------------------------------------------------------
@@ -86,10 +82,11 @@
    {
       super.setUp();
       ServerManagement.start("all");
-      
+      initialContext = new InitialContext();
+
       initialContext = new InitialContext(ServerManagement.getJNDIEnvironment());
       cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
-            
+
       if (!ServerManagement.isRemote())
       {
          tm = TransactionManagerLocator.getInstance().locate();
@@ -98,7 +95,7 @@
       ServerManagement.undeployQueue("Queue");
       ServerManagement.deployQueue("Queue");
       queue = (Destination)initialContext.lookup("/queue/Queue");
-      
+
       this.drainDestination(cf, queue);
 
       if (!ServerManagement.isRemote())
@@ -110,7 +107,7 @@
    public void tearDown() throws Exception
    {
       ServerManagement.undeployQueue("Queue");
-      
+
       if (!ServerManagement.isRemote())
       {
          if (tm.getTransaction() != null)
@@ -119,7 +116,7 @@
             tm.rollback();
          }
       }
-      
+
       if (suspendedTx != null)
       {
          tm.resume(suspendedTx);
@@ -127,242 +124,12 @@
 
       super.tearDown();
    }
-   
-   
 
 
-   // Public ---------------------------------------------------------------------------------------
 
-   /* If there is no global tx present the send must behave as non transacted
-    * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
-    * http://jira.jboss.com/jira/browse/JBMESSAGING-410
-    * http://jira.jboss.com/jira/browse/JBMESSAGING-721
-    * http://jira.jboss.org/jira/browse/JBMESSAGING-946
-    */
-   public void testSendNoGlobalTransaction() throws Exception
-   {
-      Transaction suspended = null;
 
-      try
-      {
-         ServerManagement.deployQueue("MyQueue");
+   // Public ---------------------------------------------------------------------------------------
 
-         // make sure there's no active JTA transaction
-
-         suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
-         // send a message to the queue, using a JCA wrapper
-
-         Queue queue = (Queue)initialContext.lookup("queue/MyQueue");
-
-         ConnectionFactory mcf =
-            (ConnectionFactory)initialContext.lookup("java:/JCAConnectionFactory");
-
-         Connection conn = mcf.createConnection();
-
-         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer p = s.createProducer(queue);
-         p.setDeliveryMode(DeliveryMode.PERSISTENT);
-         Message m = s.createTextMessage("one");
-
-         p.send(m);
-
-         log.debug("message sent");
-
-         conn.close();
-
-         // receive the message
-         ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
-         conn = cf.createConnection();
-         conn.start();
-         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer c = s.createConsumer(queue);
-         TextMessage rm = (TextMessage)c.receive(1000);
-
-         assertEquals("one", rm.getText());
-
-         conn.close();
-      }
-      finally
-      {
-         ServerManagement.undeployQueue("MyQueue");
-
-         if (suspended != null)
-         {
-            TransactionManagerLocator.getInstance().locate().resume(suspended);
-         }
-      }
-   }
-
-   /* If there is no global tx present the send must behave as non transacted
-    * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
-    * http://jira.jboss.com/jira/browse/JBMESSAGING-410
-    * http://jira.jboss.com/jira/browse/JBMESSAGING-721
-    * http://jira.jboss.org/jira/browse/JBMESSAGING-946
-    */
-   public void testSendNoGlobalTransaction2() throws Exception
-   {
-
-      Transaction suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
-      try
-      {
-
-         ConnectionFactory mcf =
-            (ConnectionFactory)initialContext.lookup("java:/JCAConnectionFactory");
-         Connection conn = mcf.createConnection();
-         conn.start();
-
-         UserTransaction ut = ServerManagement.getUserTransaction();
-
-         ut.begin();
-
-         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer p = s.createProducer(queue);
-         Message m = s.createTextMessage("one");
-
-         p.send(m);
-
-         ut.commit();
-
-         conn.close();
-
-         ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("ConnectionFactory");
-         conn = cf.createConnection();
-         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         conn.start();
-
-         TextMessage rm = (TextMessage)s.createConsumer(queue).receive(500);
-
-         assertEquals("one", rm.getText());
-
-         conn.close();
-
-         // make sure there's no active JTA transaction
-
-         assertNull(TransactionManagerLocator.getInstance().locate().getTransaction());
-
-         // send a message to the queue, using a JCA wrapper
-
-         mcf = (ConnectionFactory)initialContext.lookup("java:/JCAConnectionFactory");
-
-         conn = mcf.createConnection();
-
-         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         p = s.createProducer(queue);
-         p.setDeliveryMode(DeliveryMode.PERSISTENT);
-         m = s.createTextMessage("one");
-
-         p.send(m);
-
-         conn.close();
-
-         // receive the message
-         cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
-         conn = cf.createConnection();
-         conn.start();
-         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer c = s.createConsumer(queue);
-         rm = (TextMessage)c.receive(1000);
-
-         assertEquals("one", rm.getText());
-
-         conn.close();
-      }
-      finally
-      {
-         if (suspended != null)
-         {
-            TransactionManagerLocator.getInstance().locate().resume(suspended);
-         }
-      }
-   }
-
-
-   /*
-    * If there is no global tx present messages consumed must consumed as if they were in a
-    * local tx. Note this behaviour differs from messages sent
-    * This is so we can support transacted delivery of messags in an MDB as mentioned
-    *
-    * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
-    * http://jira.jboss.com/jira/browse/JBMESSAGING-410
-    * http://jira.jboss.com/jira/browse/JBMESSAGING-721
-    * http://jira.jboss.org/jira/browse/JBMESSAGING-946
-    *
-    * For transactional delivery the receipt of the message should be in a transaction but by the time
-    * the mdb container is invoked the message has already been received it is too late - the message
-    * has already been received and passed on (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration"
-    * of Mark Little's book Java Transaction processing
-    * for a discussion of how different app serves deal with this)
-    * The way jboss messaging (and jboss mq) deals with this is to convert any work done
-    * prior to when the xasession is enlisted in the tx, into work done in the xa tx
-    *
-    */
-   public void testReceiveNoGlobalTransaction() throws Exception
-   {
-      try
-      {
-         ServerManagement.deployQueue("MyQueue2");
-
-         // send a message to the queue
-
-         ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
-         Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
-         Connection conn = cf.createConnection();
-         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer p = s.createProducer(queue);
-         p.setDeliveryMode(DeliveryMode.PERSISTENT);
-         Message m = s.createTextMessage("one");
-         p.send(m);
-         conn.close();
-
-         // make sure there's no active JTA transaction
-
-         Transaction suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
-         try
-         {
-            // using a JCA wrapper
-
-            ConnectionFactory mcf =
-               (ConnectionFactory)initialContext.lookup("java:/JCAConnectionFactory");
-            conn = mcf.createConnection();
-            conn.start();
-
-            // no active JTA transaction here
-
-            s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer c = s.createConsumer(queue);
-
-            // the messge should be store unacked in the local session
-            TextMessage rm = (TextMessage)c.receive(1000);
-
-            assertEquals("one", rm.getText());
-
-            conn.close();
-
-            // the messsage should still be in the queue
-            ObjectName on = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
-            Integer count = (Integer)ServerManagement.getAttribute(on, "MessageCount");
-            assertEquals(1, count.intValue());
-         }
-         finally
-         {
-
-            if (suspended != null)
-            {
-               TransactionManagerLocator.getInstance().locate().resume(suspended);
-            }
-         }
-      }
-      finally
-      {
-         ServerManagement.undeployQueue("MyQueue2");
-      }
-   }
-
-
-   
    // See http://jira.jboss.com/jira/browse/JBMESSAGING-638
    public void testResourceManagerMemoryLeakOnCommit() throws Exception
    {




More information about the jboss-cvs-commits mailing list