[Jboss-cvs] JBoss Messaging SVN: r1323 - in branches/Branch_1_0: src/etc src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Sep 19 20:49:11 EDT 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-09-19 20:48:57 -0400 (Tue, 19 Sep 2006)
New Revision: 1323

Modified:
   branches/Branch_1_0/src/etc/aop-messaging-client.xml
   branches/Branch_1_0/src/main/org/jboss/jms/client/Closeable.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_1_0/src/main/org/jboss/jms/delegate/SessionDelegate.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/destination/Queue.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java
   branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
   branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java
   branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
   branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/SessionTest.java
   branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
fix and extra tests for http://jira.jboss.org/jira/browse/JBMESSAGING-542

Modified: branches/Branch_1_0/src/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_0/src/etc/aop-messaging-client.xml	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/etc/aop-messaging-client.xml	2006-09-20 00:48:57 UTC (rev 1323)
@@ -122,7 +122,10 @@
    </bind>   
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->redeliver(..))">
       <advice name="handleRedeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
-   </bind>    
+   </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing())">
+      <advice name="handleClosing" aspect="org.jboss.jms.client.container.SessionAspect"/>         
+   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->close())">
       <advice name="handleClose" aspect="org.jboss.jms.client.container.SessionAspect"/>         
    </bind>   

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/Closeable.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/Closeable.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/Closeable.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -28,13 +28,13 @@
  * Implemented by JMS classes that can be closed
  * 
  * @author <a href="mailto:adrian at jboss.org>Adrian Brock</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  *
  * @version $Revision$
  */
 public interface Closeable
 {
    /**
-    * 
     * Close the instance
     * 
     * @throws JMSException
@@ -47,5 +47,7 @@
     * @throws JMSException
     */
    void closing() throws JMSException;
+
+   boolean isClosed();
   
 }

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -49,7 +49,7 @@
  *
  * $Id$
  */
-public class ClosedInterceptor  implements Interceptor
+public class ClosedInterceptor implements Interceptor
 {
    // Constants -----------------------------------------------------
    
@@ -76,6 +76,15 @@
    private String delegateType;
 
    // Static --------------------------------------------------------
+
+   public static String stateToString(int state)
+   {
+      return state == NOT_CLOSED ? "NOT_CLOSED" :
+         state == IN_CLOSING ? "IN_CLOSING" :
+            state == CLOSING ? "CLOSING" :
+               state == IN_CLOSE ? "IN_CLOSE" :
+                  state == CLOSED ? "CLOSED" : "UNKNOWN";
+   }
    
    // Constructors --------------------------------------------------
 
@@ -102,6 +111,11 @@
       return sb.toString();
    }
 
+   public boolean isClosed()
+   {
+      return state == IN_CLOSE || state == CLOSED;
+   }
+
    // Interceptor implementation -----------------------------------
 
    public String getName()
@@ -119,6 +133,12 @@
       }
 
       String methodName = ((MethodInvocation) invocation).getMethod().getName();
+
+      if ("isClosed".equals(methodName))
+      {
+         return new Boolean(isClosed());
+      }
+
       boolean isClosing = methodName.equals("closing");
       boolean isClose = methodName.equals("close");
       
@@ -138,7 +158,18 @@
       }
       else
       {
-         inuse();
+         synchronized(this)
+         {
+            // object "in use", increment inUseCount
+            if (state == IN_CLOSE || state == CLOSED)
+            {
+               log.error(this + ": method " + methodName + "() did not go through, " +
+                                "the interceptor is " + stateToString(state));
+
+               throw new IllegalStateException("The object is closed");
+            }
+            ++inUseCount;
+         }
       }
 
       if (isClosing)
@@ -222,19 +253,6 @@
    }
    
    /**
-    * Mark the object as inuse
-    */
-   protected synchronized void inuse() throws Throwable
-   {
-      if (state == IN_CLOSE || state == CLOSED)
-      {
-         log.error(this + " is closed");
-         throw new IllegalStateException("The object is closed");
-      }
-      ++inUseCount;
-   }
-   
-   /**
     * Mark the object as no longer inuse 
     */
    protected synchronized void done() throws Throwable

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -23,9 +23,12 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Iterator;
+import java.util.ArrayList;
 
 import javax.jms.IllegalStateException;
 import javax.jms.Session;
+import javax.jms.JMSException;
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
@@ -64,18 +67,27 @@
    
    // Public --------------------------------------------------------
 
+   public Object handleClosing(Invocation invocation) throws Throwable
+   {
+      // Send to the server all acknowledgments accumulated in toAck. This is useful, for example,
+      // when a message listener closes the session from its onMessage().
+      acknowledgeOnClosing(invocation);
+
+      return invocation.invokeNext();
+   }
+
+
    public Object handleClose(Invocation invocation) throws Throwable
    {      
       Object res = invocation.invokeNext();
-      
+
+      // We must explicitly shutdown the executor
+
       SessionState state = getState(invocation);
-      
-      //We must explicitly shutdown the executor
+      state.getExecutor().shutdownNow();
 
-      state.getExecutor().shutdownNow();
-         
       return res;
-   }      
+   }
    
    public Object handlePreDeliver(Invocation invocation) throws Throwable
    { 
@@ -99,7 +111,7 @@
          Object[] args = mi.getArguments();
          MessageProxy mp = (MessageProxy)args[0];
          int consumerID = ((Integer)args[1]).intValue();
-         AckInfo info = new AckInfo(mp, consumerID);
+         AckInfo info = new AckInfo(mp, consumerID, ackMode);
          
          state.getToAck().add(info);
          
@@ -301,6 +313,35 @@
    {
       return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
    }
+
+   /**
+    * Sends to the server all eligible acknowledgments: those whose acknowledgment mode is
+    * AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE (CLIENT_ACKNOWLEDGE ones, for example, are not sent).
+    */
+   private void acknowledgeOnClosing(Invocation invocation) throws JMSException
+   {
+      MethodInvocation mi = (MethodInvocation)invocation;
+      SessionState state = getState(invocation);
+      SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
+      // select eligible acknowledgments
+      List acks = new ArrayList();
+      for(Iterator i = state.getToAck().iterator(); i.hasNext(); )
+      {
+         AckInfo ack = (AckInfo)i.next();
+         if (ack.getAckMode() == Session.AUTO_ACKNOWLEDGE ||
+             ack.getAckMode() == Session.DUPS_OK_ACKNOWLEDGE)
+         {
+            acks.add(ack);
+            i.remove();
+         }
+      }
+
+      if (!acks.isEmpty())
+      {
+         del.acknowledgeBatch(acks);
+      }
+   }
     
    // Inner Classes -------------------------------------------------
    

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -24,6 +24,7 @@
 import javax.jms.IllegalStateException;
 import javax.jms.Message;
 import javax.jms.TransactionInProgressException;
+import javax.jms.Session;
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
@@ -197,7 +198,7 @@
          MethodInvocation mi = (MethodInvocation)invocation;
          MessageProxy proxy = (MessageProxy)mi.getArguments()[0];
          int consumerID = ((Integer)mi.getArguments()[1]).intValue();
-         AckInfo info = new AckInfo(proxy, consumerID);
+         AckInfo info = new AckInfo(proxy, consumerID, Session.SESSION_TRANSACTED);
          ConnectionState connState = (ConnectionState)state.getParent();
 
          if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -59,7 +59,6 @@
    {      
    }
 
-
    // BrowserDelegate implementation --------------------------------
 
    /**
@@ -76,6 +75,11 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
+   public boolean isClosed()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
    public boolean hasNextMessage() throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -93,6 +93,15 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
+   public boolean isClosed()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+   /**
+    * This invocation should either be handled by the client-side interceptor chain or by the
+    * server-side endpoint.
+    */
    public JBossConnectionConsumer createConnectionConsumer(Destination dest,
                                                            String subscriptionName,
                                                            String messageSelector,

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -99,6 +99,15 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
+   public boolean isClosed()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+   /**
+    * This invocation should either be handled by the client-side interceptor chain or by the
+    * server-side endpoint.
+    */
    public MessageListener getMessageListener()
    {
       throw new IllegalStateException("This invocation should not be handled here!");

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -55,7 +55,6 @@
    {
       super(-1);
    }
-   
 
    // ProducerDelegate implementation -------------------------------
 
@@ -81,6 +80,15 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
+   public boolean isClosed()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+   /**
+    * This invocation should either be handled by the client-side interceptor chain or by the
+    * server-side endpoint.
+    */
    public int getDeliveryMode() throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -83,25 +83,25 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void acknowledge(AckInfo ackInfo) throws JMSException
+   public void close() throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
-   
+
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void acknowledgeBatch(List ackInfos) throws JMSException
+   public void closing() throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
-   
+
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void acknowledgeAll() throws JMSException
+   public boolean isClosed()
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
@@ -110,25 +110,34 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void addTemporaryDestination(JBossDestination destination) throws JMSException
+   public void acknowledge(AckInfo ackInfo) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
-
+   
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void redeliver() throws JMSException
+   public void acknowledgeBatch(List ackInfos) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
+   
+   /**
+    * This invocation should either be handled by the client-side interceptor chain or by the
+    * server-side endpoint.
+    */
+   public void acknowledgeAll() throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
 
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void close() throws JMSException
+   public void addTemporaryDestination(JBossDestination destination) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
@@ -137,7 +146,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void closing() throws JMSException
+   public void redeliver() throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -132,8 +132,12 @@
             }
          }
       }
-            
-      postDeliver(sess, isConnectionConsumer);
+
+      if (!sess.isClosed())
+      {
+         // postDeliver only if the session is not closed
+         postDeliver(sess, isConnectionConsumer);
+      }
    }
    
    protected static void preDeliver(SessionDelegate sess,
@@ -344,12 +348,13 @@
    {
       // Wait for any onMessage() executions to complete
 
-//      if (Thread.currentThread().equals(sessionExecutor.getThread()))
-//      {
-//         // the current thread already closing this MessageCallbackHandler, so no need to register
-//         // another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
-//         return;
-//      }
+      if (Thread.currentThread().equals(sessionExecutor.getThread()))
+      {
+         // the current thread is already closing this MessageCallbackHandler (this happens when
+         // the session is closed from within the MessageListener.onMessage(), for example), so no
+         // need to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+         return;
+      }
 
       Future result = new Future();
       
@@ -665,7 +670,7 @@
          if (trace) { log.trace("Closer starts running"); }
 
          result.setResult(null);
-         
+
          if (trace) { log.trace("Closer finished run"); }
       }
    }

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -27,8 +27,6 @@
 import java.util.List;
 import java.util.Map;
 
-import javax.jms.Session;
-
 import org.jboss.jms.client.remoting.MessageCallbackHandler;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.server.Version;
@@ -61,7 +59,8 @@
    private QueuedExecutor executor;
    
    private boolean recoverCalled;
-   
+
+   // List<AckInfo>
    private List toAck;
    
    private Map callbackHandlers;
@@ -98,7 +97,10 @@
       // to callbackhandlers) in the connection, instead of maintaining another map
       callbackHandlers = new HashMap();
    }
-   
+
+   /**
+    * @return List<AckInfo>
+    */
    public List getToAck()
    {
       return toAck;

Modified: branches/Branch_1_0/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/delegate/SessionDelegate.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/delegate/SessionDelegate.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -79,9 +79,9 @@
    
    void addAsfMessage(MessageProxy m, int consumerID, ConsumerDelegate cons);
    
-   public boolean getTransacted();
+   boolean getTransacted();
    
-   public int getAcknowledgeMode();   
+   int getAcknowledgeMode();
    
    void commit() throws JMSException;
    

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/destination/Queue.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/destination/Queue.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/destination/Queue.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -59,7 +59,8 @@
          }
    
          JBossQueue jbq = new JBossQueue(name);
-         org.jboss.messaging.core.local.Queue q = (org.jboss.messaging.core.local.Queue)cm.getCoreDestination(jbq);
+         org.jboss.messaging.core.local.Queue q =
+            (org.jboss.messaging.core.local.Queue)cm.getCoreDestination(jbq);
    	   return q.getMessageCount();
       }
       catch (Throwable t)

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -152,9 +152,14 @@
    
    public void closing() throws JMSException
    {
-      //Do nothing
+      // Do nothing
    }
 
+   public boolean isClosed()
+   {
+      return closed;
+   }
+
    // Public --------------------------------------------------------
 
    public String toString()

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -330,7 +330,12 @@
    {
       log.trace("closing (noop)");    
    }
-     
+
+   public boolean isClosed()
+   {
+      return closed;
+   }
+
    public void sendTransaction(TransactionRequest request) throws JMSException
    {    
       try

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -388,6 +388,11 @@
          throw ExceptionUtil.handleJMSInvocation(t, this + " close");
       }
    }
+
+   public boolean isClosed()
+   {
+      return closed;
+   }
                
    // ConsumerEndpoint implementation -------------------------------
    
@@ -739,47 +744,55 @@
             }
          }
                                                            
-         if (list != null)
-         {         
-            ServerConnectionEndpoint connection =
-               ServerConsumerEndpoint.this.sessionEndpoint.getConnectionEndpoint();
+         if (list == null)
+         {
+            if (trace) { log.trace(this + " has a null list, returning"); }
+            return;
+         }
 
-            try
-            {
-               if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
-            
-               ClientDelivery del = new ClientDelivery(list, id);
-               
-               // TODO How can we ensure that messages for the same consumer aren't delivered
-               // concurrently to the same consumer on different threads?
-               MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
-               
-               MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
+         ServerConnectionEndpoint connection =
+            ServerConsumerEndpoint.this.sessionEndpoint.getConnectionEndpoint();
 
-               if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
-                
-               HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
+         try
+         {
+            if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
 
-               // For now we don't look at how many messages are accepted since they all will be.
-               // The field is a placeholder for the future.
-               if (result.clientIsFull())
-               {
-                  // Stop the server sending any more messages to the client.
-                  // This is ok outside lock.
-                  clientConsumerFull = true;       
-               }                               
-            }
-            catch(Throwable t)
+            ClientDelivery del = new ClientDelivery(list, id);
+
+            // TODO How can we ensure that messages for the same consumer aren't delivered
+            // concurrently to the same consumer on different threads?
+            MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
+
+            MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
+
+            if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
+
+            HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
+
+            // For now we don't look at how many messages are accepted since they all will be.
+            // The field is a placeholder for the future.
+            if (result.clientIsFull())
             {
-               log.warn("Failed to deliver the message to the client. See the server log for more details.");
-               log.debug(ServerConsumerEndpoint.this + " failed to deliver the message to the client.", t);
-               
-               ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
-               
-               mgr.handleClientFailure(connection.getRemotingClientSessionId());
+               // Stop the server sending any more messages to the client.
+               // This is ok outside lock.
+               clientConsumerFull = true;
             }
-         }              
+         }
+         catch(Throwable t)
+         {
+            log.warn("Failed to deliver the message to the client. See the server log for more details.");
+            log.debug(ServerConsumerEndpoint.this + " failed to deliver the message to the client.", t);
+
+            ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
+
+            mgr.handleClientFailure(connection.getRemotingClientSessionId());
+         }
       }
+
+      public String toString()
+      {
+         return "Deliverer[" + Integer.toHexString(hashCode()) + "]";
+      }
    }
    
    /*

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -409,6 +409,11 @@
       // currently does nothing
       if (trace) log.trace("closing (noop)");
    }
+
+   public boolean isClosed()
+   {
+      return closed;
+   }
    
    public void send(JBossMessage message) throws JMSException
    {

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -111,7 +111,6 @@
     * This used at consumer close to cancel any undelivered messages left in the client buffer
     * or at session recovery to cancel any messages that couldn't be redelivered locally
     * @param ackInfos
-    * @throws Exception
     */
    void cancelDeliveries(List ackInfos) throws JMSException;
 }

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -65,6 +65,11 @@
       endpoint.closing();
    }
 
+   public boolean isClosed()
+   {
+      return endpoint.isClosed();
+   }
+
    public boolean hasNextMessage() throws JMSException
    {
       return endpoint.hasNextMessage();

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -67,6 +67,11 @@
       endpoint.closing();
    }
 
+   public boolean isClosed()
+   {
+      return endpoint.isClosed();
+   }
+
    public SessionDelegate createSessionDelegate(boolean transacted,
                                                 int acknowledgmentMode,
                                                 boolean isXA) throws JMSException

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -63,7 +63,12 @@
    {
       endpoint.closing();
    }
-   
+
+   public boolean isClosed()
+   {
+      return endpoint.isClosed();
+   }
+
    public void more() throws JMSException
    {
       endpoint.more();

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -72,7 +72,12 @@
    {
       endpoint.closing();
    }
-   
+
+   public boolean isClosed()
+   {
+      return endpoint.isClosed();
+   }
+
    public void send(JBossMessage msg) throws JMSException
    {
       endpoint.send(msg);

Modified: branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -33,6 +33,7 @@
  * for processing.
  * 
  * @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
+ * @author <a href="mailto:ovidiu at jboss.com>Ovidiu Feodorov</a>
  *
  * $Id$
  */
@@ -45,8 +46,10 @@
    // Attributes ----------------------------------------------------
    
    protected long messageID;
-   
    protected int consumerID;
+
+   // One of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
+   private int ackMode;
    
    // The actual proxy must not get serialized
    protected transient MessageProxy msg;
@@ -58,18 +61,31 @@
    public AckInfo()
    {      
    }
-   
+
    public AckInfo(MessageProxy proxy, int consumerID)
    {
+      this(proxy, consumerID, -1);
+   }
+
+   /**
+    * @param ackMode - one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
+    */
+   public AckInfo(MessageProxy proxy, int consumerID, int ackMode)
+   {
       this.msg = proxy;
       this.messageID = proxy.getMessage().getMessageID();
-      this.consumerID = consumerID;    
+      this.consumerID = consumerID;
+      this.ackMode = ackMode;
    }
    
-   public AckInfo(long messageID, int consumerID)
+   /**
+    * @param ackMode - one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
+    */
+   public AckInfo(long messageID, int consumerID, int ackMode)
    {
       this.messageID = messageID;
       this.consumerID = consumerID;
+      this.ackMode = ackMode;
    }
 
    // Public --------------------------------------------------------
@@ -89,6 +105,16 @@
       return msg;
    }
 
+   /**
+    *
+    * @return one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE ..., or -1 if it has not
+    *         been previously set
+    */
+   public int getAckMode()
+   {
+      return ackMode;
+   }
+
    public String toString()
    {
       return "AckInfo[" + messageID + ", " + consumerID + "]";

Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -570,6 +570,9 @@
       return undelivered;
    }
 
+   /**
+    * Returns the count of messages stored AND being delivered.
+    */
    public int messageCount()
    {   
       synchronized (refLock)

Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -30,7 +30,9 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Message;
+import javax.jms.MessageListener;
 import javax.naming.InitialContext;
+import javax.management.ObjectName;
 
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
@@ -225,7 +227,7 @@
    }
 
 
-   public void test_NonPersistent_NonTransactional_Asynchronous_to_Client() throws Exception
+   public void test_Asynchronous_to_Client() throws Exception
    {
       ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
 
@@ -278,7 +280,91 @@
       conn.close();
    }
 
+   public void test_MessageListener() throws Exception
+   {
+      ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
 
+      Queue queue = (Queue)ic.lookup("/queue/JMSTestQueue");
+
+      Connection conn = cf.createConnection();
+
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageConsumer cons = session.createConsumer(queue);
+
+      final Slot slot = new Slot();
+
+      cons.setMessageListener(new MessageListener()
+      {
+         public void onMessage(Message m)
+         {
+            try
+            {
+               slot.put(m);
+            }
+            catch(InterruptedException e)
+            {
+               log.warn("got InterruptedException", e);
+            }
+         }
+      });
+
+      conn.start();
+
+      MessageProducer prod = session.createProducer(queue);
+      prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      TextMessage m = session.createTextMessage("one");
+      prod.send(m);
+
+      TextMessage rm = (TextMessage)slot.poll(5000);
+
+      assertEquals("one", rm.getText());
+
+      conn.close();
+   }
+
+   public void test_ClientAcknowledge() throws Exception
+   {
+      ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+      Queue queue = (Queue)ic.lookup("/queue/JMSTestQueue");
+
+      Connection conn = cf.createConnection();
+
+      Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      MessageProducer p = session.createProducer(queue);
+      p.send(session.createTextMessage("CLACK"));
+
+      MessageConsumer cons = session.createConsumer(queue);
+
+      conn.start();
+
+      TextMessage m = (TextMessage)cons.receive(1000);
+
+      assertEquals("CLACK", m.getText());
+
+      // make sure there's no other message in queue
+      Message m2 = cons.receive(1000);
+      assertNull(m2);
+
+      // make sure the message is still in "delivering" state
+      ObjectName on = new ObjectName("jboss.messaging.destination:service=Queue,name=JMSTestQueue");
+      Integer mc = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+
+      assertEquals(1, mc.intValue());
+
+      m.acknowledge();
+
+      // make sure there's nothing in queue anymore
+      mc = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+
+      assertEquals(0, mc.intValue());
+
+      conn.close();
+   }
+
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -252,15 +252,15 @@
       
       assertEquals("Your mum", tm2.getText());
       
-      //Don't ack
+      // Don't ack
       
-      //Create another consumer
+      // Create another consumer
       
       Session sessConsume2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
       
       MessageConsumer cons2 = sessConsume2.createConsumer(queue);
       
-      //this should cancel message and cause delivery to other consumer
+      // this should cancel message and cause delivery to other consumer
       
       sessConsume1.close();
       

Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -33,6 +33,7 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.naming.InitialContext;
+import javax.management.ObjectName;
 
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
@@ -96,9 +97,64 @@
    /**
     * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-542
     */
-   public void testClosingConnectionFromMessageListener() throws Exception
+   public void testClosingConsumerFromMessageListener() throws Exception
    {
+      ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+      Queue queue = (Queue)ic.lookup("/queue/MiscellaneousQueue");
 
+      // load the queue
+
+      Connection c = cf.createConnection();
+      Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer prod = s.createProducer(queue);
+      Message m = s.createMessage();
+      prod.send(m);
+      c.close();
+
+      final Result result = new Result();
+      Connection conn = cf.createConnection();
+      s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer cons = s.createConsumer(queue);
+      cons.setMessageListener(new MessageListener()
+      {
+         public void onMessage(Message m)
+         {
+            // close the consumer on the same thread that processed the message
+            try
+            {
+               log.debug("attempting close");
+               cons.close();
+               log.debug("consumer closed");
+               result.setSuccess();
+            }
+            catch(Exception e)
+            {
+               result.setFailure(e);
+            }
+         }
+      });
+
+      conn.start();
+
+      // wait for the message to propagate
+      Thread.sleep(3000);
+
+      assertTrue(result.isSuccess());
+      assertNull(result.getFailure());
+
+      // make sure the acknowledgment made it back to the queue
+
+      Integer count = (Integer)ServerManagement.
+         getAttribute(new ObjectName("jboss.messaging.destination:service=Queue,name=MiscellaneousQueue"),
+                      "MessageCount");
+      assertEquals(0, count.intValue());
+   }
+
+   /**
+    * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-542
+    */
+   public void testClosingSessionFromMessageListener() throws Exception
+   {
       ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
       Queue queue = (Queue)ic.lookup("/queue/MiscellaneousQueue");
 
@@ -112,6 +168,63 @@
       c.close();
 
       final Result result = new Result();
+      Connection conn = cf.createConnection();
+      final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer cons = session.createConsumer(queue);
+      cons.setMessageListener(new MessageListener()
+      {
+         public void onMessage(Message m)
+         {
+            // close the session on the same thread that processed the message
+            try
+            {
+               log.debug("attempting close");
+               session.close();
+               log.debug("session closed");
+               result.setSuccess();
+            }
+            catch(Exception e)
+            {
+               result.setFailure(e);
+            }
+         }
+      });
+
+      conn.start();
+
+      // wait for the message to propagate
+      Thread.sleep(3000);
+
+      assertTrue(result.isSuccess());
+      assertNull(result.getFailure());
+
+      // make sure the acknowledgment made it back to the queue
+
+      Integer count = (Integer)ServerManagement.
+         getAttribute(new ObjectName("jboss.messaging.destination:service=Queue,name=MiscellaneousQueue"),
+                      "MessageCount");
+      assertEquals(0, count.intValue());
+
+   }
+
+   /**
+    * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-542
+    */
+   public void testClosingConnectionFromMessageListener() throws Exception
+   {
+      ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+      Queue queue = (Queue)ic.lookup("/queue/MiscellaneousQueue");
+
+      // load the queue
+
+      Connection c = cf.createConnection();
+      Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer prod = s.createProducer(queue);
+      Message m = s.createMessage();
+      prod.send(m);
+      c.close();
+
+      final Result result = new Result();
       final Connection conn = cf.createConnection();
       s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
       MessageConsumer cons = s.createConsumer(queue);
@@ -142,6 +255,12 @@
       assertTrue(result.isSuccess());
       assertNull(result.getFailure());
 
+      // make sure the acknowledgment made it back to the queue
+
+      Integer count = (Integer)ServerManagement.
+         getAttribute(new ObjectName("jboss.messaging.destination:service=Queue,name=MiscellaneousQueue"),
+                      "MessageCount");
+      assertEquals(0, count.intValue());
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/SessionTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/SessionTest.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/SessionTest.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -38,6 +38,7 @@
 import javax.jms.XAConnection;
 import javax.jms.XASession;
 import javax.naming.InitialContext;
+import javax.management.ObjectName;
 
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.client.JBossSession;
@@ -447,7 +448,81 @@
    }
 
 
+   public void testCloseNoClientAcknowledgment() throws Exception
+   {
+      // send a message to the queue
 
+      Connection conn = cf.createConnection();
+      Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      s.createProducer(queue).send(s.createTextMessage("wont_ack"));
+      conn.close();
+
+      conn = cf.createConnection();
+      s = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      conn.start();
+
+      TextMessage m = (TextMessage)s.createConsumer(queue).receive(1000);
+
+      assertEquals("wont_ack", m.getText());
+
+      // Do NOT ACK
+
+      s.close(); // this should cancel the delivery
+
+      // get the message again
+      s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      m = (TextMessage)s.createConsumer(queue).receive(1000);
+
+      assertEquals("wont_ack", m.getText());
+
+      conn.close();
+   }
+
+   public void testCloseInTransaction() throws Exception
+   {
+      // Closing a transacted session must roll back its transaction: the message stays in queue.
+      // First, send a message to the queue.
+      Connection conn = cf.createConnection();
+      Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      s.createProducer(queue).send(s.createTextMessage("bex"));
+      conn.close();
+
+      conn = cf.createConnection();
+      Session session = conn.createSession(true, -1);
+      conn.start();
+
+      TextMessage m = (TextMessage)session.createConsumer(queue).receive(1000);
+
+      assertEquals("bex", m.getText());
+
+      // make sure the acknowledgment hasn't been sent to the channel
+      ObjectName on = new ObjectName("jboss.messaging.destination:service=Queue,name=TestQueue");
+      Integer mc = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+
+      assertEquals(1, mc.intValue());
+
+      // close the session
+      session.close();
+
+      // JMS 1.1 4.4.1: "Closing a transacted session must roll back its transaction in progress"
+
+      mc = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+      assertEquals(1, mc.intValue());
+
+      conn.close();
+
+      // make sure I can still get the right message
+
+      conn = cf.createConnection();
+      s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      conn.start();
+      TextMessage rm = (TextMessage)s.createConsumer(queue).receive(1000);
+      // check the redelivered message (rm), not the one received before the rollback (m)
+      assertEquals("bex", rm.getText());
+
+      conn.close();
+   }
+
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-09-20 00:48:57 UTC (rev 1323)
@@ -268,7 +268,7 @@
          
          long messageID = 123456;
          int consumerID = 65432;
-         AckInfo ack = new AckInfo(messageID, consumerID);
+         AckInfo ack = new AckInfo(messageID, consumerID, -1);
          
          Object[] args = new Object[] { ack };
          
@@ -358,9 +358,9 @@
          
          mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));   
          
-         AckInfo ackA = new AckInfo(1524, 71627);
-         AckInfo ackB = new AckInfo(987987, 45354);
-         AckInfo ackC = new AckInfo(32423, 4533);
+         AckInfo ackA = new AckInfo(1524, 71627, -1);
+         AckInfo ackB = new AckInfo(987987, 45354, -1);
+         AckInfo ackC = new AckInfo(32423, 4533, -1);
          
          List acks = new ArrayList();
          acks.add(ackA);
@@ -736,7 +736,7 @@
          JBossMessage m = new JBossMessage(123);
          MessageTest.configureMessage(m);
          
-         AckInfo info = new AckInfo(123, 456);
+         AckInfo info = new AckInfo(123, 456, -1);
          
          TxState state = new TxState();
          state.getMessages().add(m);
@@ -854,8 +854,8 @@
          
          List ids = new ArrayList();
          
-         AckInfo ack1 = new AckInfo(1254, 78123);
-         AckInfo ack2 = new AckInfo(786, 8979);
+         AckInfo ack1 = new AckInfo(1254, 78123, -1);
+         AckInfo ack2 = new AckInfo(786, 8979, -1);
          ids.add(ack1);
          ids.add(ack2);
          




More information about the jboss-cvs-commits mailing list