[hornetq-commits] JBoss hornetq SVN: r8186 - in trunk: src/main/org/hornetq/core/management/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 2 12:46:47 EST 2009


Author: timfox
Date: 2009-11-02 12:46:47 -0500 (Mon, 02 Nov 2009)
New Revision: 8186

Modified:
   trunk/src/main/org/hornetq/core/exception/HornetQException.java
   trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
various tweaks including fixing bridge reconnect test

Modified: trunk/src/main/org/hornetq/core/exception/HornetQException.java
===================================================================
--- trunk/src/main/org/hornetq/core/exception/HornetQException.java	2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/src/main/org/hornetq/core/exception/HornetQException.java	2009-11-02 17:46:47 UTC (rev 8186)
@@ -61,6 +61,8 @@
    public static final int LARGE_MESSAGE_ERROR_BODY = 110;
    
    public static final int TRANSACTION_ROLLED_BACK = 111;
+   
+   public static final int SESSION_CREATION_REJECTED = 112;
 
    
    // Native Error codes ----------------------------------------------

Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-11-02 17:46:47 UTC (rev 8186)
@@ -695,7 +695,7 @@
                }
 
                // start sending notification *messages* only when the *remoting service* if started
-               if (messagingServer == null || !messagingServer.isStarted() ||
+               if (messagingServer == null || 
                    !messagingServer.getRemotingService().isStarted())
                {
                   return;

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2009-11-02 17:46:47 UTC (rev 8186)
@@ -49,10 +49,8 @@
    private final Channel channel1;
 
    private final RemotingConnection connection;
-   
-   public HornetQPacketHandler(final HornetQServer server,
-                                       final Channel channel1,
-                                       final RemotingConnection connection)
+
+   public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final RemotingConnection connection)
    {
       this.server = server;
 
@@ -64,11 +62,11 @@
    public void handlePacket(final Packet packet)
    {
       byte type = packet.getType();
-      
+
       // All these operations need to be idempotent since they are outside of the session
       // reliability replay functionality
       switch (type)
-      {         
+      {
          case CREATESESSION:
          {
             CreateSessionMessage request = (CreateSessionMessage)packet;
@@ -76,7 +74,7 @@
             handleCreateSession(request);
 
             break;
-         }         
+         }
          case REATTACH_SESSION:
          {
             ReattachSessionMessage request = (ReattachSessionMessage)packet;
@@ -90,21 +88,21 @@
             // Create queue can also be fielded here in the case of a replicated store and forward queue creation
 
             CreateQueueMessage request = (CreateQueueMessage)packet;
-            
+
             handleCreateQueue(request);
 
             break;
-         }         
+         }
          case CREATE_REPLICATION:
          {
             // Create queue can also be fielded here in the case of a replicated store and forward queue creation
 
             CreateReplicationSessionMessage request = (CreateReplicationSessionMessage)packet;
-            
+
             handleCreateReplication(request);
 
             break;
-         }         
+         }
          default:
          {
             log.error("Invalid packet " + packet);
@@ -118,7 +116,7 @@
       try
       {
          response = server.createSession(request.getName(),
-                                         request.getSessionChannelID(),                                         
+                                         request.getSessionChannelID(),
                                          request.getUsername(),
                                          request.getPassword(),
                                          request.getMinLargeMessageSize(),
@@ -129,11 +127,6 @@
                                          request.isPreAcknowledge(),
                                          request.isXA(),
                                          request.getWindowSize());
-         
-         if (response == null)
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS));
-         }
       }
       catch (Exception e)
       {
@@ -148,10 +141,10 @@
             response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
          }
       }
-      
-      channel1.send(response);    
+
+      channel1.send(response);
    }
-   
+
    private void handleReattachSession(final ReattachSessionMessage request)
    {
       Packet response;
@@ -181,14 +174,18 @@
    {
       try
       {
-         server.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isDurable(), request.isTemporary());
+         server.createQueue(request.getAddress(),
+                            request.getQueueName(),
+                            request.getFilterString(),
+                            request.isDurable(),
+                            request.isTemporary());
       }
       catch (Exception e)
       {
          log.error("Failed to handle create queue", e);
       }
    }
-   
+
    private void handleCreateReplication(final CreateReplicationSessionMessage request)
    {
       Packet response;
@@ -196,17 +193,17 @@
       try
       {
          Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
-         
+
          ReplicationEndpoint endpoint = server.createReplicationEndpoint(channel);
-         
+
          channel.setHandler(endpoint);
-         
+
          response = new NullResponseMessage();
       }
-      catch  (Exception e)
+      catch (Exception e)
       {
          log.warn(e.getMessage(), e);
-         
+
          if (e instanceof HornetQException)
          {
             response = new HornetQExceptionMessage((HornetQException)e);
@@ -219,9 +216,5 @@
 
       channel1.send(response);
    }
-   
-   
 
-
-   
 }
\ No newline at end of file

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-02 17:46:47 UTC (rev 8186)
@@ -135,9 +135,9 @@
    // Attributes
    // -----------------------------------------------------------------------------------
 
-   private SimpleString nodeID;
+   private volatile SimpleString nodeID;
 
-   private UUID uuid;
+   private volatile UUID uuid;
 
    private final Version version;
 
@@ -149,39 +149,39 @@
 
    private volatile boolean started;
 
-   private SecurityStore securityStore;
+   private volatile SecurityStore securityStore;
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
-   private QueueFactory queueFactory;
+   private volatile QueueFactory queueFactory;
 
-   private PagingManager pagingManager;
+   private volatile PagingManager pagingManager;
 
-   private PostOffice postOffice;
+   private volatile PostOffice postOffice;
 
-   private ExecutorService threadPool;
+   private volatile ExecutorService threadPool;
 
-   private ScheduledExecutorService scheduledPool;
+   private volatile ScheduledExecutorService scheduledPool;
 
-   private ExecutorFactory executorFactory;
+   private volatile ExecutorFactory executorFactory;
 
-   private HierarchicalRepository<Set<Role>> securityRepository;
+   private volatile HierarchicalRepository<Set<Role>> securityRepository;
 
-   private ResourceManager resourceManager;
+   private volatile ResourceManager resourceManager;
 
-   private HornetQServerControlImpl messagingServerControl;
+   private volatile HornetQServerControlImpl messagingServerControl;
 
-   private ClusterManager clusterManager;
+   private volatile ClusterManager clusterManager;
 
-   private StorageManager storageManager;
+   private volatile StorageManager storageManager;
 
-   private RemotingService remotingService;
+   private volatile RemotingService remotingService;
 
-   private ManagementService managementService;
+   private volatile ManagementService managementService;
 
    private MemoryManager memoryManager;
 
-   private DeploymentManager deploymentManager;
+   private volatile DeploymentManager deploymentManager;
 
    private Deployer basicUserCredentialsDeployer;
 
@@ -205,7 +205,7 @@
 
    private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
 
-   private GroupingHandler groupingHandler;
+   private volatile GroupingHandler groupingHandler;
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -258,8 +258,6 @@
       this.addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
 
       addressSettingsRepository.setDefault(new AddressSettings());
-
-      // this.managementConnectorID = managementConnectorSequence.decrementAndGet();
    }
 
    // lifecycle methods
@@ -317,92 +315,130 @@
       super.finalize();
    }
 
-   public synchronized void stop() throws Exception
+   public void stop() throws Exception
    {
-      if (!started)
+      synchronized (this)
       {
-         return;
-      }
+         if (!started)
+         {
+            return;
+         }
 
-      if (clusterManager != null)
-      {
-         clusterManager.stop();
-      }
+         if (clusterManager != null)
+         {
+            clusterManager.stop();
+         }
 
-      if (groupingHandler != null)
-      {
-         managementService.removeNotificationListener(groupingHandler);
-         groupingHandler = null;
-      }
-      // Need to flush all sessions to make sure all confirmations get sent back to client
+         if (groupingHandler != null)
+         {
+            managementService.removeNotificationListener(groupingHandler);
+            groupingHandler = null;
+         }
+         // Need to flush all sessions to make sure all confirmations get sent back to client
 
-      for (ServerSession session : sessions.values())
-      {
-         session.getChannel().flushConfirmations();
-      }
+         for (ServerSession session : sessions.values())
+         {
+            session.getChannel().flushConfirmations();
+         }
 
-      remotingService.stop();
+         remotingService.stop();
 
-      // Stop the deployers
-      if (configuration.isFileDeploymentEnabled())
-      {
-         basicUserCredentialsDeployer.stop();
+         // Stop the deployers
+         if (configuration.isFileDeploymentEnabled())
+         {
+            basicUserCredentialsDeployer.stop();
 
-         addressSettingsDeployer.stop();
+            addressSettingsDeployer.stop();
 
-         if (queueDeployer != null)
+            if (queueDeployer != null)
+            {
+               queueDeployer.stop();
+            }
+
+            if (securityDeployer != null)
+            {
+               securityDeployer.stop();
+            }
+
+            deploymentManager.stop();
+         }
+
+         managementService.unregisterServer();
+
+         managementService.stop();
+
+         if (storageManager != null)
          {
-            queueDeployer.stop();
+            storageManager.stop();
          }
 
-         if (securityDeployer != null)
+         if (replicationEndpoint != null)
          {
-            securityDeployer.stop();
+            replicationEndpoint.stop();
+            replicationEndpoint = null;
          }
 
-         deploymentManager.stop();
-      }
+         if (securityManager != null)
+         {
+            securityManager.stop();
+         }
 
-      managementService.unregisterServer();
+         if (resourceManager != null)
+         {
+            resourceManager.stop();
+         }
 
-      managementService.stop();
+         if (postOffice != null)
+         {
+            postOffice.stop();
+         }
 
-      if (storageManager != null)
-      {
-         storageManager.stop();
-      }
+         // Need to shutdown pools before shutting down paging manager to make sure everything is written ok
 
-      if (replicationEndpoint != null)
-      {
-         replicationEndpoint.stop();
-         replicationEndpoint = null;
-      }
+         List<Runnable> tasks = scheduledPool.shutdownNow();
 
-      if (securityManager != null)
-      {
-         securityManager.stop();
-      }
+         for (Runnable task : tasks)
+         {
+            log.debug("Waiting for " + task);
+         }
 
-      if (resourceManager != null)
-      {
-         resourceManager.stop();
-      }
+         threadPool.shutdown();
 
-      if (postOffice != null)
-      {
-         postOffice.stop();
-      }
+         scheduledPool = null;
 
-      // Need to shutdown pools before shutting down paging manager to make sure everything is written ok
+         if (pagingManager != null)
+         {
+            pagingManager.stop();
+         }
 
-      List<Runnable> tasks = scheduledPool.shutdownNow();
+         if (memoryManager != null)
+         {
+            memoryManager.stop();
+         }
 
-      for (Runnable task : tasks)
-      {
-         log.debug("Waiting for " + task);
+         pagingManager = null;
+         securityStore = null;
+         resourceManager = null;
+         postOffice = null;
+         securityRepository = null;
+         securityStore = null;
+         queueFactory = null;
+         resourceManager = null;
+         messagingServerControl = null;
+         memoryManager = null;
+
+         sessions.clear();
+
+         started = false;
+         initialised = false;
+         uuid = null;
+         nodeID = null;
+
+         log.info("HornetQ Server version " + getVersion().getFullVersion() + " stopped");
+
+         Logger.reset();
       }
 
-      threadPool.shutdown();
       try
       {
          if (!threadPool.awaitTermination(30000, TimeUnit.MILLISECONDS))
@@ -414,41 +450,7 @@
       {
          // Ignore
       }
-
-      scheduledPool = null;
       threadPool = null;
-
-      if (pagingManager != null)
-      {
-         pagingManager.stop();
-      }
-
-      if (memoryManager != null)
-      {
-         memoryManager.stop();
-      }
-
-      pagingManager = null;
-      securityStore = null;
-      resourceManager = null;
-      postOffice = null;
-      securityRepository = null;
-      securityStore = null;
-      queueFactory = null;
-      resourceManager = null;
-      messagingServerControl = null;
-      memoryManager = null;
-
-      sessions.clear();
-
-      started = false;
-      initialised = false;
-      uuid = null;
-      nodeID = null;
-
-      log.info("HornetQ Server version " + getVersion().getFullVersion() + " stopped");
-
-      Logger.reset();
    }
 
    // HornetQServer implementation
@@ -509,7 +511,7 @@
       return version;
    }
 
-   public boolean isStarted()
+   public synchronized boolean isStarted()
    {
       return started;
    }
@@ -520,9 +522,14 @@
    }
 
    public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
-                                                         final String name,
-                                                         final int lastReceivedCommandID) throws Exception
+                                                                      final String name,
+                                                                      final int lastReceivedCommandID) throws Exception
    {
+      if (!started)
+      {
+         return null;
+      }
+
       ServerSession session = sessions.get(name);
 
       if (!checkActivate())
@@ -552,7 +559,7 @@
             }
 
             sessions.remove(name);
-            
+
             return new ReattachSessionResponseMessage(-1, false);
          }
          else
@@ -566,18 +573,23 @@
    }
 
    public CreateSessionResponseMessage createSession(final String name,
-                                                     final long channelID,
-                                                     final String username,
-                                                     final String password,
-                                                     final int minLargeMessageSize,
-                                                     final int incrementingVersion,
-                                                     final RemotingConnection connection,
-                                                     final boolean autoCommitSends,
-                                                     final boolean autoCommitAcks,
-                                                     final boolean preAcknowledge,
-                                                     final boolean xa,
-                                                     final int sendWindowSize) throws Exception
+                                                                  final long channelID,
+                                                                  final String username,
+                                                                  final String password,
+                                                                  final int minLargeMessageSize,
+                                                                  final int incrementingVersion,
+                                                                  final RemotingConnection connection,
+                                                                  final boolean autoCommitSends,
+                                                                  final boolean autoCommitAcks,
+                                                                  final boolean preAcknowledge,
+                                                                  final boolean xa,
+                                                                  final int sendWindowSize) throws Exception
    {
+      if (!started)
+      {
+         throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED, "Server not started");
+      }
+
       if (version.getIncrementingVersion() != incrementingVersion)
       {
          log.warn("Client with version " + incrementingVersion +
@@ -588,14 +600,14 @@
                   ". " +
                   "Please ensure all clients and servers are upgraded to the same version for them to " +
                   "interoperate properly");
-         return null;
+         throw new HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS, "Server and client versions incompatible");
       }
 
       if (!checkActivate())
       {
          // Backup server is not ready to accept connections
 
-         return new CreateSessionResponseMessage(version.getIncrementingVersion());
+         throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED, "Server will not accept create session requests");
       }
 
       if (securityStore != null)
@@ -673,7 +685,7 @@
       return sessions.get(name);
    }
 
-   public List<ServerSession> getSessions(final String connectionID)
+   public synchronized List<ServerSession> getSessions(final String connectionID)
    {
       Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
       List<ServerSession> matchingSessions = new ArrayList<ServerSession>();
@@ -688,11 +700,12 @@
       return matchingSessions;
    }
 
-   public Set<ServerSession> getSessions()
+   public synchronized Set<ServerSession> getSessions()
    {
       return new HashSet<ServerSession>(sessions.values());
    }
 
+   //TODO - should this really be here?? It's only used in tests
    public boolean isInitialised()
    {
       synchronized (initialiseLock)
@@ -727,19 +740,19 @@
    }
 
    public Queue createQueue(final SimpleString address,
-                            final SimpleString queueName,
-                            final SimpleString filterString,
-                            final boolean durable,
-                            final boolean temporary) throws Exception
+                                         final SimpleString queueName,
+                                         final SimpleString filterString,
+                                         final boolean durable,
+                                         final boolean temporary) throws Exception
    {
       return createQueue(address, queueName, filterString, durable, temporary, false);
    }
 
    public Queue deployQueue(final SimpleString address,
-                            final SimpleString queueName,
-                            final SimpleString filterString,
-                            final boolean durable,
-                            final boolean temporary) throws Exception
+                                         final SimpleString queueName,
+                                         final SimpleString filterString,
+                                         final boolean durable,
+                                         final boolean temporary) throws Exception
    {
       return createQueue(address, queueName, filterString, durable, temporary, true);
    }
@@ -793,12 +806,12 @@
       activateCallbacks.remove(callback);
    }
 
-   public ExecutorFactory getExecutorFactory()
+   public synchronized ExecutorFactory getExecutorFactory()
    {
       return executorFactory;
    }
 
-   public void setGroupingHandler(GroupingHandler groupingHandler)
+   public void setGroupingHandler(final GroupingHandler groupingHandler)
    {
       this.groupingHandler = groupingHandler;
    }
@@ -900,7 +913,7 @@
       return true;
    }
 
-   private synchronized void callActivateCallbacks()
+   private void callActivateCallbacks()
    {
       for (ActivateCallback callback : activateCallbacks)
       {

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2009-11-02 17:46:47 UTC (rev 8186)
@@ -35,16 +35,16 @@
    {
       super.setUp();
 
-         start();
+      start();
    }
-   
+
    private void start() throws Exception
    {
       setupServers();
-      
-      setRedistributionDelay(0);   
+
+      setRedistributionDelay(0);
    }
-   
+
    private void stop() throws Exception
    {
       stopServers();
@@ -71,7 +71,7 @@
    public void testRedistributionWhenConsumerIsClosed() throws Exception
    {
       setupCluster(false);
-      
+
       log.info("Doing test");
 
       startServers(0, 1, 2);
@@ -105,7 +105,7 @@
       removeConsumer(1);
 
       verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-      
+
       log.info("Test done");
    }
 
@@ -358,100 +358,98 @@
       verifyReceiveAll(20, 1);
       verifyNotReceive(1);
    }
-   
+
    public void testBackAndForth() throws Exception
    {
       for (int i = 0; i < 10; i++)
       {
          setupCluster(false);
-   
+
          startServers(0, 1, 2);
-   
+
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
          setupSessionFactory(2, isNetty());
-         
+
          final String ADDRESS = "queues.testaddress";
          final String QUEUE = "queue0";
-   
-   
+
          createQueue(0, ADDRESS, QUEUE, null, false);
          createQueue(1, ADDRESS, QUEUE, null, false);
          createQueue(2, ADDRESS, QUEUE, null, false);
-   
+
          addConsumer(0, 0, QUEUE, null);
-   
+
          waitForBindings(0, ADDRESS, 1, 1, true);
          waitForBindings(1, ADDRESS, 1, 0, true);
          waitForBindings(2, ADDRESS, 1, 0, true);
-   
+
          waitForBindings(0, ADDRESS, 2, 0, false);
          waitForBindings(1, ADDRESS, 2, 1, false);
          waitForBindings(2, ADDRESS, 2, 1, false);
-   
+
          send(0, ADDRESS, 20, false, null);
-         
+
          waitForMessages(0, ADDRESS, 20);
-   
+
          removeConsumer(0);
-         
+
          waitForBindings(0, ADDRESS, 1, 0, true);
          waitForBindings(1, ADDRESS, 1, 0, true);
          waitForBindings(2, ADDRESS, 1, 0, true);
-   
+
          waitForBindings(0, ADDRESS, 2, 0, false);
          waitForBindings(1, ADDRESS, 2, 0, false);
          waitForBindings(2, ADDRESS, 2, 0, false);
-   
+
          addConsumer(1, 1, QUEUE, null);
-         
+
          waitForBindings(0, ADDRESS, 1, 0, true);
          waitForBindings(1, ADDRESS, 1, 1, true);
          waitForBindings(2, ADDRESS, 1, 0, true);
-         
+
          waitForMessages(1, ADDRESS, 20);
          waitForMessages(0, ADDRESS, 0);
-         
-   
+
          waitForBindings(0, ADDRESS, 2, 1, false);
          waitForBindings(1, ADDRESS, 2, 0, false);
          waitForBindings(2, ADDRESS, 2, 1, false);
-         
+
          removeConsumer(1);
-         
+
          waitForBindings(0, ADDRESS, 1, 0, true);
          waitForBindings(1, ADDRESS, 1, 0, true);
          waitForBindings(2, ADDRESS, 1, 0, true);
-   
+
          waitForBindings(0, ADDRESS, 2, 0, false);
          waitForBindings(1, ADDRESS, 2, 0, false);
          waitForBindings(2, ADDRESS, 2, 0, false);
-   
+
          addConsumer(0, 0, QUEUE, null);
-         
+
          waitForBindings(0, ADDRESS, 1, 1, true);
          waitForBindings(1, ADDRESS, 1, 0, true);
          waitForBindings(2, ADDRESS, 1, 0, true);
-         
+
          waitForBindings(0, ADDRESS, 2, 0, false);
          waitForBindings(1, ADDRESS, 2, 1, false);
          waitForBindings(2, ADDRESS, 2, 1, false);
-   
+
          waitForMessages(0, ADDRESS, 20);
-         
+
          verifyReceiveAll(20, 0);
          verifyNotReceive(0);
-         
+
          addConsumer(1, 1, QUEUE, null);
          verifyNotReceive(1);
          removeConsumer(1);
-         
+
          stop();
          start();
       }
-      
+
    }
-   
+
    public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception
    {
       setupCluster(false);
@@ -469,7 +467,7 @@
       createQueue(1, "queues.testaddress", "queue0", null, false);
       createQueue(2, "queues.testaddress", "queue0", null, false);
 
-      addConsumer(0, 0, "queue0", null);      
+      addConsumer(0, 0, "queue0", null);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 0, true);
@@ -483,18 +481,18 @@
       sendInRange(0, "queues.testaddress", 10, 20, false, filter2);
 
       removeConsumer(0);
-      addConsumer(1, 1, "queue0", filter1);      
-      addConsumer(2, 2, "queue0", filter2);      
+      addConsumer(1, 1, "queue0", filter1);
+      addConsumer(2, 2, "queue0", filter2);
 
       verifyReceiveAllInRange(0, 10, 1);
       verifyReceiveAllInRange(10, 20, 2);
    }
-   
+
    public void testDelayedRedistribution() throws Exception
    {
       final long delay = 1000;
       setRedistributionDelay(delay);
-      
+
       setupCluster(false);
 
       startServers(0, 1, 2);
@@ -507,7 +505,7 @@
       createQueue(1, "queues.testaddress", "queue0", null, false);
       createQueue(2, "queues.testaddress", "queue0", null, false);
 
-      addConsumer(0, 0, "queue0", null);      
+      addConsumer(0, 0, "queue0", null);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 0, true);
@@ -520,20 +518,20 @@
       send(0, "queues.testaddress", 20, false, null);
 
       long start = System.currentTimeMillis();
-      
+
       removeConsumer(0);
-      addConsumer(1, 1, "queue0", null);      
-           
+      addConsumer(1, 1, "queue0", null);
+
       long minReceiveTime = start + delay;
-      
+
       verifyReceiveAllNotBefore(minReceiveTime, 20, 1);
    }
-   
+
    public void testDelayedRedistributionCancelled() throws Exception
    {
       final long delay = 1000;
       setRedistributionDelay(delay);
-      
+
       setupCluster(false);
 
       startServers(0, 1, 2);
@@ -546,7 +544,7 @@
       createQueue(1, "queues.testaddress", "queue0", null, false);
       createQueue(2, "queues.testaddress", "queue0", null, false);
 
-      addConsumer(0, 0, "queue0", null);      
+      addConsumer(0, 0, "queue0", null);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 0, true);
@@ -559,18 +557,18 @@
       send(0, "queues.testaddress", 20, false, null);
 
       removeConsumer(0);
-      addConsumer(1, 1, "queue0", null);     
-      
+      addConsumer(1, 1, "queue0", null);
+
       Thread.sleep(delay / 2);
-      
-      //Add it back on the local queue - this should stop any redistributionm
-      addConsumer(0, 0, "queue0", null); 
-      
+
+      // Add it back on the local queue - this should stop any redistributionm
+      addConsumer(0, 0, "queue0", null);
+
       Thread.sleep(delay);
-           
+
       verifyReceiveAll(20, 0);
    }
-   
+
    public void testRedistributionNumberOfMessagesGreaterThanBatchSize() throws Exception
    {
       setupCluster(false);
@@ -585,7 +583,7 @@
       createQueue(1, "queues.testaddress", "queue0", null, false);
       createQueue(2, "queues.testaddress", "queue0", null, false);
 
-      addConsumer(0, 0, "queue0", null);      
+      addConsumer(0, 0, "queue0", null);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 0, true);
@@ -598,8 +596,8 @@
       send(0, "queues.testaddress", QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, false, null);
 
       removeConsumer(0);
-      addConsumer(1, 1, "queue0", null);      
-           
+      addConsumer(1, 1, "queue0", null);
+
       verifyReceiveAll(QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, 1);
    }
 
@@ -636,7 +634,7 @@
       closeAllSessionFactories();
 
       stopServers(0, 1, 2);
-      
+
       clearServer(0, 1, 2);
    }
 



More information about the hornetq-commits mailing list