[hornetq-commits] JBoss hornetq SVN: r11894 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/management/impl and 9 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Dec 10 20:58:22 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-10 20:58:20 -0500 (Sat, 10 Dec 2011)
New Revision: 11894

Added:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java
Modified:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
JBPAPP-7710 - Backporting fixes from 2.2.8 into 2.2.5 _JBPAPP_7242 as the customer is having issues

Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh	                        (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh	2011-12-11 01:58:20 UTC (rev 11894)
@@ -0,0 +1,2 @@
+#you need to define this variable on mac
+./build.sh -Djdk5.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home/ "$@"


Property changes on: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
___________________________________________________________________
Added: svn:executable
   + *

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -400,7 +400,7 @@
       {
          Filter filter = FilterImpl.createFilter(filterStr);
          List<Map<String, Object>> messages = new ArrayList<Map<String, Object>>();
-         queue.blockOnExecutorFuture();
+         queue.flushExecutor();
          LinkedListIterator<MessageReference> iterator = queue.iterator();
          try
          {

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -107,10 +107,10 @@
 
    public List<PagedMessage> read(StorageManager storage) throws Exception
    {
-	  if (isDebug)
-	  {
-	     log.debug("reading page " + this.pageId + " on address = " + storeName);
-	  }
+     if (isDebug)
+     {
+        log.debug("reading page " + this.pageId + " on address = " + storeName);
+     }
       
       ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
 
@@ -212,7 +212,10 @@
 
    public void open() throws Exception
    {
-      file.open();
+      if (!file.isOpen())
+      {
+         file.open();
+      }
       size.set((int)file.size());
       file.position(0);
    }
@@ -307,6 +310,21 @@
    {
       return otherPage.getPageId() - this.pageId;
    }
+   
+   public void finalize()
+   {
+      try
+      {
+         if (file != null && file.isOpen())
+         {
+            file.close();
+         }
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+      }
+   }
 
 
    /* (non-Javadoc)

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -730,6 +730,12 @@
    {
       return notificationLock;
    }
+   
+   // For tests
+   public AddressManager getAddressManager()
+   {
+      return addressManager;
+   }
 
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
    {

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -167,7 +167,7 @@
    
    void resetAllIterators();
 
-   boolean blockOnExecutorFuture();
+   boolean flushExecutor();
    
    void close() throws Exception;
    

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -24,11 +24,14 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
@@ -428,7 +431,7 @@
          {
             // We must block on the executor to ensure any async deliveries have completed or we might get out of order
             // deliveries
-            if (blockOnExecutorFuture())
+            if (flushExecutor())
             {
                // Go into direct delivery mode
                directDeliver = true;
@@ -449,7 +452,7 @@
 
       directDeliver = false;
 
-      executor.execute(concurrentPoller);
+      getExecutor().execute(concurrentPoller);
    }
 
    public void forceDelivery()
@@ -458,14 +461,14 @@
       {
          if (isTrace)
          {
-         	log.trace("Force delivery scheduling depage");
+            log.trace("Force delivery scheduling depage");
          }
          scheduleDepage();
       }
       
       if (isTrace)
       {
-      	log.trace("Force delivery deliverying async");
+         log.trace("Force delivery deliverying async");
       }
       
       deliverAsync();
@@ -473,7 +476,13 @@
    
    public void deliverAsync()
    {
-      getExecutor().execute(deliverRunner);
+      try
+      {
+         getExecutor().execute(deliverRunner);
+      }
+      catch (RejectedExecutionException ignored)
+      {
+      }
    }
 
    public void close() throws Exception
@@ -483,7 +492,20 @@
          checkQueueSizeFuture.cancel(false);
       }
 
-      cancelRedistributor();
+      getExecutor().execute(new Runnable(){
+         public void run()
+         {
+            try
+            {
+               cancelRedistributor();
+            }
+            catch (Exception e)
+            {
+               // nothing that could be done anyway.. just logging
+               log.warn(e.getMessage(), e);
+            }
+         }
+      });
    }
 
    public Executor getExecutor()
@@ -504,14 +526,14 @@
    {
       deliverAsync();
 
-      blockOnExecutorFuture();
+      flushExecutor();
    }
 
-   public boolean blockOnExecutorFuture()
+   public boolean flushExecutor()
    {
       Future future = new Future();
 
-      executor.execute(future);
+      getExecutor().execute(future);
 
       boolean ok = future.await(10000);
 
@@ -780,9 +802,31 @@
 
    public long getMessageCount()
    {
-      blockOnExecutorFuture();
+      final CountDownLatch latch = new CountDownLatch(1);
+      final AtomicLong count = new AtomicLong(0);
       
-      return getInstantMessageCount();
+      getExecutor().execute(new Runnable()
+      {
+         public void run()
+         {
+            count.set(getInstantMessageCount());
+            latch.countDown();
+         }
+      });
+      
+      try
+      {
+         if (!latch.await(10, TimeUnit.SECONDS))
+         {
+            throw new IllegalStateException("Timed out on waiting for MessageCount");
+         }
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+      }
+      
+      return count.get();
    }
    
    public long getInstantMessageCount()
@@ -907,6 +951,7 @@
 
    public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception
    {
+      deliveringCount.decrementAndGet();
       if (checkRedelivery(reference, timeBase))
       {
          if (!scheduledDeliveryHandler.checkAndSchedule(reference, false))
@@ -924,7 +969,7 @@
       {
          if (isTrace)
          {
-            log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName(), new Exception ("trace"));
+            log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName());
          }
          move(expiryAddress, ref, true, false);
       }
@@ -950,9 +995,31 @@
 
    public long getMessagesAdded()
    {
-      blockOnExecutorFuture();
+      final CountDownLatch latch = new CountDownLatch(1);
+      final AtomicLong count = new AtomicLong(0);
       
-      return getInstantMessagesAdded();
+      getExecutor().execute(new Runnable()
+      {
+         public void run()
+         {
+            count.set(getInstantMessagesAdded());
+            latch.countDown();
+         }
+      });
+      
+      try
+      {
+         if (!latch.await(10, TimeUnit.SECONDS))
+         {
+            throw new IllegalStateException("Timed out on waiting for MessagesAdded");
+         }
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+      }
+      
+      return count.get();
   }
    
    public long getInstantMessagesAdded()
@@ -1137,28 +1204,43 @@
       }
    }
 
-   public synchronized void expireReferences() throws Exception
+   public void expireReferences() throws Exception
    {
-      LinkedListIterator<MessageReference> iter = iterator();
-
-      try
-      {
-         while (iter.hasNext())
+      getExecutor().execute(new Runnable(){
+         public void run()
          {
-            MessageReference ref = iter.next();
-            if (ref.getMessage().isExpired())
+            synchronized (QueueImpl.this)
             {
-               deliveringCount.incrementAndGet();
-               expire(ref);
-               iter.remove();
-               refRemoved(ref);
+               LinkedListIterator<MessageReference> iter = iterator();
+   
+               try
+               {
+                  while (iter.hasNext())
+                  {
+                     MessageReference ref = iter.next();
+                     try
+                     {
+                        if (ref.getMessage().isExpired())
+                        {
+                           deliveringCount.incrementAndGet();
+                           expire(ref);
+                           iter.remove();
+                           refRemoved(ref);
+                        }
+                     }
+                     catch (Exception e)
+                     {
+                        log.warn("Error expiring reference " + ref, e);
+                     }
+                  }
+               }
+               finally
+               {
+                  iter.close();
+               }
             }
          }
-      }
-      finally
-      {
-         iter.close();
-      }
+      });
    }
 
    public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
@@ -1486,7 +1568,7 @@
    @Override
    public String toString()
    {
-      return "QueueImpl[name=" + name.toString() + "]@" + Integer.toHexString(System.identityHashCode(this));
+      return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + "]@" + Integer.toHexString(System.identityHashCode(this));
    }
 
    // Private
@@ -1530,166 +1612,168 @@
 
    // This method will deliver as many messages as possible until all consumers are busy or there are no more matching
    // or available messages
-   private synchronized void deliver()
+   private void deliver()
    {
-      if (paused || consumerList.isEmpty())
+      synchronized (this)
       {
-         return;
-      }
-      
-      if (log.isDebugEnabled())
-      {
-         log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
-      }
+         if (paused || consumerList.isEmpty())
+         {
+            return;
+         }
 
-      int busyCount = 0;
+         if (log.isDebugEnabled())
+         {
+            log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
+         }
 
-      int nullRefCount = 0;
+         int busyCount = 0;
 
-      int size = consumerList.size();
+         int nullRefCount = 0;
 
-      int endPos = pos == size - 1 ? 0 : size - 1;
+         int size = consumerList.size();
 
-      int numRefs = messageReferences.size();
+         int endPos = pos == size - 1 ? 0 : size - 1;
 
-      int handled = 0;
-      
-      long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
+         int numRefs = messageReferences.size();
 
-      while (handled < numRefs)
-      {
-         if (handled == MAX_DELIVERIES_IN_LOOP)
-         {
-            // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
+         int handled = 0;
 
-            deliverAsync();
+         long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
 
-            return;
-         }
-         
-         if (System.currentTimeMillis() > timeout)
+         while (handled < numRefs)
          {
-            if (isTrace)
+            if (handled == MAX_DELIVERIES_IN_LOOP)
             {
-               log.trace("delivery has been running for too long. Scheduling another delivery task now");
-            }
-            
-            deliverAsync();
-            
-            return;
-         }
-         
+               // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
+               // long
 
-         ConsumerHolder holder = consumerList.get(pos);
+               deliverAsync();
 
-         Consumer consumer = holder.consumer;
+               return;
+            }
 
-         if (holder.iter == null)
-         {
-            holder.iter = messageReferences.iterator();
-         }
-
-         MessageReference ref;
-
-         if (holder.iter.hasNext())
-         {
-            ref = holder.iter.next();
-         }
-         else
-         {
-            ref = null;
-         }
-         
-
-         if (ref == null)
-         {
-            nullRefCount++;
-         }
-         else
-         {
-            if (checkExpired(ref))
+            if (System.currentTimeMillis() > timeout)
             {
                if (isTrace)
                {
-                  log.trace("Reference " + ref + " being expired");
+                  log.trace("delivery has been running for too long. Scheduling another delivery task now");
                }
-               holder.iter.remove();
 
-               refRemoved(ref);
-               
-               handled++;
+               deliverAsync();
 
-               continue;
+               return;
             }
 
-            Consumer groupConsumer = null;
-            
-            if (isTrace)
+            ConsumerHolder holder = consumerList.get(pos);
+
+            Consumer consumer = holder.consumer;
+
+            if (holder.iter == null)
             {
-               log.trace("Queue " + this.getName() + " is delivering reference " + ref);
+               holder.iter = messageReferences.iterator();
             }
 
-            // If a group id is set, then this overrides the consumer chosen round-robin
+            MessageReference ref;
 
-            SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+            if (holder.iter.hasNext())
+            {
+               ref = holder.iter.next();
+            }
+            else
+            {
+               ref = null;
+            }
 
-            if (groupID != null)
+            if (ref == null)
             {
-               groupConsumer = groups.get(groupID);
+               nullRefCount++;
+            }
+            else
+            {
+               if (checkExpired(ref))
+               {
+                  if (isTrace)
+                  {
+                     log.trace("Reference " + ref + " being expired");
+                  }
+                  holder.iter.remove();
 
-               if (groupConsumer != null)
+                  refRemoved(ref);
+
+                  handled++;
+
+                  continue;
+               }
+
+               Consumer groupConsumer = null;
+
+               if (isTrace)
                {
-                  consumer = groupConsumer;
+                  log.trace("Queue " + this.getName() + " is delivering reference " + ref);
                }
-            }
 
-            HandleStatus status = handle(ref, consumer);
+               // If a group id is set, then this overrides the consumer chosen round-robin
 
-            if (status == HandleStatus.HANDLED)
-            {
-               holder.iter.remove();
+               SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
 
-               refRemoved(ref);
+               if (groupID != null)
+               {
+                  groupConsumer = groups.get(groupID);
 
-               if (groupID != null && groupConsumer == null)
+                  if (groupConsumer != null)
+                  {
+                     consumer = groupConsumer;
+                  }
+               }
+
+               HandleStatus status = handle(ref, consumer);
+
+               if (status == HandleStatus.HANDLED)
                {
-                  groups.put(groupID, consumer);
+                  holder.iter.remove();
+
+                  refRemoved(ref);
+
+                  if (groupID != null && groupConsumer == null)
+                  {
+                     groups.put(groupID, consumer);
+                  }
+
+                  handled++;
                }
+               else if (status == HandleStatus.BUSY)
+               {
+                  holder.iter.repeat();
 
-               handled++;
+                  busyCount++;
+               }
+               else if (status == HandleStatus.NO_MATCH)
+               {
+               }
             }
-            else if (status == HandleStatus.BUSY)
-            {
-               holder.iter.repeat();
 
-               busyCount++;
-            }
-            else if (status == HandleStatus.NO_MATCH)
+            if (pos == endPos)
             {
-            }
-         }
+               // Round robin'd all
 
-         if (pos == endPos)
-         {
-            // Round robin'd all
-
-            if (nullRefCount + busyCount == size)
-            {
-               if (log.isDebugEnabled())
+               if (nullRefCount + busyCount == size)
                {
-                   log.debug(this + "::All the consumers were busy, giving up now");
+                  if (log.isDebugEnabled())
+                  {
+                     log.debug(this + "::All the consumers were busy, giving up now");
+                  }
+                  break;
                }
-               break;
+
+               nullRefCount = busyCount = 0;
             }
 
-            nullRefCount = busyCount = 0;
-         }
+            pos++;
 
-         pos++;
-
-         if (pos == size)
-         {
-            pos = 0;
+            if (pos == size)
+            {
+               pos = 0;
+            }
          }
       }
 
@@ -1735,7 +1819,7 @@
       }
    }
 
-   private synchronized void depage()
+   private void depage()
    {
       depagePending = false;
 
@@ -1860,7 +1944,7 @@
             }
             reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
             
-            if (message.isDurable() && durable)
+            if (!reference.isPaged() && message.isDurable() && durable)
             {
                storageManager.updateScheduledDeliveryTime(reference);
             }
@@ -2173,7 +2257,7 @@
       }
       catch (Exception e)
       {
-	      QueueImpl.log.warn("Unable to decrement reference counting", e);
+         QueueImpl.log.warn("Unable to decrement reference counting", e);
       }
    }
 
@@ -2244,6 +2328,10 @@
 
          for (MessageReference ref : refsToAck)
          {
+            if (log.isTraceEnabled())
+            {
+               log.trace("rolling back " + ref);
+            }
             try
             {
                if (ref.getQueue().checkRedelivery(ref, timeBase))

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -21,6 +21,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -61,7 +62,7 @@
    // Constants ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-   
+
    private static boolean isTrace = log.isTraceEnabled();
 
    // Static ---------------------------------------------------------------------------------------
@@ -85,14 +86,12 @@
    private boolean started;
 
    private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-   
+
    public String debug()
    {
       return toString() + "::Delivering " + this.deliveringRefs.size();
    }
 
-   private boolean largeMessageInDelivery;
-
    /**
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
     */
@@ -117,7 +116,7 @@
    private final Binding binding;
 
    private boolean transferring = false;
-   
+
    /* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
     * This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
     * write queue when the TCP buffer is full, e.g. the client is slow or has died.    
@@ -165,11 +164,11 @@
       minLargeMessageSize = session.getMinLargeMessageSize();
 
       this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-      
+
       this.callback.addReadyListener(this);
 
       this.creationTime = System.currentTimeMillis();
-      
+
       if (browseOnly)
       {
          browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -187,7 +186,7 @@
    {
       return id;
    }
-   
+
    public boolean isBrowseOnly()
    {
       return browseOnly;
@@ -197,12 +196,12 @@
    {
       return creationTime;
    }
-   
+
    public String getConnectionID()
    {
       return this.session.getConnectionID().toString();
    }
-   
+
    public String getSessionID()
    {
       return this.session.getName();
@@ -212,20 +211,23 @@
    {
       if (availableCredits != null && availableCredits.get() <= 0)
       {
-         if (log.isDebugEnabled() )
+         if (log.isDebugEnabled())
          {
-            log.debug(this  + " is busy for the lack of credits!!!");
+            log.debug(this + " is busy for the lack of credits. Current credits = " +
+                      availableCredits +
+                      " Can't receive reference " +
+                      ref);
          }
-         
+
          return HandleStatus.BUSY;
       }
-      
-// TODO - https://jira.jboss.org/browse/HORNETQ-533      
-//      if (!writeReady.get())
-//      {
-//         return HandleStatus.BUSY;
-//      }
-      
+
+      // TODO - https://jira.jboss.org/browse/HORNETQ-533
+      // if (!writeReady.get())
+      // {
+      // return HandleStatus.BUSY;
+      // }
+
       synchronized (lock)
       {
          // If the consumer is stopped then we don't accept the message, it
@@ -238,11 +240,23 @@
 
          // If there is a pendingLargeMessage we can't take another message
          // This has to be checked inside the lock as the set to null is done inside the lock
-         if (largeMessageInDelivery)
+         if (largeMessageDeliverer != null)
          {
+            if (log.isDebugEnabled())
+            {
+               log.debug(this + " is busy delivering large message " +
+                         largeMessageDeliverer +
+                         ", can't deliver reference " +
+                         ref);
+            }
             return HandleStatus.BUSY;
          }
 
+         if (log.isTraceEnabled())
+         {
+            log.trace("Handling reference " + ref);
+         }
+
          final ServerMessage message = ref.getMessage();
 
          if (filter != null && !filter.match(message))
@@ -265,7 +279,9 @@
             // the updateDeliveryCount would still be updated after c
             if (strictUpdateDeliveryCount && !ref.isPaged())
             {
-               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue())
+               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+                   !ref.getQueue().isInternalQueue() &&
+                   !ref.isPaged())
                {
                   storageManager.updateDeliveryCount(ref);
                }
@@ -306,7 +322,7 @@
    public void close(final boolean failed) throws Exception
    {
       callback.removeReadyListener(this);
-      
+
       setStarted(false);
 
       if (largeMessageDeliverer != null)
@@ -352,8 +368,8 @@
 
          props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
 
-         props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null
-                                                                                        : filter.getFilterString());
+         props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+                                       filter == null ? null : filter.getFilterString());
 
          props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
@@ -374,39 +390,71 @@
    {
       promptDelivery();
 
-      Future future = new Future();
+      // JBPAPP-6030 - Using the executor to avoid distributed dead locks 
+      messageQueue.getExecutor().execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               // We execute this on the same executor to make sure the force delivery message is written after
+               // any delivery is completed
 
-      messageQueue.getExecutor().execute(future);
+               synchronized (lock)
+               {
+                  if (transferring)
+                  {
+                     // Case it's transferring (reattach), we will retry later
+                     messageQueue.getExecutor().execute(new Runnable()
+                     {
+                        public void run()
+                        {
+                           forceDelivery(sequence);
+                        }
+                     });
+                  }
+                  else
+                  {
+                     ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+      
+                     forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+                     forcedDeliveryMessage.setAddress(messageQueue.getName());
+      
+                     callback.sendMessage(forcedDeliveryMessage, id, 0);
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
+            }
+         }
+      });
 
-      boolean ok = future.await(10000);
+   }
 
-      if (!ok)
-      {
-         log.warn("Timed out waiting for executor");
-      }
+   public LinkedList<MessageReference> cancelRefs(final boolean failed,
+                                                  final boolean lastConsumedAsDelivered,
+                                                  final Transaction tx) throws Exception
+   {
+      boolean performACK = lastConsumedAsDelivered;
 
       try
       {
-         // We execute this on the same executor to make sure the force delivery message is written after
-         // any delivery is completed
-
-         ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
-
-         forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
-         forcedDeliveryMessage.setAddress(messageQueue.getName());
-
-         callback.sendMessage(forcedDeliveryMessage, id, 0);
+         if (largeMessageDeliverer != null)
+         {
+            largeMessageDeliverer.finish();
+         }
       }
-      catch (Exception e)
+      catch (Throwable e)
       {
-         ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
+         log.warn("Error on resetting large message deliver - " + largeMessageDeliverer, e);
       }
-   }
+      finally
+      {
+         largeMessageDeliverer = null;
+      }
 
-   public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
-   {
-      boolean performACK = lastConsumedAsDelivered;
-
       LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
 
       if (!deliveringRefs.isEmpty())
@@ -427,8 +475,9 @@
             {
                if (!failed)
                {
-                  //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
-                  //before failure
+                  // We don't decrement delivery count if the client failed, since there's a possibility that refs
+                  // were actually delivered but we just didn't get any acks for them
+                  // before failure
                   ref.decrementDeliveryCount();
                }
 
@@ -461,21 +510,6 @@
       synchronized (lock)
       {
          this.transferring = transferring;
-
-         if (transferring)
-         {
-            // Now we must wait for any large message delivery to finish
-            while (largeMessageInDelivery)
-            {
-               try
-               {
-                  Thread.sleep(1);
-               }
-               catch (InterruptedException ignore)
-               {
-               }
-            }
-         }
       }
 
       // Outside the lock
@@ -504,18 +538,23 @@
    }
 
    public void receiveCredits(final int credits) throws Exception
-   {      
+   {
       if (credits == -1)
       {
+         if (log.isDebugEnabled())
+         {
+            log.debug(this + ":: FlowControl::Received disable flow control message");
+         }
          // No flow control
          availableCredits = null;
-         
-         //There may be messages already in the queue
+
+         // There may be messages already in the queue
          promptDelivery();
       }
       else if (credits == 0)
       {
-         //reset, used on slow consumers
+         // reset, used on slow consumers
+         log.debug(this + ":: FlowControl::Received reset flow control message");
          availableCredits.set(0);
       }
       else
@@ -524,16 +563,17 @@
 
          if (log.isDebugEnabled())
          {
-            log.debug(this + "::Received " + credits +
-                                     " credits, previous value = " +
-                                     previous +
-                                     " currentValue = " +
-                                     availableCredits.get());
+            log.debug(this + "::FlowControl::Received " +
+                      credits +
+                      " credits, previous value = " +
+                      previous +
+                      " currentValue = " +
+                      availableCredits.get());
          }
 
          if (previous <= 0 && previous + credits > 0)
          {
-            if (log.isTraceEnabled() )
+            if (log.isTraceEnabled())
             {
                log.trace(this + "::calling promptDelivery from receiving credits");
             }
@@ -547,59 +587,103 @@
       return messageQueue;
    }
 
-   public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
+   public void acknowledge(final boolean autoCommitAcks, Transaction tx, final long messageID) throws Exception
    {
       if (browseOnly)
       {
          return;
       }
-      
+
       // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
       // acknowledged
-
-      MessageReference ref;
-      do
+      
+      // We use a transaction here as if the message is not found, we should rollback anything done
+      // This could eventually happen on retries during transactions, and we need to make sure we don't ACK things we are not supposed to acknowledge
+      
+      boolean startedTransaction = false;
+      
+      if (tx == null || autoCommitAcks)
       {
-         ref = deliveringRefs.poll();
-
-         if (ref == null)
+         startedTransaction = true;
+         tx = new TransactionImpl(storageManager);
+      }
+      
+      try
+      {
+   
+         MessageReference ref;
+         do
          {
-            throw new IllegalStateException(System.identityHashCode(this) + " Could not find reference on consumerID=" +
-                                            id +
-                                            ", messageId = " +
-                                            messageID +
-                                            " queue = " +
-                                            messageQueue.getName() +
-                                            " closed = " +
-                                            closed);
+            ref = deliveringRefs.poll();
+            
+            if (log.isTraceEnabled())
+            {
+               log.trace("ACKing ref " + ref + " on " + this);
+            }
+   
+            if (ref == null)
+            {
+               
+               HornetQException e = new HornetQException(HornetQException.ILLEGAL_STATE, "Could not find reference on consumerID=" +
+                                id +
+                                ", messageId = " +
+                                messageID +
+                                " queue = " +
+                                messageQueue.getName());
+               throw e;
+            }
+   
+            ref.getQueue().acknowledge(tx, ref);
          }
-
-         if (autoCommitAcks || tx == null)
+         while (ref.getMessage().getMessageID() != messageID);
+         
+         if (startedTransaction)
          {
-            ref.getQueue().acknowledge(ref);
+            tx.commit();
          }
+      }
+      catch (HornetQException e)
+      {
+         if (startedTransaction)
+         {
+            tx.rollback();
+         }
          else
          {
-            ref.getQueue().acknowledge(tx, ref);
+            tx.markAsRollbackOnly(e);
          }
+         throw e;
       }
-      while (ref.getMessage().getMessageID() != messageID);
+      catch (Throwable e)
+      {
+         log.error(e.getMessage(), e);
+         HornetQException hqex = new HornetQException(HornetQException.ILLEGAL_STATE, e.getMessage());
+         if (startedTransaction)
+         {
+            tx.rollback();
+         }
+         else
+         {
+            tx.markAsRollbackOnly(hqex);
+         }
+         throw hqex;
+      }
    }
-   
+
    public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
    {
       if (browseOnly)
       {
          return;
       }
-      
+
       MessageReference ref = removeReferenceByID(messageID);
-      
+
       if (ref == null)
       {
          throw new IllegalStateException("Cannot find ref to ack " + messageID);
       }
-      
+
       if (autoCommitAcks)
       {
          ref.getQueue().acknowledge(ref);
@@ -639,13 +723,13 @@
 
       return ref;
    }
-      
+
    public void readyForWriting(final boolean ready)
    {
       if (ready)
       {
          writeReady.set(true);
-         
+
          promptDelivery();
       }
       else
@@ -664,28 +748,30 @@
 
    private void promptDelivery()
    {
-      synchronized (lock)
+      // largeMessageDeliverer is always set inside a lock
+      // if we don't acquire a lock, we will have NPE eventually
+      if (largeMessageDeliverer != null)
       {
-         // largeMessageDeliverer is aways set inside a lock
-         // if we don't acquire a lock, we will have NPE eventually
-         if (largeMessageDeliverer != null)
-         {
-            resumeLargeMessage();
-         }
-         else
-         {
-            if (browseOnly)
-            {
-               messageQueue.getExecutor().execute(browserDeliverer);
-            }
-            else
-            {
-               messageQueue.forceDelivery();
-            }
-         }
+         resumeLargeMessage();
       }
+      else
+      {
+         forceDelivery();
+      }
    }
 
+   private void forceDelivery()
+   {
+      if (browseOnly)
+      {
+         messageQueue.getExecutor().execute(browserDeliverer);
+      }
+      else
+      {
+         messageQueue.deliverAsync();
+      }
+   }
+
    private void resumeLargeMessage()
    {
       messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
@@ -693,8 +779,6 @@
 
    private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
    {
-      largeMessageInDelivery = true;
-
       final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
 
       // it doesn't need lock because deliverLargeMesasge is already inside the lock()
@@ -713,6 +797,14 @@
       if (availableCredits != null)
       {
          availableCredits.addAndGet(-packetSize);
+
+         if (log.isTraceEnabled())
+         {
+            log.trace(this + "::FlowControl::delivery standard taking " +
+                      packetSize +
+                      " from credits, available now is " +
+                      availableCredits);
+         }
       }
    }
 
@@ -729,16 +821,7 @@
             {
                if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
                {
-                  if (browseOnly)
-                  {
-                     messageQueue.getExecutor().execute(browserDeliverer);
-                  }
-                  else
-                  {
-                     // prompt Delivery only if chunk was finished
-
-                     messageQueue.deliverAsync();
-                  }
+                  forceDelivery();
                }
             }
             catch (Exception e)
@@ -786,6 +869,12 @@
 
             if (availableCredits != null && availableCredits.get() <= 0)
             {
+               if (log.isTraceEnabled())
+               {
+                  log.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+                            availableCredits);
+               }
+
                return false;
             }
 
@@ -794,7 +883,7 @@
                context = largeMessage.getBodyEncoder();
 
                sizePendingLargeMessage = context.getLargeBodySize();
-               
+
                context.open();
 
                sentInitialPacket = true;
@@ -807,6 +896,15 @@
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-packetSize);
+
+                  if (log.isTraceEnabled())
+                  {
+                     log.trace(this + "::FlowControl::" +
+                               " deliver initialpackage with " +
+                               packetSize +
+                               " delivered, available now = " +
+                               availableCredits);
+                  }
                }
 
                // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -822,7 +920,8 @@
                {
                   if (ServerConsumerImpl.isTrace)
                   {
-                     log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+                     log.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+                               availableCredits);
                   }
 
                   return false;
@@ -845,16 +944,17 @@
 
                int chunkLen = body.length;
 
-               if (ServerConsumerImpl.isTrace)
-               {
-                  log.trace("deliverLargeMessage: Sending " + packetSize +
-                                           " availableCredits now is " +
-                                           availableCredits);
-               }
-
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-packetSize);
+
+                  if (log.isTraceEnabled())
+                  {
+                     log.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+                               packetSize +
+                               " available now=" +
+                               availableCredits);
+                  }
                }
 
                positionPendingLargeMessage += chunkLen;
@@ -903,8 +1003,6 @@
 
             largeMessageDeliverer = null;
 
-            largeMessageInDelivery = false;
-
             largeMessage = null;
          }
       }
@@ -920,7 +1018,7 @@
       }
 
       private final LinkedListIterator<MessageReference> iterator;
-      
+
       public synchronized void close()
       {
          iterator.close();

Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java	                        (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.postoffice.AddressManager;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.QueueBinding;
+import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.JMSTestBase;
+
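+/**
+ * Manual stress test: producers publish to many topics, bridge threads move each topic's durable
+ * subscription into an output queue, and processor threads consume from both, with paging enabled.
+ *
+ * @author clebert
+ *
+ */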
+public class MultipleConsumerTest extends JMSTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   private static final int TIMEOUT_ON_WAIT = 5000;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   protected boolean usePersistence()
+   {
+      return true;
+   }
+
+   volatile boolean running = true;
+
+   private static final long WAIT_ON_SEND = 0;
+
+   CountDownLatch errorLatch = new CountDownLatch(1);
+
+   AtomicInteger numberOfErrors = new AtomicInteger(0);
+   public void error(Throwable e)
+   {
+      numberOfErrors.incrementAndGet(); // count it, so assertEquals(0, numberOfErrors.get()) can fail
+      System.err.println("Error at " + Thread.currentThread().getName());
+      e.printStackTrace();
+      errorLatch.countDown();
+   }
+
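+   /**
+    * @param destinationID index of the destination, appended to the topic name
+    * @return the Topic created by HornetQJMSClient for createTopicName(destinationID)
+    */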
+   public Topic createSampleTopic(int destinationID)
+   {
+      return HornetQJMSClient.createTopic(createTopicName(destinationID));
+   }
+
+   /**
+    * @param destinationID index of the destination
+    * @return "topic-input" followed by destinationID
+    */
+   private String createTopicName(int destinationID)
+   {
+      return "topic-input" + destinationID;
+   }
+
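+   /**
+    * @param destinationID index of the destination, appended to the queue name
+    * @return the Queue created by HornetQJMSClient for createQueueName(destinationID)
+    */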
+   public Queue createSampleQueue(int destinationID)
+   {
+      return HornetQJMSClient.createQueue(createQueueName(destinationID));
+   }
+
+   /**
+    * @param destinationID index of the destination
+    * @return "queue-output-" followed by destinationID
+    */
+   private String createQueueName(int destinationID)
+   {
+      return "queue-output-" + destinationID;
+   }
+
+   public class Counter extends Thread
+   {
+      public Counter()
+      {
+         super("Counter-Thread-Simulating-Management");
+      }
+
+      public void run()
+      {
+         try
+         {
+            AddressManager addr = ((PostOfficeImpl)server.getPostOffice()).getAddressManager();
+
+            LinkedList<org.hornetq.core.server.Queue> queues = new LinkedList<org.hornetq.core.server.Queue>();
+            for (Binding binding : addr.getBindings().values())
+            {
+               if (binding instanceof QueueBinding)
+               {
+                  queues.add(((QueueBinding)binding).getQueue());
+               }
+            }
+
+            while (running)
+            {
+               Thread.sleep(1000);
+               for (org.hornetq.core.server.Queue q : queues)
+               {
+                  System.out.println("Queue " + q +
+                                     " has " +
+                                     q.getInstantMessageCount() +
+                                     " with " +
+                                     q.getMessagesAdded() +
+                                     " with " +
+                                     q.getConsumerCount() +
+                                     " consumers");
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            error(e);
+         }
+      }
+   }
+
+   // It will produce to a destination
+   public class ProducerThread extends Thread
+   {
+      Connection conn;
+
+      Session sess;
+
+      Topic topic;
+
+      MessageProducer prod;
+
+      public ProducerThread(Connection conn, int destinationID) throws Exception
+      {
+         this.conn = conn;
+         this.sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+         this.topic = createSampleTopic(destinationID);
+         this.prod = sess.createProducer(topic);
+      }
+
+      public void run()
+      {
+         try
+         {
+            while (running)
+            {
+               BytesMessage msg = sess.createBytesMessage();
+               msg.writeBytes(new byte[1024]);
+               prod.send(msg);
+               sess.commit();
+               Thread.sleep(WAIT_ON_SEND);
+            }
+         }
+         catch (Exception e)
+         {
+            error(e);
+         }
+      }
+   }
+
+   // It will bridge from one subscription and send to a queue
+   public class BridgeSubscriberThread extends Thread
+   {
+      Session session;
+
+      MessageProducer prod;
+
+      MessageConsumer cons;
+
+      Topic topic;
+
+      Queue outputQueue;
+
+      public BridgeSubscriberThread(Connection masterConn, int destinationID) throws Exception
+      {
+         super("Bridge_destination=" + destinationID);
+         topic = createSampleTopic(destinationID);
+         outputQueue = createSampleQueue(destinationID);
+         session = masterConn.createSession(true, Session.SESSION_TRANSACTED);
+         cons = session.createDurableSubscriber(topic, "bridge-on-" + destinationID);
+
+         prod = session.createProducer(outputQueue);
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+      }
+
+      public void run()
+      {
+         try
+         {
+
+            int i = 0;
+            while (running)
+            {
+               Message msg = cons.receive(TIMEOUT_ON_WAIT);
+
+               if (msg == null)
+               {
+                  System.err.println("couldn't receive a message within TIMEOUT_ON_WAIT miliseconds on " + topic);
+                  error(new RuntimeException("Couldn't receive message"));
+               }
+               else
+               {
+                  if (i++ % 100 == 0)
+                  {
+                     System.out.println(Thread.currentThread().getName() + " received " + i);
+                  }
+                  prod.send(msg);
+                  session.commit();
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            error(e);
+         }
+      }
+   }
+
+   // It will read from a destination, and pretend it finished processing it
+   public class ProcessorThread extends Thread
+   {
+
+      Connection conn;
+
+      Session session;
+
+      MessageConsumer cons;
+
+      Destination dest;
+
+      final long waitOnEachConsume;
+
+      public ProcessorThread(Connection conn,
+                             Session sess,
+                             Destination dest,
+                             MessageConsumer cons,
+                             long waitOnEachConsume) throws Exception
+      {
+         super("Processor on " + dest);
+         this.conn = conn;
+         this.session = sess;
+         this.dest = dest;
+         this.cons = cons;
+         this.waitOnEachConsume = waitOnEachConsume;
+      }
+
+      public void run()
+      {
+         int i = 0;
+         try
+         {
+            while (running)
+            {
+               if (waitOnEachConsume != 0)
+               {
+                  Thread.sleep(waitOnEachConsume);
+               }
+               Message msg = cons.receive(TIMEOUT_ON_WAIT);
+
+               if (i++ % 100 == 0)
+               {
+                  System.out.println(Thread.currentThread().getName() + " processed " + i);
+               }
+               if (msg == null)
+               {
+                  System.err.println("couldn't receive a message on processor within TIMEOUT_ON_WAIT miliseconds on " + dest);
+                  error(new RuntimeException("Couldn't receive message"));
+               }
+               else
+               {
+                  session.commit();
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            error(e);
+         }
+      }
+   }
+
+   // This test needs to be run manually.
+   // At the end this test throws an OME because of some issue in the test itself.
+   // As long as you see the message "Finished" the test is considered successful!
+   public void _testMultipleConsumers() throws Throwable
+   {
+
+      AddressSettings set = new AddressSettings();
+      set.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      set.setPageSizeBytes(10 * 1024);
+      set.setMaxSizeBytes(100 * 1024);
+      server.getAddressSettingsRepository().addMatch("#", set);
+
+      try
+      {
+         int nDestinations = 100;
+
+         for (int i = 0; i < nDestinations; i++)
+         {
+            createTopic(createTopicName(i));
+            createQueue(createQueueName(i));
+         }
+
+         LinkedList<Connection> connections = new LinkedList<Connection>();
+
+         LinkedList<Thread> consumerThreads = new LinkedList<Thread>();
+
+         LinkedList<Thread> producerThreads = new LinkedList<Thread>();
+
+         // start a few simulated external consumers on the topic (1 external subscription)
+         for (int i = 0; i < nDestinations; i++)
+         {
+            Connection conn = cf.createConnection();
+            conn.setClientID("external-consumer-" + i);
+            conn.start();
+            Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+            Topic topic = createSampleTopic(i);
+            MessageConsumer cons = sess.createDurableSubscriber(topic, "ex-" + i);
+            ProcessorThread proc = new ProcessorThread(conn, sess, topic, cons, 100l);
+            consumerThreads.add(proc);
+
+            connections.add(conn);
+         }
+
+         // consumers on the output queues (comment this block out to leave the output queues unread)
+         for (int i = 0; i < nDestinations; i++)
+         {
+            Connection conncons = cf.createConnection();
+            conncons.setClientID("output-queue" + i);
+            conncons.start();
+            Session sesscons = conncons.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = createSampleQueue(i);
+            MessageConsumer cons = sesscons.createConsumer(queue);
+            ProcessorThread proc = new ProcessorThread(conncons, sesscons, queue, cons, 0l);
+            consumerThreads.add(proc);
+            connections.add(conncons);
+         }
+
+         Connection masterConn = cf.createConnection();
+         connections.add(masterConn);
+         masterConn.setClientID("master-conn");
+         masterConn.start();
+
+         // create the bridge threads (they are started below together with the other consumers)
+         for (int i = 0; i < nDestinations; i++)
+         {
+            BridgeSubscriberThread subs = new BridgeSubscriberThread(masterConn, i);
+            consumerThreads.add(subs);
+         }
+
+         // The producers
+         for (int i = 0; i < nDestinations; i++)
+         {
+            Connection prodConn = cf.createConnection();
+            ProducerThread prod = new ProducerThread(prodConn, i);
+            producerThreads.add(prod);
+         }
+
+         for (Thread t : producerThreads)
+         {
+            t.start();
+         }
+
+         // Waiting some time before we start the consumers. To make sure it's paging
+         Thread.sleep(20000);
+
+         System.out.println("starting consumers now");
+
+         for (Thread t : consumerThreads)
+         {
+            t.start();
+         }
+
+         Counter managerThread = new Counter();
+
+         managerThread.start();
+
+         errorLatch.await(20, TimeUnit.MINUTES);
+
+         assertEquals(0, numberOfErrors.get());
+
+         running = false;
+
+         for (Thread t : consumerThreads)
+         {
+            t.join();
+         }
+
+         for (Thread t : producerThreads)
+         {
+            t.join();
+         }
+
+         for (Connection conn : connections)
+         {
+            conn.close();
+         }
+
+         managerThread.join();
+
+         System.out.println("Finished!!!!");
+
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -134,7 +134,7 @@
          prod.send(msg);
       }
       
-      queue.blockOnExecutorFuture();
+      queue.flushExecutor();
       
       //Consumer is not started so should go queued
       assertFalse(queue.isDirectDeliver());
@@ -157,7 +157,7 @@
 
       prod.send(msg);
       
-      queue.blockOnExecutorFuture();
+      queue.flushExecutor();
             
       assertTrue(queue.isDirectDeliver());
       

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -82,7 +82,7 @@
       
    }
 
-   public boolean blockOnExecutorFuture()
+   public boolean flushExecutor()
    {
       return true;
    }

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -617,7 +617,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.server.Queue#blockOnExecutorFuture()
        */
-      public boolean blockOnExecutorFuture()
+      public boolean flushExecutor()
       {
          // TODO Auto-generated method stub
          return false;

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java	2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java	2011-12-11 01:58:20 UTC (rev 11894)
@@ -214,7 +214,7 @@
                                         HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
-                                        HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+                                        100 * 1024, // HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
                                         HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,



More information about the hornetq-commits mailing list