[jboss-cvs] JBoss Messaging SVN: r1704 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/util messaging/core/plugin messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 4 22:10:33 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-12-04 22:10:30 -0500 (Mon, 04 Dec 2006)
New Revision: 1704

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/JNDIUtil.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
Fixing dependency of stateTransfer & membership + adding listBindings into ClusteredPostOfficeService to help debug membership data

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/JNDIUtil.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/JNDIUtil.java	2006-12-04 18:13:09 UTC (rev 1703)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/JNDIUtil.java	2006-12-05 03:10:30 UTC (rev 1704)
@@ -21,12 +21,12 @@
   */
 package org.jboss.jms.util;
 
+import java.util.StringTokenizer;
+import javax.naming.Binding;
 import javax.naming.Context;
 import javax.naming.NameNotFoundException;
+import javax.naming.NamingEnumeration;
 import javax.naming.NamingException;
-import javax.naming.NamingEnumeration;
-import javax.naming.Binding;
-import java.util.StringTokenizer;
 
 /**
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -101,7 +101,19 @@
          context = createContext(c, jndiName.substring(0, idx));
          name = jndiName.substring(idx + 1);
       }
-      context.bind(name, o);
+      boolean failed=false;
+      try
+      {
+         context.rebind(name,o);
+      }
+      catch (Exception ignored)
+      {
+         failed=true;
+      }
+      if (failed)
+      {
+         context.bind(name, o);
+      }
    }
 
    // Attributes ----------------------------------------------------

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-12-04 18:13:09 UTC (rev 1703)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-12-05 03:10:30 UTC (rev 1704)
@@ -23,7 +23,6 @@
 
 import javax.management.ObjectName;
 import javax.transaction.TransactionManager;
-
 import org.jboss.jms.selector.SelectorFactory;
 import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.jms.server.ServerPeer;
@@ -201,6 +200,11 @@
    {
       this.messagePullPolicy = messagePullPolicy;
    }
+
+   public String listBindings()
+   {
+      return postOffice.printBindingInformation();
+   }
    
    // ServiceMBeanSupport overrides ---------------------------------
    

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-04 18:13:09 UTC (rev 1703)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-05 03:10:30 UTC (rev 1704)
@@ -21,6 +21,7 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -39,10 +40,8 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
-
 import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
@@ -75,8 +74,6 @@
 import org.jgroups.blocks.RequestHandler;
 import org.w3c.dom.Element;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * 
  * A DefaultClusteredPostOffice
@@ -312,7 +309,7 @@
       MessageListener cml = new ControlMessageListener();
       MembershipListener ml = new ControlMembershipListener();
       RequestHandler rh = new PostOfficeRequestHandler();
-      
+
       this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
 
       Receiver r = new DataReceiver();
@@ -323,7 +320,7 @@
       asyncChannel.connect(groupName);
       
       super.start();
-                  
+
       Address syncAddress = syncChannel.getLocalAddress();
       
       Address asyncAddress = asyncChannel.getLocalAddress();
@@ -331,7 +328,9 @@
       PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
       
       put(ADDRESS_INFO_KEY, info);
-      
+
+      verifyMembership(null,this.currentView);
+
       statsSender.start();
       
       started = true;
@@ -1314,7 +1313,48 @@
       return holdingArea.values();
    }
 
+
    /**
+    *  Verifies changes on the View deciding if a node joined or left the cluster
+    *
+    * */
+   private void verifyMembership(View oldView, View newView)
+   {
+      try
+      {
+         if (oldView != null)
+         {
+            for(Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+            {
+               Address address = (Address)i.next();
+               if (!newView.containsMember(address))
+               {
+                  nodeLeft(address);
+               }
+            }
+         }
+
+         for(Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+         {
+            Address address = (Address)i.next();
+            if (oldView == null || !oldView.containsMember(address))
+            {
+               nodeJoined(address);
+            }
+         }
+      }
+      catch (Throwable e)
+      {
+         log.error("Caught Exception in MembershipListener", e);
+         IllegalStateException e2 = new IllegalStateException(e.getMessage());
+         e2.setStackTrace(e.getStackTrace());
+         throw e2;
+      }
+   }
+
+
+   
+   /**
     * This method fails over all the queues from node <nodeId> onto this node
     * It is triggered when a JGroups view change occurs due to a member leaving and 
     * it's determined the member didn't leave cleanly
@@ -1567,17 +1607,40 @@
 
       out.println("</table>");
 
-//      out.println("Clustered Information");
-//
-//      NodeAddressInfo info[] = getClusterNodes();
-//
-//      out.println("<table border=1><tr><td>Node</td><td>AsyncChannel</td><td>SyncChannel</td></tr>");
-//      for (int i = 0; i < info.length; i++)
-//      {
-//         out.println("<tr><td>" + info[i].getNodeId() + "</td><td>" + info[i].getAsyncChannelAddress() + "</td><td>" + info[i].getSyncChannelAddress() + "</td>");
-//      }
-//      out.println("</table>");
+      out.println("Replicator's Information");
 
+      out.println("<table border=1><tr><td>Node</td><td>Key</td><td>Value</td></tr>");
+
+      for (Iterator iter = replicatedData.entrySet().iterator(); iter.hasNext();)
+      {
+         Map.Entry entry = (Map.Entry) iter.next();
+         Map subMap = (Map)entry.getValue();
+         boolean firstTime=true;
+         for (Iterator subIterator = subMap.entrySet().iterator(); subIterator.hasNext();)
+         {
+            Map.Entry subValue = (Map.Entry) subIterator.next();
+            out.println("<tr><td>" + entry.getKey() + "</td>");
+            out.println("<td>" + subValue.getKey() + "</td><td>" + subValue.getValue() + "</td></tr>" );
+         }
+
+      }
+
+      out.println("</table>");
+
+
+      out.println("View Information");
+
+      out.println("<table border=1><tr><td>Members</td></tr>");
+
+
+      for (Iterator iterMembers = currentView.getMembers().iterator(); iterMembers.hasNext();)
+      {
+         Address address = (Address) iterMembers.next();
+         out.println("<tr><td>" + address + "</td></tr>");
+      }
+
+      out.println("</table>");
+
       return buffer.toString();
    }
 
@@ -1693,7 +1756,7 @@
       if (trace) { log.trace(this + " loading bindings"); }
       
       boolean isState = syncChannel.getState(null, stateTimeout);
-      
+
       if (!isState)
       {
          //Must be first member in group or non clustered- we load the state ourself from the database
@@ -1707,20 +1770,20 @@
          //The state will be set in due course via the MessageListener - we must wait until this happens
          
          if (trace) { log.trace(this.nodeId + " Not first member of group- so waiting for state to arrive...."); }
-         
-         synchronized (setStateLock)
+
+      synchronized (setStateLock)
+      {
+         //TODO we should implement a timeout on this
+         while (!stateSet)
          {
-            //TODO we should implement a timeout on this
-            while (!stateSet)
-            {
-               setStateLock.wait();
-            }
+            setStateLock.wait();
          }
-         
+      }
+
          if (trace) { log.trace(this.nodeId + " Received state"); }
       }
    }
-   
+
    protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
    {
       Queue queue;
@@ -2193,6 +2256,8 @@
          }
          try
          {
+            // TODO: Make it trace
+            log.info("getState:" + DefaultClusteredPostOffice.this.getOfficeName());
             return getStateAsBytes();
          }
          catch (Exception e)
@@ -2226,7 +2291,9 @@
             }
             try
             {
-               processStateBytes(bytes);               
+               // TODO: Make it trace
+               log.info("setState:" + DefaultClusteredPostOffice.this.getOfficeName());
+               processStateBytes(bytes);
             }
             catch (Exception e)
             {
@@ -2266,52 +2333,36 @@
       
       public void viewAccepted(View newView)
       {
-         if (trace) { log.trace(DefaultClusteredPostOffice.this  + " got new view: " + newView); }
+         //if (trace) { log.trace(DefaultClusteredPostOffice.this  + " got new view: " + newView
+         // + DefaultClusteredPostOffice.this.getOfficeName()); }
+         //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
+         //TODO:     can't we do the same since this is pretty useful?
+         log.info(DefaultClusteredPostOffice.this  + " got new view: " + newView + " postOffice:"
+            + DefaultClusteredPostOffice.this.getOfficeName());
 
          // JGroups will make sure this method is never called by more than one thread concurrently
          
          View oldView = currentView;
          currentView = newView;
-         
-         try
-         {         
-            if (oldView != null)
-            {
-               for(Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
-               {
-                  Address address = (Address)i.next();
-                  if (!newView.containsMember(address))
-                  {
-                     nodeLeft(address);
-                  }
-               }
-            }
-            
-            for(Iterator i = newView.getMembers().iterator(); i.hasNext(); )
-            {
-               Address address = (Address)i.next();
-               if (oldView == null || !oldView.containsMember(address))
-               {
-                  nodeJoined(address);
-               }
-            }
-         }
-         catch (Throwable e)
+
+         // Since membership is now sent over state transfer, we have to wait for stateSet to be set.
+         // If this logic is not good enough, we will have to place Address in a separate data structure
+         // as it used to be (NodeInfos)
+         if (stateSet)
          {
-            log.error("Caught Exception in MembershipListener", e);
-            IllegalStateException e2 = new IllegalStateException(e.getMessage());
-            e2.setStackTrace(e.getStackTrace());
-            throw e2;
+            verifyMembership(oldView, newView);
          }
       }
-  
+
       public byte[] getState()
       {        
          //NOOP
          return null;
       }     
    }
-  
+
+
+
    /*
     * This class is used to listen for messages on the async channel
     */




More information about the jboss-cvs-commits mailing list