[jboss-cvs] JBoss Messaging SVN: r6583 - in trunk: src/main/org/jboss/messaging/core/server/cluster/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Apr 27 13:14:04 EDT 2009


Author: timfox
Date: 2009-04-27 13:14:04 -0400 (Mon, 27 Apr 2009)
New Revision: 6583

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
Log:
more fixes

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-27 16:07:31 UTC (rev 6582)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-27 17:14:04 UTC (rev 6583)
@@ -297,7 +297,6 @@
       // if unsetting a previous handler may be in onMessage so wait for completion
       else if (handler == null && !noPreviousHandler)
       {
-
          waitForOnMessageToComplete();
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-04-27 16:07:31 UTC (rev 6582)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-04-27 17:14:04 UTC (rev 6583)
@@ -27,6 +27,7 @@
 import static org.jboss.messaging.core.postoffice.impl.PostOfficeImpl.HDR_RESET_QUEUE_DATA;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -233,7 +234,7 @@
       {
          return;
       }
-      
+
       if (discoveryGroup != null)
       {
          discoveryGroup.registerListener(this);
@@ -261,7 +262,7 @@
             record.close();
          }
          catch (Exception ignore)
-         {            
+         {
          }
       }
 
@@ -335,7 +336,7 @@
       {
          map.put(String.valueOf(i++), new DiscoveryEntry(connectorPair, 0));
       }
-      
+
       updateConnectors(map);
    }
 
@@ -353,7 +354,7 @@
             // have messages - this is up to the administrator to do this
 
             entry.getValue().close();
-            
+
             iter.remove();
          }
       }
@@ -468,7 +469,6 @@
       }
    }
 
-
    // Inner classes -----------------------------------------------------------------------------------
 
    private class MessageFlowRecordImpl implements MessageFlowRecord
@@ -501,8 +501,54 @@
          bridge.stop();
 
          clearBindings();
+         
+         waitForReplicationsToComplete(3000);
       }
+      
+      private int replicationCount;
+      
+      private synchronized void waitForReplicationsToComplete(long timeout)
+      {
+         long toWait = timeout;
 
+         long start = System.currentTimeMillis();
+
+         while (replicationCount > 0 && toWait > 0)
+         {
+            try
+            {
+               wait(toWait);
+            }
+            catch (InterruptedException e)
+            {
+            }
+
+            long now = System.currentTimeMillis();
+
+            toWait -= now - start;
+
+            start = now;
+         }
+
+         if (toWait <= 0)
+         {
+            log.warn("Timed out waiting for replication responses to return");
+         }
+      
+      }
+      
+      private synchronized void replicationComplete()
+      {
+         replicationCount--;
+         
+         notify();
+      }
+      
+      private synchronized void beforeReplicate()
+      {
+         replicationCount++;
+      }
+
       public void activate(final Queue queue) throws Exception
       {
          this.queue = queue;
@@ -517,12 +563,12 @@
          this.bridge = bridge;
       }
 
-//      public synchronized void reset() throws Exception
-//      {
-//         clearBindings();
-//
-//         firstReset = false;
-//      }
+      // public synchronized void reset() throws Exception
+      // {
+      // clearBindings();
+      //
+      // firstReset = false;
+      // }
 
       public synchronized void onMessage(final ClientMessage message)
       {
@@ -559,7 +605,7 @@
                }
                case BINDING_REMOVED:
                {
-                  doBindingRemoved(message, replicatingChannel);
+                  doBindingRemoved(message);
 
                   break;
                }
@@ -590,17 +636,15 @@
          }
       }
 
-      private void clearBindings() throws Exception
+      private synchronized void clearBindings() throws Exception
       {
-         for (RemoteQueueBinding binding : bindings.values())
+         for (RemoteQueueBinding binding : new HashSet<RemoteQueueBinding>(bindings.values()))
          {
-            postOffice.removeBinding(binding.getUniqueName());
-         }
-
-         bindings.clear();
+            removeBinding(binding.getClusterName(), replicatingChannel);
+         }         
       }
 
-      private void doBindingAdded(final ClientMessage message, final Channel replChannel) throws Exception
+      private synchronized void doBindingAdded(final ClientMessage message, final Channel replChannel) throws Exception
       {
          Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
 
@@ -650,10 +694,11 @@
                                                                    queue.getName(),
                                                                    distance + 1);
 
+            beforeReplicate();
             replChannel.replicatePacket(packet, 1, new Runnable()
             {
                public void run()
-               {
+               {                  
                   try
                   {
                      doBindingAdded(message, null);
@@ -662,6 +707,8 @@
                   {
                      log.error("Failed to add remote queue binding", e);
                   }
+                  
+                  replicationComplete();
                }
             });
          }
@@ -672,7 +719,7 @@
                                                                     routingName,
                                                                     queueID,
                                                                     filterString,
-                                                                    queue,                                                                  
+                                                                    queue,
                                                                     bridge.getName(),
                                                                     distance + 1);
 
@@ -696,7 +743,7 @@
                postOffice.addBinding(binding);
             }
             catch (Exception ignore)
-            {               
+            {
             }
 
             Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
@@ -705,7 +752,7 @@
          }
       }
 
-      private void doBindingRemoved(final ClientMessage message, final Channel replChannel) throws Exception
+      private void doBindingRemoved(final ClientMessage message) throws Exception
       {
          SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
 
@@ -714,22 +761,30 @@
             throw new IllegalStateException("clusterName is null");
          }
 
+         removeBinding(clusterName, replicatingChannel);
+      }
+      
+      private synchronized void removeBinding(final SimpleString clusterName, final Channel replChannel) throws Exception
+      {
          if (replChannel != null)
          {
             Packet packet = new ReplicateRemoteBindingRemovedMessage(clusterName);
 
+            beforeReplicate();
             replChannel.replicatePacket(packet, 1, new Runnable()
             {
                public void run()
                {
                   try
                   {
-                     doBindingRemoved(message, null);
+                     removeBinding(clusterName, null);
                   }
                   catch (Exception e)
                   {
                      log.error("Failed to remove remote queue binding", e);
                   }
+                  
+                  replicationComplete();
                }
             });
          }
@@ -746,7 +801,7 @@
          }
       }
 
-      private void doConsumerCreated(final ClientMessage message, final Channel replChannel) throws Exception
+      private synchronized void doConsumerCreated(final ClientMessage message, final Channel replChannel) throws Exception
       {
          Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
 
@@ -770,6 +825,7 @@
          {
             Packet packet = new ReplicateRemoteConsumerAddedMessage(clusterName, filterString, message.getProperties());
 
+            beforeReplicate();
             replChannel.replicatePacket(packet, 1, new Runnable()
             {
                public void run()
@@ -782,6 +838,8 @@
                   {
                      log.error("Failed to add remote consumer", e);
                   }
+                  
+                  replicationComplete();
                }
             });
          }
@@ -803,7 +861,7 @@
          }
       }
 
-      private void doConsumerClosed(final ClientMessage message, final Channel replChannel) throws Exception
+      private synchronized void doConsumerClosed(final ClientMessage message, final Channel replChannel) throws Exception
       {
          Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
 
@@ -829,6 +887,7 @@
                                                                       filterString,
                                                                       message.getProperties());
 
+            beforeReplicate();
             replChannel.replicatePacket(packet, 1, new Runnable()
             {
                public void run()
@@ -841,6 +900,8 @@
                   {
                      log.error("Failed to remove remote consumer", e);
                   }
+                  
+                  replicationComplete();
                }
             });
          }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-27 16:07:31 UTC (rev 6582)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-27 17:14:04 UTC (rev 6583)
@@ -554,29 +554,14 @@
          this.nodeID = new SimpleString(uuid.toString());
 
          initialisePart2();
-
-         // It is possible, in a replicated environment that ids are slightly different
-         // (live is higher)- this
-         // is due to on stopping of the live server, the cluster connections are stopped and cause
-         // a remove binding for all the flow records, which causes notifications which causes id to be
-         // generated for the notifications.
-         // When shutting down the backup the cluster connections are not active so no bindings are removed
-         // on close
-
+        
          long backupID = storageManager.getCurrentUniqueID();
 
          if (liveUniqueID != backupID)
          {
-            if (liveUniqueID > backupID)
-            {
-               storageManager.setUniqueIDSequence(liveUniqueID);
-            }
-            else
-            {
-               initialised = false;
-               
-               throw new IllegalStateException("Live and backup unique ids different. Probably trying to restart a live backup pair after a crash");
-            }
+            initialised = false;
+            
+            throw new IllegalStateException("Live and backup unique ids different. You're probably trying to restart a live backup pair after a crash");            
          }
 
          log.info("Backup server is now operational");
@@ -804,6 +789,8 @@
    {
       if (configuration.isBackup())
       {
+         log.info("*** activating");
+         
          synchronized (this)
          {
             freezeBackupConnection();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2009-04-27 16:07:31 UTC (rev 6582)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2009-04-27 17:14:04 UTC (rev 6583)
@@ -251,8 +251,6 @@
       setupCluster();
 
       startServers();
-      
-      log.info("*** started servers");
 
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());
@@ -369,14 +367,12 @@
       closeSessionFactory(0);
       closeSessionFactory(3);
 
-      log.info("*** stopping servers");
-      
       stopServers(0, 3, 5, 8);
-      
-      log.info("**** rstarting servers");
-      
+
       startServers(5, 8, 0, 3);
-      
+
+      Thread.sleep(2000);
+
       setupSessionFactory(0, isNetty());
       setupSessionFactory(3, isNetty());
 
@@ -439,8 +435,6 @@
       verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
 
       verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
-
-      tearDown();
    }
 
    @Override




More information about the jboss-cvs-commits mailing list