[jboss-cvs] JBoss Messaging SVN: r8216 - in branches/port1842/src/main/org/jboss: messaging/core/contract and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 9 12:18:07 EST 2011


Author: gaohoward
Date: 2011-02-09 12:18:07 -0500 (Wed, 09 Feb 2011)
New Revision: 8216

Added:
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java
Modified:
   branches/port1842/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java
   branches/port1842/src/main/org/jboss/messaging/core/contract/ClusterNotification.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
Log:
fix client load balance and sucker issue


Modified: branches/port1842/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java
===================================================================
--- branches/port1842/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java	2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java	2011-02-09 17:18:07 UTC (rev 8216)
@@ -61,8 +61,6 @@
    
    private NodeRecovery nodeRecovery;
    
-   private long stopTime;
-   
    private long shutdownDelay;
 
    public void startService() throws Exception
@@ -117,8 +115,6 @@
       
       nodeRecovery.start();
       
-      stopTime = System.currentTimeMillis();
-      
       log.info("JBM node is stopped.");
    }
 
@@ -126,7 +122,7 @@
    {
       try
       {
-         long nodeRefreshInterval = (Long)server.invoke(postOfficeServiceName, "getNodeStateRefreshInterval", new Object[0], new String[0]);
+         long nodeRefreshInterval = (Long)server.getAttribute(postOfficeServiceName, "NodeStateRefreshInterval");
          shutdownDelay = nodeRefreshInterval * 2;
       }
       catch (Exception e)
@@ -246,17 +242,19 @@
    
    private void makeSureDelay()
    {
-      long delay = System.currentTimeMillis() - stopTime;
-      while (delay < shutdownDelay)
+      long delay = shutdownDelay; // ms still to wait; the loop aims to block the caller for shutdownDelay ms in total
+      
+      while (delay > 0)
       {
+         long stopTime = System.currentTimeMillis(); // start of this pass only (see note on the recalculation below)
          try
          {
-            Thread.sleep(shutdownDelay - delay);
+            Thread.sleep(delay); // NOTE(review): an InterruptedException is swallowed below and the thread's interrupt status is not restored
          }
          catch (InterruptedException e)
          {
          }
-         delay = System.currentTimeMillis() - stopTime;
+         delay = shutdownDelay - (System.currentTimeMillis() - stopTime); // NOTE(review): subtracts only this pass's elapsed time from the full shutdownDelay; after an interrupted (short) sleep, later passes can alternate between sleeping d and shutdownDelay - d and never exit -- consider taking stopTime once before the loop
       }
       
    }
@@ -283,6 +281,7 @@
       
       public NodeRecovery()
       {
+         this.setDaemon(true); // NOTE(review): presumably NodeRecovery extends Thread; as a daemon thread it will not keep the JVM alive on its own
       }
       
       public synchronized void abandon()

Modified: branches/port1842/src/main/org/jboss/messaging/core/contract/ClusterNotification.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/contract/ClusterNotification.java	2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/contract/ClusterNotification.java	2011-02-09 17:18:07 UTC (rev 8216)
@@ -47,6 +47,8 @@
 	public static final int TYPE_REPLICATOR_PUT = 6;
 	
 	public static final int TYPE_REPLICATOR_REMOVE = 7;
+	
+	public static final int TYPE_NODE_FAILEDOVER = 8;
 		
 	public int type;
 	

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2011-02-09 17:18:07 UTC (rev 8216)
@@ -376,6 +376,15 @@
 					removeSucker(queueName, notification.nodeID);					
 				}
 			}
+         else if (notification.type == ClusterNotification.TYPE_NODE_FAILEDOVER)
+         {
+            // clean up connections
+            ConnectionInfo conn = (ConnectionInfo)connections.remove(notification.nodeID);
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
 		}
 		catch (Exception e)
 		{

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java	2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java	2011-02-09 17:18:07 UTC (rev 8216)
@@ -65,6 +65,8 @@
 	public static final int ADD_ALL_REPLICATED_DELIVERIES_REQUEST = 12;
 	
 	public static final int GET_REPLICATED_DELIVERIES_REQUEST = 13;
+	
+	public static final int STATE_REQUEST = 14;
 		
 	
 	protected static final int NULL = 0;
@@ -148,6 +150,11 @@
          	request = new GetReplicatedDeliveriesRequest();
          	break;
          }
+         case STATE_REQUEST:
+         {
+            request = new StateRequest();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + type);

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-02-09 17:18:07 UTC (rev 8216)
@@ -347,7 +347,33 @@
 	   	}
    	}
    }
-   
+
+   public Object unicastRequest(ClusterRequest request, Address address) throws Exception
+   { // Sends request to the single member at address and returns its reply, or null when this member is not ready.
+      if (!requestTarget.isAvailable())
+      {
+         if (trace)
+         {
+            log.trace(this + " the request target is not available");
+         }
+      }
+      // NOTE(review): an unavailable request target is only traced above; it does not prevent the send below
+      Object response = null;
+      // response stays null when ready is false, because nothing is sent in that case
+      if (ready.get())
+      {
+         if (trace)
+         {
+            log.trace(this + " sending " + request + " to control channel");
+         }
+
+         Message message = new Message(address, null, writeRequest(request));
+         // GET_FIRST with castTimeout: presumably waits only for the first reply, for at most castTimeout
+         response = dispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
+      }
+      return response;
+   }
+
    public void multicastData(ClusterRequest request) throws Exception
    {
       if (!requestTarget.isAvailable())

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-02-09 17:18:07 UTC (rev 8216)
@@ -261,6 +261,12 @@
    private boolean keepOldFailoverModel = true;
 
    private Object jgroupsLock = new Object();
+   
+   private boolean autoRestarted = false;
+   
+   private boolean jgroupsBack = false;
+   
+   private Address justJoined;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -411,9 +417,20 @@
 	      //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
 	      if (knowAboutNodeId(thisNodeID))
 	      {
-	      	throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
-	      			"cluster with the same node id (" + thisNodeID + "). " +
-	      			"Are you sure you have given each node a unique node id during installation?");
+            if (this.keepOldFailoverModel)
+            {
+               throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " + 
+                                                  "cluster with the same node id (" + thisNodeID + "). " +
+                                                  "Are you sure you have given each node a unique node id during installation?");
+            }
+            else
+            {
+               log.info("The node id already in the state. This could happen in case of an auto-restart where JGroups is already" + " normal when post office restarts.");
+               synchronized (jgroupsLock)
+               {
+                  jgroupsBack = true;
+               }
+            }
 	      }
 	
 	      PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(), groupMember.getDataChannelAddress());
@@ -1410,6 +1427,15 @@
          return;
       Integer newNode = findNodeIDForAddress(address);
 
+      synchronized (jgroupsLock)
+      {
+         if (autoRestarted)
+         {
+            justJoined = address;
+            autoRestarted = false;
+         }
+      }
+
       if (newNode == null)
       {
          // newly Joined node not added yet.
@@ -1445,7 +1471,29 @@
          }.start();
       }
    }
-   
+   // Asks the member at address for its state via a StateRequest and applies the reply with setState().
+   private void requestState(final Address address) throws Exception
+   {
+      Thread requester = new Thread()
+      {
+         public void run()
+         {
+            StateRequest request = new StateRequest();
+            try
+            {
+               byte[] state = (byte[])groupMember.unicastRequest(request, address);
+               setState(state); // NOTE(review): state is null when unicastRequest sent nothing (group member not ready); confirm setState accepts null
+            }
+            catch (Exception e)
+            {
+               log.error("Error getting state from " + address, e); // failures are only logged; requestState itself still returns normally
+            }
+         }
+      };
+      requester.start();
+      requester.join(); // NOTE(review): joined at once, so the caller blocks until run() ends; the worker holds none of the caller's monitors, and the rejoin path calls this inside synchronized (jgroupsLock) -- confirm setState and reply handling never need jgroupsLock, or this can deadlock
+   }
+
    public void nodesLeft(List addresses) throws Throwable
    {
       if (trace)
@@ -3902,7 +3950,13 @@
       {
          cleanDataForNode(failedNodeID);
       }
+      else
+      {
+         notification = new ClusterNotification(ClusterNotification.TYPE_NODE_FAILEDOVER, failedNodeID.intValue(), null);
       
+         clusterNotifier.sendNotification(notification);
+      }
+
       log.debug(this + " announcing that failover procedure is complete");
 
       notification = new ClusterNotification(ClusterNotification.TYPE_FAILOVER_END, failedNodeID.intValue(), null);
@@ -4271,17 +4325,75 @@
    {
       synchronized (jgroupsLock)
       {
+         autoRestarted = true;
          //if I am alone but there are still others there
          while (isFirstNode() && (clusterState.nodeNum() > 1))
          {
             try
             {
+               View v = groupMember.getCurrentView();
+
+               if ((v != null) && v.size() > 1)
+                  break;
+
                jgroupsLock.wait(5000);
             }
             catch (InterruptedException e)
             {
             }
          }
+         log.info("JGroups starts to work again, waiting for state.");
+         while (autoRestarted && (!jgroupsBack))
+         {
+            try
+            {
+               jgroupsLock.wait(5000);
+            }
+            catch (InterruptedException e)
+            {
+            }
+         }
+
+         if (!jgroupsBack)
+         {
+            try
+            {
+               PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
+                                                                      groupMember.getDataChannelAddress());
+
+               nodeIDAddressMap.put(new Integer(thisNodeID), info);
+
+               String clientVMId = JMSClientVMIdentifier.instance;
+
+               log.info("putting replicat");
+
+               // add our vm identifier to the replicator
+               put(Replicator.JVM_ID_KEY, clientVMId);
+
+               log.info("multicast...");
+
+               groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+
+               log.info("requesting new state from failover node " + failoverNodeID + " address: " + justJoined);
+
+               if (justJoined != null)
+               {
+                  requestState(justJoined);
+               }
+
+               // calculate the failover map
+               calculateFailoverMap();
+
+               log.info("new failover map: " + this.dumpFailoverMap(this.failoverMap));
+
+               log.info("state request got.");
+            }
+            catch (Exception e)
+            {
+               log.error("Error rejoining the cluster", e);
+            }
+         }
+         log.info("Now node is ready for work.");
       }
    }
     

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2011-02-09 17:18:07 UTC (rev 8216)
@@ -72,4 +72,6 @@
    void handleNodeDead(int nodeId);
 
    boolean isAvailable();
+   
+   Object getState() throws Exception;
 }

Added: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java	                        (rev 0)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java	2011-02-09 17:18:07 UTC (rev 8216)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2011, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * A ClusterRequest asking the receiving node for its state. It has no payload
+ * (write and read are no-ops); execute returns RequestTarget.getState().
+ * Callers currently cast the reply to byte[] (see MessagingPostOffice.requestState).
+ * @author howard
+ * 
+ * Created Feb 8, 2011 12:00:55 AM
+ *
+ */
+public class StateRequest extends ClusterRequest
+{
+   public void write(DataOutputStream out) throws Exception
+   { // nothing to serialize: this request carries no fields
+   }
+
+   public void read(DataInputStream in) throws Exception
+   { // nothing to deserialize: this request carries no fields
+   }
+
+   Object execute(RequestTarget office) throws Throwable
+   { // the reply is whatever the executing node's RequestTarget reports as its state
+      return office.getState();
+   }
+
+   byte getType()
+   { // matches the STATE_REQUEST case in ClusterRequest's type switch
+      return ClusterRequest.STATE_REQUEST;
+   }
+}



More information about the jboss-cvs-commits mailing list