[jboss-cvs] JBoss Messaging SVN: r2204 - in trunk: src/main/org/jboss/messaging/core/plugin/postoffice/cluster and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Feb 8 09:21:08 EST 2007


Author: timfox
Date: 2007-02-08 09:21:08 -0500 (Thu, 08 Feb 2007)
New Revision: 2204

Modified:
   trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/tests/etc/log4j.xml
   trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
Log:
Mainly fix MessageRedistributionTest


Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2007-02-08 14:21:08 UTC (rev 2204)
@@ -239,7 +239,7 @@
                //Sanity check
                if (((List)param).isEmpty())
                {
-                  log.error("Got a polled callback list - but it is empty!!!");
+                  log.error("Got a polled callback list - but it is empty!!! See http://jira.jboss.org/jira/browse/JBMESSAGING-818");
                }
                
                packet = new PolledCallbacksDelivery((List)param, resp.getSessionId());             

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-02-08 14:21:08 UTC (rev 2204)
@@ -158,6 +158,15 @@
    private String groupName;
 
    private volatile boolean started;
+   
+   //FIXME using a stopping flag is not a good approach and introduces a race condition
+   //http://jira.jboss.org/jira/browse/JBMESSAGING-819
+   //the code can check stopping and find it to be false, then the service can stop, setting stopping to true
+   //then actually stopping the post office, then the same thread that checked stopping continues and performs
+   //its action only to find the service stopped
+   //Should use a read-write lock instead
+   //One way to minimise the chance of the race happening is to sleep for a little while after setting stopping to true
+   //before actually stopping the service (see below)
    private volatile boolean stopping;
 
    private JChannelFactory jChannelFactory;
@@ -333,11 +342,15 @@
          log.warn("Attempt to stop() but " + this + " is not started");
          return;
       }
+      
+      //Need to send this *before* stopping
+      syncSendRequest(new LeaveClusterRequest(getNodeId()));
 
       stopping = true;
+      
+      //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
+      Thread.sleep(1000);
 
-      syncSendRequest(new LeaveClusterRequest(getNodeId()));
-
       statsSender.stop();
 
       super.stop(sendNotification);
@@ -715,6 +728,11 @@
     */
    public void asyncSendRequest(ClusterRequest request) throws Exception
    {
+      if (stopping)
+      {
+         return;
+      }
+      
       if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
 
       byte[] bytes = writeRequest(request);
@@ -726,6 +744,11 @@
     */
    public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
    {
+      if (stopping)
+      {
+         return;
+      }
+      
       Address address = this.getAddressForNodeId(nodeId, false);
 
       if (address == null)
@@ -765,12 +788,15 @@
 
       if (tx == null)
       {
-         throw new IllegalStateException("Cannot find transaction transaction id: " + id);
+         //Commit can come in after the node has left - this is ok
+         if (trace) { log.trace("Cannot find transaction in map, node may have already left"); }
       }
-
-      tx.commit(this);
-
-      if (trace) { log.trace(this + " committed transaction " + id ); }
+      else
+      {
+         tx.commit(this);
+   
+         if (trace) { log.trace(this + " committed transaction " + id ); }
+      }
    }
 
    public void rollbackTransaction(TransactionId id) throws Throwable
@@ -786,12 +812,15 @@
 
       if (tx == null)
       {
-         throw new IllegalStateException("Cannot find transaction transaction id: " + id);
+         // Rollback can come in after the node has left - this is ok
+         if (trace) { log.trace("Cannot find transaction in map, node may have already left"); }
       }
-
-      tx.rollback(this);
-
-      if (trace) { log.trace(this + " committed transaction " + id ); }
+      else
+      {
+         tx.rollback(this);
+   
+         if (trace) { log.trace(this + " committed transaction " + id ); }
+      }
    }
 
    public void updateQueueStats(int nodeId, List statsList) throws Exception
@@ -814,7 +843,7 @@
          if (nameMap == null)
          {
             // This is ok, the node might have left
-            if (trace) { log.trace(this + " cannot find node in name map, i guess the node might have left?"); }
+            if (trace) { log.trace(this + " cannot find node in name map, the node might have left"); }
          }
          else
          {
@@ -1728,6 +1757,11 @@
     */
    private void syncSendRequest(ClusterRequest request) throws Exception
    {
+      if (stopping)
+      {
+         return;
+      }
+      
       if (trace) { log.trace(this + " sending synch request " + request); }
 
       Message message = new Message(null, null, writeRequest(request));

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2007-02-08 14:21:08 UTC (rev 2204)
@@ -395,7 +395,7 @@
                                                name,
                                                del.getReference().getMessage());
                         
-               if (!del.getReference().getMessage().isReliable())
+               if (!del.getReference().getMessage().isReliable() || !recoverable)
                {
                   //We can ack it now
                   del.acknowledge(null);

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2007-02-08 14:21:08 UTC (rev 2204)
@@ -135,18 +135,26 @@
    {
       //We need to ack the delivery
       
+      if (trace) { log.trace(this + " committing, tx: " + this.txId); }
+      
       //We need to ack it in memory only
       //since it would have been acked on the pulling node
       LocalClusteredQueue queue = (LocalClusteredQueue)reliableDelivery.getObserver();
            
       queue.acknowledgeFromCluster(reliableDelivery);      
+      
+      if (trace) { log.trace(this + " committed, tx: " + this.txId); }
    }
 
    public void rollback(PostOfficeInternal office) throws Throwable
    {
       //We need to cancel the delivery
       
+      if (trace) { log.trace(this + " rolling back, tx: " + this.txId); }
+      
       reliableDelivery.cancel();  
+      
+      if (trace) { log.trace(this + " rolled back, tx: " + this.txId); }
    }
    
    public void read(DataInputStream in) throws Exception

Modified: trunk/tests/etc/log4j.xml
===================================================================
--- trunk/tests/etc/log4j.xml	2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/tests/etc/log4j.xml	2007-02-08 14:21:08 UTC (rev 2204)
@@ -47,8 +47,8 @@
    </category>
 
    <category name="org.jboss.remoting">
-      <priority value="TRACE" class="org.jboss.logging.XLevel"/>
-      <!-- <priority value="DEBUG"/> -->
+      <!-- <priority value="TRACE" class="org.jboss.logging.XLevel"/> -->
+      <priority value="DEBUG"/>
    </category>
 
    <category name="org.jboss">
@@ -57,14 +57,17 @@
 
    <category name="org.jboss.messaging">
       <priority value="TRACE" class="org.jboss.logging.XLevel"/>
+      <!-- <priority value="DEBUG"/> -->
    </category>
 
    <category name="org.jboss.jms">
       <priority value="TRACE" class="org.jboss.logging.XLevel"/>
+      <!-- <priority value="DEBUG"/> -->
    </category>
 
    <category name="org.jboss.test">
       <priority value="TRACE" class="org.jboss.logging.XLevel"/>
+      <!-- <priority value="DEBUG"/> -->
    </category>
 
    <!-- Ignoring trace from these: -->
@@ -82,8 +85,8 @@
    </category>
 
    <category name="org.jboss.jms.server.remoting.JMSWireFormat">
-      <priority value="TRACE" class="org.jboss.logging.XLevel"/>
-      <!-- <priority value="DEBUG"/> -->
+      <!-- <priority value="TRACE" class="org.jboss.logging.XLevel"/> -->
+      <priority value="DEBUG"/>
    </category>
 
    <root>

Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2007-02-08 14:21:08 UTC (rev 2204)
@@ -22,10 +22,8 @@
 package org.jboss.test.messaging.core;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Channel;
@@ -45,6 +43,7 @@
  * an "active" delivery (NACKING) undelivered, or throw unchecked exceptions.
  *
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
  *
  * $Id$
@@ -62,8 +61,6 @@
    public static final String SELECTOR_REJECTING = "SELECTOR_REJECTING";
    public static final String ACCEPTING_TO_MAX = "ACCEPTING_TO_MAX";
 
-   private static final String INVOCATION_COUNT = "INVOCATION_COUNT";
-
    // Static --------------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -72,12 +69,14 @@
    private List messages;
    private String state;
    private String name;
-   private Channel channel;
-   private String futureState;
-   private int invocationsToFutureStateCount;
-   private Map waitingArea;
+   private Channel channel; 
+   
    private boolean immediateAsynchronousAcknowledgment;
    private int maxRefs;
+   
+   private int count;
+   private int waitForCount = -1;
+   
 
    // Constructors --------------------------------------------------
 
@@ -113,8 +112,6 @@
       this.state = state;
       this.channel = channel;
       messages = new ArrayList();
-      waitingArea = new HashMap();
-      waitingArea.put(INVOCATION_COUNT, new Integer(0));
       immediateAsynchronousAcknowledgment = false;
    }
 
@@ -123,7 +120,7 @@
    public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
    {
       log.trace(this + " got routable:" + ref);
-      
+          
       try
       {
          if (ref == null)
@@ -163,8 +160,10 @@
          log.trace("State is:" + state);
          
          boolean done = ACKING.equals(state) ? true : false;
-         log.trace(this + " is " + (done ? "ACKing" : "NACKing") +  " message " + ref);
          
+         //NOTE! it is NOT Nacking, it is keeping - don't say NACKing - it is misleading (nack means cancel)         
+         log.trace(this + " is " + (done ? "ACKing" : "Keeping") +  " message " + ref);
+         
          Message m = ref.getMessage();
          
          SimpleDelivery delivery = new SimpleDelivery(observer, ref, done);
@@ -186,18 +185,14 @@
       }
       finally
       {
-         synchronized(waitingArea)
+         synchronized (this)
          {
-            if (futureState != null && --invocationsToFutureStateCount == 0)
+            count++;
+            if (waitForCount != -1 && count >= waitForCount)
             {
-               state = futureState;
-               futureState = null;
+               this.notify();
             }
-
-            Integer crt = (Integer)waitingArea.get(INVOCATION_COUNT);
-            waitingArea.put(INVOCATION_COUNT, new Integer(crt.intValue() + 1));
-            waitingArea.notifyAll();
-         }
+         }         
       }
    }
    
@@ -245,38 +240,34 @@
       return l;
    }
 
-   public boolean waitForHandleInvocations(int count)
-   {
-      return waitForHandleInvocations(count, Long.MAX_VALUE);
-   }
-
    /**
     * Blocks until handle() is called for the specified number of times.
     *
     * @return true if the handle was invoked the specified number of times or false if the method
     *         exited with timeout.
     */
-   public boolean waitForHandleInvocations(int count, long timeout)
+   public boolean waitForHandleInvocations(int waitFor, long timeout)
    {
       long start = System.currentTimeMillis();
-      synchronized(waitingArea)
+      
+      synchronized(this)
       {
-         while(true)
-         {
-            Integer invocations = (Integer)waitingArea.get(INVOCATION_COUNT);
-
-            if (invocations.intValue() == count)
+         this.waitForCount = waitFor;
+         
+         while (this.count < waitForCount)
+         {      
+            if (timeout < 0)
             {
-               return true;
-            }
-            if (timeout <= 0)
-            {
+               resetInvocationCount();
                return false;
             }
+            
             try
             {
-               waitingArea.wait(timeout);
-               timeout -= System.currentTimeMillis() - start;
+               this.wait(timeout);
+               long now = System.currentTimeMillis();
+               timeout -= now - start;
+               start = now;
             }
             catch(InterruptedException e)
             {
@@ -284,17 +275,12 @@
             }
          }
       }
+      
+      resetInvocationCount();
+      
+      return true;
    }
 
-   public void resetInvocationCount()
-   {
-      synchronized(waitingArea)
-      {
-         waitingArea.put(INVOCATION_COUNT, new Integer(0));
-         waitingArea.notifyAll();
-      }
-   }
-
    public void acknowledge(Message r, Transaction tx) throws Throwable
    {
       log.debug(this + " acknowledging "  + r);
@@ -367,29 +353,6 @@
       log.trace(this + " cancelled "  + r);
    }
 
-
-   public void setState(String state)
-   {
-      checkValid(state);
-      this.state = state;
-   }
-
-   /**
-    * Sets the given state on the receiver, but only after "invocationCount" handle() invocations.
-    * The state changes <i>after</i> the last invocation.
-    */
-   public void setState(String state, int invocationCount)
-   {
-      checkValid(state);
-      futureState = state;
-      invocationsToFutureStateCount = invocationCount;
-   }
-
-   public String getState()
-   {
-      return state;
-   }
-
    public String toString()
    {
       return "Receiver["+ name +"](" + state + ")";
@@ -413,6 +376,12 @@
          throw new IllegalArgumentException("Unknown receiver state: " + state);
       }
    }
+   
+   private void resetInvocationCount()
+   {
+     this.waitForCount = -1;
+     this.count = 0;      
+   }
 
    // Inner classes -------------------------------------------------
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java	2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java	2007-02-08 14:21:08 UTC (rev 2204)
@@ -127,6 +127,7 @@
                                         "Clustered", ms, pm, tr, ff, cf, pool,
                                         groupName, jChannelFactory,
                                         stateTimeout, castTimeout, pullPolicy, rf, mapper, 1000);
+      
       postOffice.start();
 
       return postOffice;

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java	2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java	2007-02-08 14:21:08 UTC (rev 2204)
@@ -524,6 +524,8 @@
 
       try
       {
+         log.trace("Creating post offices");
+         
          office1 = (DefaultClusteredPostOffice)
             createClusteredPostOffice(1, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
                                       sc, ms, pm, tr, pool);
@@ -543,6 +545,8 @@
          office5 = (DefaultClusteredPostOffice)
             createClusteredPostOffice(5, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
                                       sc, ms, pm, tr, pool);
+         
+         log.trace("Created postoffices");
 
          LocalClusteredQueue queue1 =
             new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
@@ -568,17 +572,25 @@
             new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm,
                                     true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
          office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
+         
+         log.trace("Created and bound queues");
 
          final int NUM_MESSAGES = 100;
 
+         log.trace("sending messages");
          this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
+         
+         log.trace("sent messages");
 
          Thread.sleep(2000);
 
+         
+         log.trace("Finished small sleep");
+         
          //Check the sizes
 
          log.trace("Here are the sizes:");
@@ -602,14 +614,29 @@
 
          assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
          assertEquals(0, queue5.getDeliveringCount());
+         
+         log.trace("Creating receiver");
 
          SimpleReceiver receiver = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         
+         log.trace("Created receiver");
 
          queue1.add(receiver);
+         
+         log.trace("Added receiver");
 
          queue1.deliver(false);
+         
+         log.trace("Called deliver");
 
-         Thread.sleep(7000);
+         log.trace("Waiting for handleInvocations");
+         long start = System.currentTimeMillis();         
+         receiver.waitForHandleInvocations(NUM_MESSAGES * 5, 60000);
+         long end = System.currentTimeMillis();
+         log.trace("I waited for " + (end - start) + " ms");
+         
+         Thread.sleep(2000);
+         
 
          log.trace("Here are the sizes:");
          log.trace("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
@@ -640,6 +667,8 @@
          assertEquals(NUM_MESSAGES * 5, messages.size());
 
          Iterator iter = messages.iterator();
+         
+         log.trace("Acknowledging messages");
 
          while (iter.hasNext())
          {
@@ -647,6 +676,8 @@
 
             receiver.acknowledge(msg, null);
          }
+         
+         log.trace("Acknowledged messages");
 
          receiver.clear();
 
@@ -800,7 +831,8 @@
          queue5.add(receiver5);
 
          receiver1.setMaxRefs(5);
-         queue1.deliver(false);
+         queue1.deliver(false);         
+         receiver1.waitForHandleInvocations(5, 20000);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
          assertEquals(5, queue1.getDeliveringCount());
@@ -810,7 +842,8 @@
          receiver1.setMaxRefs(0);
 
          receiver2.setMaxRefs(10);
-         queue2.deliver(false);
+         queue2.deliver(false);         
+         receiver2.waitForHandleInvocations(10, 20000);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
          assertEquals(10, queue2.getDeliveringCount());
@@ -818,7 +851,8 @@
          receiver2.setMaxRefs(0);
 
          receiver3.setMaxRefs(15);
-         queue3.deliver(false);
+         queue3.deliver(false);         
+         receiver3.waitForHandleInvocations(15, 20000);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
          assertEquals(15, queue3.getDeliveringCount());
@@ -826,7 +860,8 @@
          receiver3.setMaxRefs(0);
 
          receiver4.setMaxRefs(20);
-         queue4.deliver(false);
+         queue4.deliver(false);         
+         receiver4.waitForHandleInvocations(20, 20000);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
          assertEquals(20, queue4.getDeliveringCount());
@@ -834,15 +869,14 @@
          receiver4.setMaxRefs(0);
 
          receiver5.setMaxRefs(25);
-         queue5.deliver(false);
+         queue5.deliver(false);         
+         receiver5.waitForHandleInvocations(25, 20000);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
          assertEquals(25, queue5.getDeliveringCount());
          acknowledgeAll(receiver5);
          receiver5.setMaxRefs(0);
 
-         Thread.sleep(1000);
-
          assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
          assertTrue(office3.getHoldingTransactions().isEmpty());
@@ -859,7 +893,9 @@
          //Consume the rest from queue 5
          receiver5.setMaxRefs(NUM_MESSAGES - 25);
          queue5.deliver(false);
-         Thread.sleep(5000);
+         receiver5.waitForHandleInvocations(NUM_MESSAGES - 25, 20000);
+         
+         Thread.sleep(2000);
 
          log.trace("receiver5 msgs:" + receiver5.getMessages().size());
 
@@ -907,7 +943,9 @@
 
          receiver5.setMaxRefs(5);
          queue5.deliver(false);
-         Thread.sleep(5000);
+         receiver5.waitForHandleInvocations(5, 20000);
+         
+         Thread.sleep(2000);
 
          log.trace("Here are the sizes 4:");
          log.trace("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
@@ -948,6 +986,8 @@
 
          receiver5.setMaxRefs(1);
          queue5.deliver(false);
+         receiver5.waitForHandleInvocations(1, 20000);
+         
          Thread.sleep(2000);
 
          log.trace("Here are the sizes 5:");
@@ -986,9 +1026,12 @@
 
          //From queue 4 consume everything else
 
-         receiver4.setMaxRefs(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1);
+         int num = NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1;
+         receiver4.setMaxRefs(num);
          queue4.deliver(false);
-         Thread.sleep(7000);
+         receiver4.waitForHandleInvocations(num, 20000);
+         
+         Thread.sleep(2000);
 
          log.trace("Here are the sizes 6:");
          log.trace("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());




More information about the jboss-cvs-commits mailing list