[jboss-cvs] JBoss Messaging SVN: r6313 - in trunk: src/main/org/jboss/messaging/core/postoffice/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Apr 6 08:00:43 EDT 2009


Author: timfox
Date: 2009-04-06 08:00:43 -0400 (Mon, 06 Apr 2009)
New Revision: 6313

Modified:
   trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
message redistribution with backup

Modified: trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -68,8 +68,11 @@
 
          //Step 7. Create a BytesMessage with 1MB arbitrary bytes
          BytesMessage message = session.createBytesMessage();
-         message.writeBytes(new byte[1024 * 1024]);
+         byte[] bytes = new byte[100 * 1024 * 1024];
+         message.writeBytes(bytes);
          
+         System.out.println("Sending message of " + bytes.length + " bytes");
+         
          //Step 8. Send the Message
          producer.send(message);
          
@@ -84,7 +87,7 @@
          connection.start();
 
          //Step 11. Receive the message
-         BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(5000);
+         BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(60000);
 
          System.out.println("Received message: " + messageReceived.getBodyLength() + " bytes");
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -377,7 +377,8 @@
 
                      if (redistributionDelay != -1)
                      {
-                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor(),
+                                               server.getReplicatingChannel());
                      }
                   }
                }
@@ -447,7 +448,8 @@
 
                      if (redistributionDelay != -1)
                      {
-                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor(),
+                                               server.getReplicatingChannel());
                      }
                   }
                }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -29,6 +29,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
@@ -137,6 +138,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
@@ -881,6 +883,11 @@
             packet = new ReplicateAcknowledgeMessage();
             break;
          }
+         case REPLICATE_REDISTRIBUTION:
+         {
+            packet = new ReplicateRedistributionMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -160,6 +160,8 @@
    
    public static final byte REPLICATE_STARTUP_INFO = 96;
    
+   public static final byte REPLICATE_REDISTRIBUTION = 97;
+   
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -24,6 +24,7 @@
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
 import org.jboss.messaging.core.remoting.server.RemotingService;
 import org.jboss.messaging.core.security.JBMSecurityManager;
 import org.jboss.messaging.core.security.Role;
@@ -132,4 +133,6 @@
    boolean isInitialised();
    
    Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;
+   
+   void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -22,14 +22,15 @@
 
 package org.jboss.messaging.core.server;
 
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
 import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.utils.SimpleString;
 
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
 /**
  * 
  * A Queue
@@ -138,7 +139,7 @@
 
    boolean consumerFailedOver();
 
-   void addRedistributor(long delay, Executor executor);
+   void addRedistributor(long delay, Executor executor, final Channel replicatingChannel);
 
    void cancelRedistributor() throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -28,6 +28,9 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
@@ -63,11 +66,14 @@
 
    private int count;
    
+   private final Channel replicatingChannel;
+   
    public Redistributor(final Queue queue,
                         final StorageManager storageManager,
                         final PostOffice postOffice,
                         final Executor executor,
-                        final int batchSize)
+                        final int batchSize,
+                        final Channel replicatingChannel)
    {
       this.queue = queue;
       
@@ -78,6 +84,8 @@
       this.executor = executor;
 
       this.batchSize = batchSize;
+      
+      this.replicatingChannel = replicatingChannel;
    }
    
    public Filter getFilter()
@@ -121,38 +129,43 @@
 
       active = false;
    }
-
+   
    public synchronized HandleStatus handle(final MessageReference reference) throws Exception
    {
       if (!active)
       {
          return HandleStatus.BUSY;
       }
-        
-      Transaction tx = new TransactionImpl(storageManager);
+      
+      final Transaction tx = new TransactionImpl(storageManager);
 
       boolean routed = postOffice.redistribute(reference.getMessage(), queue.getName(), tx);
 
       if (routed)
-      {
-         queue.referenceHandled();
-
-         queue.acknowledge(tx, reference);
-
-         tx.commit();
-  
-         count++;
-
-         if (count == batchSize)
+      {    
+         if (replicatingChannel == null)
          {
-            // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
-            // long time, in the case there are many messages in the queue
-            active = false;
-
-            executor.execute(new Prompter());
-
-            count = 0;
+            doRedistribute(reference, tx);
          }
+         else
+         {
+            Packet packet = new ReplicateRedistributionMessage(queue.getName(), reference.getMessage().getMessageID());
+            
+            replicatingChannel.replicatePacket(packet, 1, new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     doRedistribute(reference, tx);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("Failed to handle redistribution", e);
+                  }
+               }
+            });
+         }
 
          return HandleStatus.HANDLED;
       }
@@ -161,7 +174,29 @@
          return HandleStatus.BUSY;
       }
    }
+   
+   private void doRedistribute(final MessageReference reference, final Transaction tx) throws Exception
+   {
+      queue.referenceHandled();
 
+      queue.acknowledge(tx, reference);
+
+      tx.commit();
+
+      count++;
+
+      if (count == batchSize)
+      {
+         // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
+         // long time in the case there are many messages in the queue
+         active = false;
+
+         executor.execute(new Prompter());
+
+         count = 0;
+      }
+   }
+
    private class Prompter implements Runnable
    {
       public void run()

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -65,6 +65,7 @@
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
 import org.jboss.messaging.core.server.Divert;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
@@ -76,7 +77,9 @@
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
 import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
+import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.core.version.Version;
 import org.jboss.messaging.utils.Future;
 import org.jboss.messaging.utils.Pair;
@@ -993,7 +996,37 @@
 
       return queue;
    }
+   
+   public void handleReplicateRedistribution(final SimpleString queueName, final long messageID)
+      throws Exception
+   {
+      Binding binding = postOffice.getBinding(queueName);
 
+      if (binding == null)
+      {
+         throw new IllegalStateException("Cannot find queue " + queueName);
+      }
+
+      Queue queue = (Queue)binding.getBindable();
+      
+      MessageReference reference = queue.removeFirstReference(messageID);
+      
+      Transaction tx = new TransactionImpl(storageManager);
+
+      boolean routed = postOffice.redistribute(reference.getMessage(), queue.getName(), tx);
+
+      if (routed)
+      {         
+         queue.acknowledge(tx, reference);
+
+         tx.commit();           
+      }
+      else
+      {
+         throw new IllegalStateException("Must be routed");
+      }
+   }
+
    // Public
    // ---------------------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -35,6 +35,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
@@ -45,6 +46,8 @@
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.cluster.ClusterConnection;
 import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 
 /**
  * A packet handler for all packets that need to be handled at the server level
@@ -176,6 +179,14 @@
 
             break;
          }
+         case PacketImpl.REPLICATE_REDISTRIBUTION:
+         {
+            ReplicateRedistributionMessage message = (ReplicateRedistributionMessage)packet;
+            
+            handleReplicateRedistribution(message);
+            
+            break;
+         }
          default:
          {
             log.error("Invalid packet " + packet);
@@ -443,4 +454,23 @@
          log.error("Failed to handle remove remote consumer", e);
       }
    }
+   
+   private void handleReplicateRedistribution(final ReplicateRedistributionMessage request)
+   {
+      Binding binding = server.getPostOffice().getBinding(request.getQueueName());
+
+      if (binding == null)
+      {
+         throw new IllegalStateException("Cannot find binding " + request.getQueueName());
+      }
+
+      try
+      {
+         server.handleReplicateRedistribution(request.getQueueName(), request.getMessageID());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to handle remove remote consumer", e);
+      }
+   }
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -12,19 +12,36 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIGINAL_DESTINATION;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIG_MESSAGE_ID;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.impl.MessageImpl;
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIGINAL_DESTINATION;
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIG_MESSAGE_ID;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.Distributor;
 import org.jboss.messaging.core.server.HandleStatus;
@@ -43,21 +60,6 @@
 import org.jboss.messaging.utils.ConcurrentSet;
 import org.jboss.messaging.utils.SimpleString;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Implementation of a Queue TODO use Java 5 concurrent queue
  *
@@ -115,7 +117,7 @@
    private final StorageManager storageManager;
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
-   
+
    private final ScheduledExecutorService scheduledExecutor;
 
    private volatile boolean backup;
@@ -123,17 +125,17 @@
    private int consumersToFailover = -1;
 
    private SimpleString address;
-   
+
    private Redistributor redistributor;
 
    private final Set<ScheduledFuture<?>> futures = new ConcurrentHashSet<ScheduledFuture<?>>();
-  
+
    private ScheduledFuture<?> future;
-   
-   //We cache the consumers here since we don't want to include the redistributor
-   
+
+   // We cache the consumers here since we don't want to include the redistributor
+
    private final Set<Consumer> consumers = new HashSet<Consumer>();
-   
+
    public QueueImpl(final long persistenceID,
                     final SimpleString address,
                     final SimpleString name,
@@ -162,7 +164,7 @@
       this.storageManager = storageManager;
 
       this.addressSettingsRepository = addressSettingsRepository;
-      
+
       this.scheduledExecutor = scheduledExecutor;
 
       if (postOffice == null)
@@ -251,7 +253,7 @@
          {
             storageManager.updateScheduledDeliveryTime(ref);
          }
- 
+
          addLast(ref);
       }
       else
@@ -357,7 +359,7 @@
    }
 
    public void addLast(final MessageReference ref)
-   {          
+   {
       add(ref, false);
    }
 
@@ -387,7 +389,7 @@
       cancelRedistributor();
 
       distributionPolicy.addConsumer(consumer);
-      
+
       consumers.add(consumer);
    }
 
@@ -399,58 +401,58 @@
       {
          promptDelivery = false;
       }
-      
+
       consumers.remove(consumer);
 
       return removed;
    }
 
-   public synchronized void addRedistributor(final long delay, final Executor executor)
+   public synchronized void addRedistributor(final long delay, final Executor executor, final Channel replicatingChannel)
    {
       if (future != null)
       {
          future.cancel(false);
-         
+
          futures.remove(future);
       }
-      
+
       if (redistributor != null)
       {
-         //Just prompt delivery
+         // Just prompt delivery
          deliverAsync(executor);
       }
-            
+
       if (delay > 0)
       {
-         DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
-         
+         DelayedAddRedistributor dar = new DelayedAddRedistributor(executor, replicatingChannel);
+
          future = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
-         
+
          futures.add(future);
       }
       else
       {
-         internalAddRedistributor(executor);
+         internalAddRedistributor(executor, replicatingChannel);
       }
    }
-   
+
    public synchronized void cancelRedistributor() throws Exception
-   {            
+   {
       if (redistributor != null)
       {
          redistributor.stop();
 
          redistributor = null;
       }
-      
+
       if (future != null)
       {
          future.cancel(false);
-         
+
          future = null;
       }
    }
-   
+
    public synchronized int getConsumerCount()
    {
       return consumers.size();
@@ -513,25 +515,25 @@
 
       return removed;
    }
-   
+
    public synchronized MessageReference removeFirstReference(final long id) throws Exception
-   {      
+   {
       MessageReference ref = messageReferences.peekFirst();
-      
+
       if (ref != null && ref.getMessage().getMessageID() == id)
       {
          messageReferences.removeFirst();
-         
+
          return ref;
       }
       else
       {
          ref = scheduledDeliveryHandler.removeReferenceWithID(id);
       }
-      
+
       return ref;
    }
-   
+
    public synchronized MessageReference getReference(final long id)
    {
       Iterator<MessageReference> iterator = messageReferences.iterator();
@@ -553,15 +555,15 @@
    {
       int count = messageReferences.size() + getScheduledCount() + getDeliveringCount();
 
-//      log.info(System.identityHashCode(this) + " message count is " +
-//               count +
-//               " ( mr:" +
-//               messageReferences.size() +
-//               " sc:" +
-//               getScheduledCount() +
-//               " dc:" +
-//               getDeliveringCount() +
-//               ")");
+      // log.info(System.identityHashCode(this) + " message count is " +
+      // count +
+      // " ( mr:" +
+      // messageReferences.size() +
+      // " sc:" +
+      // getScheduledCount() +
+      // " dc:" +
+      // getDeliveringCount() +
+      // ")");
 
       return count;
    }
@@ -628,16 +630,16 @@
       synchronized (tx)
       {
          RefsOperation oper = (RefsOperation)tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
-   
+
          if (oper == null)
          {
             oper = new RefsOperation();
-   
+
             tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
-   
+
             tx.addOperation(oper);
          }
-   
+
          return oper;
       }
    }
@@ -926,7 +928,7 @@
    public synchronized void setBackup()
    {
       backup = true;
-      
+
       direct = false;
    }
 
@@ -937,7 +939,7 @@
       if (consumersToFailover == 0)
       {
          backup = false;
-         
+
          return true;
       }
       else
@@ -1002,7 +1004,7 @@
    {
       return name.hashCode();
    }
-   
+
    @Override
    public String toString()
    {
@@ -1012,11 +1014,16 @@
    // Private
    // ------------------------------------------------------------------------------
 
-   private void internalAddRedistributor(final Executor executor)
-   {     
+   private void internalAddRedistributor(final Executor executor, final Channel replicatingChannel)
+   {
       if (consumers.size() == 0 && messageReferences.size() > 0)
       {
-         redistributor = new Redistributor(this, storageManager, postOffice, executor, REDISTRIBUTOR_BATCH_SIZE);
+         redistributor = new Redistributor(this,
+                                           storageManager,
+                                           postOffice,
+                                           executor,
+                                           REDISTRIBUTOR_BATCH_SIZE,
+                                           replicatingChannel);
 
          distributionPolicy.addConsumer(redistributor);
 
@@ -1026,7 +1033,6 @@
       }
    }
 
-   
    public boolean checkDLQ(final MessageReference reference) throws Exception
    {
       ServerMessage message = reference.getMessage();
@@ -1155,7 +1161,9 @@
          else
          {
 
-            log.warn("Message has reached maximum delivery attempts, sending it to Dead Letter Address " + deadLetterAddress + " from " + name);
+            log.warn("Message has reached maximum delivery attempts, sending it to Dead Letter Address " + deadLetterAddress +
+                     " from " +
+                     name);
             move(deadLetterAddress, ref, false);
          }
       }
@@ -1198,7 +1206,7 @@
       {
          return;
       }
-      
+
       direct = false;
 
       MessageReference reference;
@@ -1232,9 +1240,10 @@
                   // If the queue is empty, we need to check if there are pending messages, and throw a warning
                   if (pagingStore.isPaging() && !pagingStore.isDropWhenMaxSize())
                   {
-                     // This is just a *request* to depage. Depage will only happens if there is space on the Address and GlobalSize
+                     // This is just a *request* to depage. Depage will only happens if there is space on the Address
+                     // and GlobalSize
                      pagingStore.startDepaging();
-                     
+
                      log.warn("The Queue " + name +
                               " is empty, however there are pending messages on Paging for the address " +
                               pagingStore.getStoreName() +
@@ -1268,12 +1277,12 @@
          if (status == HandleStatus.HANDLED)
          {
             if (iterator == null)
-            {            
-               messageReferences.removeFirst();              
+            {
+               messageReferences.removeFirst();
             }
             else
             {
-               iterator.remove();               
+               iterator.remove();
             }
          }
          else if (status == HandleStatus.BUSY)
@@ -1304,7 +1313,7 @@
       }
 
       boolean add = false;
-      
+
       if (direct && !backup)
       {
          // Deliver directly
@@ -1340,9 +1349,9 @@
          {
             expiringMessageReferences.addIfAbsent(ref);
          }
-         
+
          if (first)
-         {            
+         {
             messageReferences.addFirst(ref, ref.getMessage().getPriority());
          }
          else
@@ -1362,7 +1371,7 @@
          }
       }
    }
-   
+
    private HandleStatus deliver(final MessageReference reference)
    {
       HandleStatus status = distributionPolicy.distribute(reference);
@@ -1442,7 +1451,7 @@
             ServerMessage msg = ref.getMessage();
 
             if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
-            {              
+            {
                messageReferences.addFirst(ref, msg.getPriority());
             }
          }
@@ -1498,7 +1507,6 @@
 
          for (MessageReference ref : refsToAck)
          {
-            //if (((QueueImpl)ref.getQueue()).checkDLQ(ref))
             if (ref.getQueue().checkDLQ(ref))
             {
                LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
@@ -1570,22 +1578,26 @@
          }
       }
    }
-   
+
    private class DelayedAddRedistributor implements Runnable
    {
       private final Executor executor;
+      
+      private final Channel replicatingChannel;
 
-      DelayedAddRedistributor(final Executor executor)
+      DelayedAddRedistributor(final Executor executor, final Channel replicatingChannel)
       {
          this.executor = executor;
+         
+         this.replicatingChannel = replicatingChannel;
       }
 
       public void run()
       {
          synchronized (QueueImpl.this)
          {
-            internalAddRedistributor(executor);
-   
+            internalAddRedistributor(executor, replicatingChannel);
+
             futures.remove(this);
          }
       }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-04-06 12:00:43 UTC (rev 6313)
@@ -412,13 +412,18 @@
    {
       sendInRange(node, address, 0, numMessages, durable, filterVal);
    }
-
+   
+   protected void verifyReceiveAllInRange(boolean ack, int msgStart, int msgEnd, int... consumerIDs) throws Exception
+   {
+      verifyReceiveAllInRangeNotBefore(ack, -1, msgStart, msgEnd, consumerIDs);
+   }
+   
    protected void verifyReceiveAllInRange(int msgStart, int msgEnd, int... consumerIDs) throws Exception
    {
-      verifyReceiveAllInRangeNotBefore(-1, msgStart, msgEnd, consumerIDs);
+      verifyReceiveAllInRangeNotBefore(false, -1, msgStart, msgEnd, consumerIDs);
    }
 
-   protected void verifyReceiveAllInRangeNotBefore(long firstReceiveTime, int msgStart, int msgEnd, int... consumerIDs) throws Exception
+   protected void verifyReceiveAllInRangeNotBefore(boolean ack, long firstReceiveTime, int msgStart, int msgEnd, int... consumerIDs) throws Exception
    {
       boolean outOfOrder = false;
       for (int i = 0; i < consumerIDs.length; i++)
@@ -436,6 +441,11 @@
 
             assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
 
+            if (ack)
+            {
+               message.acknowledge();
+            }
+            
             if (firstReceiveTime != -1)
             {
                assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
@@ -451,15 +461,20 @@
 
       assertFalse("Messages were consumed out of order, look at System.out for more information", outOfOrder);
    }
+   
+   protected void verifyReceiveAll(boolean ack, int numMessages, int... consumerIDs) throws Exception
+   {
+      verifyReceiveAllInRange(ack, 0, numMessages, consumerIDs);
+   }
 
    protected void verifyReceiveAll(int numMessages, int... consumerIDs) throws Exception
    {
-      verifyReceiveAllInRange(0, numMessages, consumerIDs);
+      verifyReceiveAllInRange(false, 0, numMessages, consumerIDs);
    }
 
    protected void verifyReceiveAllNotBefore(long firstReceiveTime, int numMessages, int... consumerIDs) throws Exception
    {
-      verifyReceiveAllInRangeNotBefore(firstReceiveTime, 0, numMessages, consumerIDs);
+      verifyReceiveAllInRangeNotBefore(false, firstReceiveTime, 0, numMessages, consumerIDs);
    }
 
    protected void checkReceive(int... consumerIDs) throws Exception




More information about the jboss-cvs-commits mailing list