[jboss-cvs] JBoss Messaging SVN: r6569 - in trunk: src/main/org/jboss/messaging/core/cluster/impl and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Apr 26 08:42:53 EDT 2009


Author: timfox
Date: 2009-04-26 08:42:52 -0400 (Sun, 26 Apr 2009)
New Revision: 6569

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
more fixes

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -506,7 +506,7 @@
          boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.SERVER_DISCONNECTED);
 
          boolean done = false;
-
+         
          if (attemptFailover || reconnectAttempts != 0)
          {
             lockAllChannel1s();
@@ -709,7 +709,7 @@
       while (true)
       {
          if (closed)
-         {
+         {           
             return null;
          }
 

Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -368,7 +368,7 @@
                   Map.Entry<String, DiscoveryEntry> entry = iter.next();
 
                   if (entry.getValue().getLastUpdate() + timeout <= now)
-                  {
+                  {                
                      iter.remove();
 
                      changed = true;

Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -60,6 +60,8 @@
    long generateUniqueID();
    
    long getCurrentUniqueID();
+   
+   void setUniqueIDSequence(long id);
 
    void storeMessage(ServerMessage message) throws Exception;
    

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -139,7 +139,7 @@
    private volatile boolean started;
 
    private final ExecutorService executor;
-
+   
    public JournalStorageManager(final Configuration config)
    {
       this.executor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-journal-storage-manager"));
@@ -244,7 +244,9 @@
 
    public long generateUniqueID()
    {
-      return idGenerator.generateID();
+      long id = idGenerator.generateID();
+      
+      return id;
    }
    
    public long getCurrentUniqueID()
@@ -252,6 +254,11 @@
       return idGenerator.getCurrentID();
    }
 
+   public void setUniqueIDSequence(final long id)
+   {
+      idGenerator.setID(id);
+   }
+   
    public LargeServerMessage createLargeMessage()
    {
       return new JournalLargeServerMessage(this);

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -55,7 +55,7 @@
 {
    private static final Logger log = Logger.getLogger(NullStorageManager.class);
    
-   private final AtomicLong idGenerator = new AtomicLong(0);
+   private final AtomicLong idSequence = new AtomicLong(0);
    
    private UUID id;
 
@@ -184,15 +184,20 @@
    
    public long generateUniqueID()
    {
-      long id = idGenerator.getAndIncrement();
+      long id = idSequence.getAndIncrement();
       
       return id;
    }
    
    public long getCurrentUniqueID()
    {
-      return idGenerator.get();
+      return idSequence.get();
    }
+   
+   public void setUniqueIDSequence(final long id)
+   {
+      idSequence.set(id);
+   }
 
    public synchronized void start() throws Exception
    {
@@ -213,7 +218,7 @@
       
       id = null;
       
-      idGenerator.set(0);
+      idSequence.set(0);
 
       started = false;
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -43,7 +43,7 @@
    
    void activate(Queue queue) throws Exception;
    
-   void reset() throws Exception;
+   //void reset() throws Exception;
    
    void close() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -709,7 +709,7 @@
       }
       catch (Exception e)
       {
-         log.warn("Unable to connect. JMSBridge is now disabled.", e);
+         log.warn("Unable to connect. Bridge is now disabled.", e);
 
          return false;
       }
@@ -767,7 +767,7 @@
             {
                try
                {
-                  flowRecord.reset();
+                 // flowRecord.reset();
                }
                catch (Exception e)
                {

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -256,7 +256,13 @@
 
       for (MessageFlowRecord record : records.values())
       {
-         record.close();
+         try
+         {
+            record.close();
+         }
+         catch (Exception ignore)
+         {            
+         }
       }
 
       started = false;
@@ -347,7 +353,7 @@
             // have messages - this is up to the administrator to do this
 
             entry.getValue().close();
-
+            
             iter.remove();
          }
       }
@@ -511,13 +517,13 @@
          this.bridge = bridge;
       }
 
-      public synchronized void reset() throws Exception
-      {
-         clearBindings();
+//      public synchronized void reset() throws Exception
+//      {
+//         clearBindings();
+//
+//         firstReset = false;
+//      }
 
-         firstReset = false;
-      }
-
       public synchronized void onMessage(final ClientMessage message)
       {
          try
@@ -666,8 +672,7 @@
                                                                     routingName,
                                                                     queueID,
                                                                     filterString,
-                                                                    queue,
-                                                                    // useDuplicateDetection,
+                                                                    queue,                                                                  
                                                                     bridge.getName(),
                                                                     distance + 1);
 
@@ -679,14 +684,20 @@
                // hops is too high
                // or there are multiple cluster connections for the same address
 
-               log.warn("Remoting queue binding " + clusterName +
+               log.warn("Remote queue binding " + clusterName +
                         " has already been bound in the post office. Most likely cause for this is you have a loop " +
                         "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
 
                return;
             }
 
-            postOffice.addBinding(binding);
+            try
+            {
+               postOffice.addBinding(binding);
+            }
+            catch (Exception ignore)
+            {               
+            }
 
             Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -134,7 +134,7 @@
    private final Configuration configuration;
 
    private final MBeanServer mbeanServer;
-   
+
    private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
 
    private volatile boolean started;
@@ -216,7 +216,7 @@
       {
          throw new NullPointerException("Must inject SecurityManager into MessagingServer constructor");
       }
-      
+
       // We need to hard code the version information into a source file
 
       version = VersionLoader.getVersion();
@@ -235,7 +235,6 @@
    // lifecycle methods
    // ----------------------------------------------------------------
 
-   
    public synchronized void start() throws Exception
    {
       if (started)
@@ -410,7 +409,7 @@
    public ClusterManager getClusterManager()
    {
       return clusterManager;
-   }  
+   }
 
    public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
                                                          final String name,
@@ -536,7 +535,7 @@
       }
    }
 
-   public void initialiseBackup(final UUID theUUID, final long currentMessageID) throws Exception
+   public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
    {
       if (theUUID == null)
       {
@@ -556,19 +555,34 @@
 
          initialisePart2();
 
-         if (currentMessageID != this.storageManager.getCurrentUniqueID())
+         // It is possible, in a replicated environment that ids are slightly different
+         // (live is higher)- this
+         // is due to on stopping of the live server, the cluster connections are stopped and cause
+         // a remove binding for all the flow records, which causes notifications which causes id to be
+         // generated for the notifications.
+         // When shutting down the backup the cluster connections are not active so no bindings are removed
+         // on close
+
+         long backupID = storageManager.getCurrentUniqueID();
+
+         if (liveUniqueID != backupID)
          {
-            initialised = false;
-
-            throw new IllegalStateException("Backup node current id sequence != live node current id sequence " + this.storageManager.getCurrentUniqueID() +
-                                            ", " +
-                                            currentMessageID);
+            if (liveUniqueID > backupID)
+            {
+               storageManager.setUniqueIDSequence(liveUniqueID);
+            }
+            else
+            {
+               initialised = false;
+               
+               throw new IllegalStateException("Live and backup unique ids different. Probably trying to restart a live backup pair after a crash");
+            }
          }
 
          log.info("Backup server is now operational");
       }
    }
-   
+
    public Channel getReplicatingChannel()
    {
       synchronized (replicatingChannelLock)
@@ -708,7 +722,7 @@
    public void destroyQueue(final SimpleString queueName, final ServerSession session) throws Exception
    {
       Binding binding = postOffice.getBinding(queueName);
-      
+
       if (binding == null)
       {
          throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST, "No such queue " + queueName);
@@ -743,7 +757,7 @@
 
       postOffice.removeBinding(queueName);
    }
-   
+
    public synchronized void registerActivateCallback(final ActivateCallback callback)
    {
       activateCallbacks.add(callback);
@@ -754,7 +768,6 @@
       activateCallbacks.remove(callback);
    }
 
-
    // Public
    // ---------------------------------------------------------------------------------------
 
@@ -781,12 +794,12 @@
 
    private synchronized void callActivateCallbacks()
    {
-      for (ActivateCallback callback: activateCallbacks)
+      for (ActivateCallback callback : activateCallbacks)
       {
          callback.activated();
       }
    }
-   
+
    private void checkActivate(final RemotingConnection connection)
    {
       if (configuration.isBackup())
@@ -850,7 +863,7 @@
          replConnection.freeze();
       }
    }
-   
+
    private void initialisePart1() throws Exception
    {
       managementService = new ManagementServiceImpl(mbeanServer, configuration);
@@ -965,7 +978,7 @@
       // Deploy any queues in the Configuration class - if there's no file deployment we still need
       // to load those
       deployQueuesFromConfiguration();
-      
+
       // Deploy any predefined queues - on backup we don't start queue deployer - instead deployments
       // are replicated from live
 
@@ -975,20 +988,20 @@
 
          queueDeployer.start();
       }
-      
+
       // We need to call this here, this gives any dependent server a chance to deploy its own destinations
       // this needs to be done before clustering is initialised, and in the same order on live and backup
       callActivateCallbacks();
 
       // Deply any pre-defined diverts
       deployDiverts();
-     
-      // Set-up the replicating connection 
+
+      // Set-up the replicating connection
       if (!setupReplicatingConnection())
       {
          return;
       }
-      
+
       if (configuration.isClustered())
       {
          // This can't be created until node id is set
@@ -1004,12 +1017,12 @@
 
          clusterManager.start();
       }
-      
+
       if (deploymentManager != null)
       {
          deploymentManager.start();
       }
-      
+
       pagingManager.startGlobalDepage();
 
       initialised = true;
@@ -1161,14 +1174,13 @@
       }
    }
 
-   
    private Queue createQueue(final SimpleString address,
                              final SimpleString queueName,
                              final SimpleString filterString,
                              final boolean durable,
                              final boolean temporary,
                              final boolean ignoreIfExists) throws Exception
-   {      
+   {
       Binding binding = postOffice.getBinding(queueName);
 
       if (binding != null)
@@ -1189,7 +1201,7 @@
       {
          filter = new FilterImpl(filterString);
       }
-            
+
       final Queue queue = queueFactory.createQueue(-1, address, queueName, filter, durable, temporary);
 
       binding = new LocalQueueBinding(address, queue, nodeID);
@@ -1200,7 +1212,7 @@
       }
 
       postOffice.addBinding(binding);
-      
+
       return queue;
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -115,19 +115,6 @@
 
    protected void failNode(TransportConfiguration conf)
    {
-      // MessagingServer server = this.services[node];
-      //
-      // if (server == null)
-      // {
-      // throw new IllegalArgumentException("No server at " + node);
-      // }
-      //
-      // RemotingConnection conn = ((ClientSessionInternal)this.consumers[node].session).getConnection();
-      //      
-      // conn.fail(new MessagingException(MessagingException.INTERNAL_ERROR, "blah"));
-      //      
-      // //Also fail any cluster connections
-
       ConnectionManagerImpl.failAllConnectionsForConnector(conf);
    }
 
@@ -926,7 +913,7 @@
                                            boolean netty,
                                            boolean backup)
    {
-      this.setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, backup, -1);
+      setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, backup, -1);
    }
 
    protected void setupServerWithDiscovery(int node,
@@ -936,7 +923,7 @@
                                            boolean netty,
                                            int backupNode)
    {
-      this.setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, false, backupNode);
+      setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, false, backupNode);
    }
 
    protected void setupServerWithDiscovery(int node,
@@ -1029,7 +1016,7 @@
 
       configuration.getBroadcastGroupConfigurations().add(bcConfig);
 
-      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", groupAddress, port, 500);
+      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", groupAddress, port, 5000);
 
       configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
 
@@ -1128,72 +1115,9 @@
                                                                                       maxHops,
                                                                                       pairs);
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
-
-      // clusterConfs.add(clusterConf);
-
-      // serviceFrom.getConfiguration().setClusterConfigurations(clusterConfs);
    }
 
-   // protected void setupClusterConnection(String name,
-   // int nodeFrom,
-   // int nodeTo,
-   // String address,
-   // boolean forwardWhenNoConsumers,
-   // int maxHops,
-   // boolean netty)
-   // {
-   // MessagingServer serviceFrom = servers[nodeFrom];
-   //
-   // if (serviceFrom == null)
-   // {
-   // throw new IllegalStateException("No server at node " + nodeFrom);
-   // }
-   //
-   // Map<String, TransportConfiguration> connectors = serviceFrom
-   // .getConfiguration()
-   // .getConnectorConfigurations();
-   //
-   // Map<String, Object> params = generateParams(nodeTo, netty);
-   //
-   // TransportConfiguration serverTotc;
-   //
-   // if (netty)
-   // {
-   // serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
-   // }
-   // else
-   // {
-   // serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
-   // }
-   //
-   // connectors.put(serverTotc.getName(), serverTotc);
-   //
-   // serviceFrom.getConfiguration().setConnectorConfigurations(connectors);
-   //
-   // Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
-   //
-   // List<Pair<String, String>> pairs = new ArrayList<Pair<String, String>>();
-   // pairs.add(connectorPair);
-   //
-   // ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-   // address,
-   // 100,
-   // 1d,
-   // -1,
-   // -1,
-   // true,
-   // forwardWhenNoConsumers,
-   // maxHops,
-   // pairs);
-   // List<ClusterConnectionConfiguration> clusterConfs = serviceFrom
-   // .getConfiguration()
-   // .getClusterConfigurations();
-   //
-   // clusterConfs.add(clusterConf);
-   //
-   // serviceFrom.getConfiguration().setClusterConfigurations(clusterConfs);
-   // }
-
+   
    protected void setupClusterConnection(String name,
                                          String address,
                                          boolean forwardWhenNoConsumers,

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -41,20 +41,5 @@
    {
       return true;
    }
-   
-//   public void testStart() throws Exception
-//   {
-//      try
-//      {
-//         setupCluster();
-//
-//         startServers();
-//
-//        // Thread.sleep(100000);
-//      }
-//      catch (Exception e)
-//      {
-//         e.printStackTrace();
-//      }
-//   }
+  
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -1297,7 +1297,7 @@
       setupCluster();
 
       startServers();
-
+       
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());
       setupSessionFactory(2, isNetty());
@@ -1409,14 +1409,14 @@
       removeConsumer(18);
       removeConsumer(21);
       removeConsumer(26);
-
+   
       closeSessionFactory(0);
       closeSessionFactory(3);
-
+      
       stopServers(0, 3);
-
+      
       startServers(3, 0);
-
+      
       setupSessionFactory(0, isNetty());
       setupSessionFactory(3, isNetty());
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -367,8 +367,12 @@
       closeSessionFactory(0);
       closeSessionFactory(3);
 
+      log.info("*** stopping servers");
+      
       stopServers(0, 3, 5, 8);
       
+      log.info("**** rstarting servers");
+      
       startServers(5, 8, 0, 3);
       
       setupSessionFactory(0, isNetty());

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -81,5 +81,199 @@
       setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), false); 
    }
      
+   /*
+    * This is like testStopStartServers but we make sure we pause longer than discovery group timeout
+    * before restarting (5 seconds)
+    */
+   public void testStartStopServersWithPauseBeforeRestarting() throws Exception
+   {
+      setupCluster();
 
+      startServers();
+      
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue1", null, false);
+      createQueue(2, "queues.testaddress", "queue2", null, false);
+      createQueue(3, "queues.testaddress", "queue3", null, false);
+      createQueue(4, "queues.testaddress", "queue4", null, false);
+
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(2, "queues.testaddress", "queue7", null, false);
+      createQueue(3, "queues.testaddress", "queue8", null, false);
+      createQueue(4, "queues.testaddress", "queue9", null, false);
+
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(1, "queues.testaddress", "queue11", null, false);
+      createQueue(2, "queues.testaddress", "queue12", null, false);
+      createQueue(3, "queues.testaddress", "queue13", null, false);
+      createQueue(4, "queues.testaddress", "queue14", null, false);
+
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+      createQueue(1, "queues.testaddress", "queue15", null, false);
+      createQueue(2, "queues.testaddress", "queue15", null, false);
+      createQueue(3, "queues.testaddress", "queue15", null, false);
+      createQueue(4, "queues.testaddress", "queue15", null, false);
+
+      createQueue(2, "queues.testaddress", "queue16", null, false);
+      createQueue(3, "queues.testaddress", "queue16", null, false);
+      createQueue(4, "queues.testaddress", "queue16", null, false);
+
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+      createQueue(1, "queues.testaddress", "queue17", null, false);
+      createQueue(4, "queues.testaddress", "queue17", null, false);
+
+      createQueue(3, "queues.testaddress", "queue18", null, false);
+      createQueue(4, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue1", null);
+      addConsumer(2, 2, "queue2", null);
+      addConsumer(3, 3, "queue3", null);
+      addConsumer(4, 4, "queue4", null);
+
+      addConsumer(5, 0, "queue5", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(7, 2, "queue7", null);
+      addConsumer(8, 3, "queue8", null);
+      addConsumer(9, 4, "queue9", null);
+
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(11, 1, "queue11", null);
+      addConsumer(12, 2, "queue12", null);
+      addConsumer(13, 3, "queue13", null);
+      addConsumer(14, 4, "queue14", null);
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(16, 1, "queue15", null);
+      addConsumer(17, 2, "queue15", null);
+      addConsumer(18, 3, "queue15", null);
+      addConsumer(19, 4, "queue15", null);
+
+      addConsumer(20, 2, "queue16", null);
+      addConsumer(21, 3, "queue16", null);
+      addConsumer(22, 4, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+      addConsumer(24, 1, "queue17", null);
+      addConsumer(25, 4, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+      addConsumer(27, 4, "queue18", null);
+
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+      waitForBindings(2, "queues.testaddress", 5, 5, true);
+      waitForBindings(3, "queues.testaddress", 6, 6, true);
+      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      waitForBindings(0, "queues.testaddress", 23, 23, false);
+      waitForBindings(1, "queues.testaddress", 23, 23, false); 
+      waitForBindings(2, "queues.testaddress", 23, 23, false);
+      waitForBindings(3, "queues.testaddress", 22, 22, false);
+      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+      removeConsumer(0);
+      removeConsumer(5);
+      removeConsumer(10);
+      removeConsumer(15);
+      removeConsumer(23);
+      removeConsumer(3);
+      removeConsumer(8);
+      removeConsumer(13);
+      removeConsumer(18);
+      removeConsumer(21);
+      removeConsumer(26);
+  
+      closeSessionFactory(0);
+      closeSessionFactory(3);
+
+      stopServers(0, 3);
+      
+      Thread.sleep(10000);
+      
+      startServers(3, 0);
+      
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(3, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(3, "queues.testaddress", "queue3", null, false);
+
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+      createQueue(3, "queues.testaddress", "queue8", null, false);
+
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(3, "queues.testaddress", "queue13", null, false);
+
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+      createQueue(3, "queues.testaddress", "queue15", null, false);
+
+      createQueue(3, "queues.testaddress", "queue16", null, false);
+
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+
+      createQueue(3, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(3, 3, "queue3", null);
+
+      addConsumer(5, 0, "queue5", null);
+      addConsumer(8, 3, "queue8", null);
+
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(13, 3, "queue13", null);
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(18, 3, "queue15", null);
+
+      addConsumer(21, 3, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+      waitForBindings(2, "queues.testaddress", 5, 5, true);
+      waitForBindings(3, "queues.testaddress", 6, 6, true);
+      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      waitForBindings(0, "queues.testaddress", 23, 23, false);
+      waitForBindings(1, "queues.testaddress", 23, 23, false);
+      waitForBindings(2, "queues.testaddress", 23, 23, false);
+      waitForBindings(3, "queues.testaddress", 22, 22, false);
+      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+   }
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-04-26 12:42:52 UTC (rev 6569)
@@ -911,6 +911,12 @@
    class FakeStorageManager implements StorageManager
    {
 
+      public void setUniqueIDSequence(long id)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
       /* (non-Javadoc)
        * @see org.jboss.messaging.core.persistence.StorageManager#addQueueBinding(org.jboss.messaging.core.postoffice.Binding)
        */




More information about the jboss-cvs-commits mailing list