[jboss-cvs] JBoss Messaging SVN: r8216 - in branches/port1842/src/main/org/jboss: messaging/core/contract and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 9 12:18:07 EST 2011
Author: gaohoward
Date: 2011-02-09 12:18:07 -0500 (Wed, 09 Feb 2011)
New Revision: 8216
Added:
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java
Modified:
branches/port1842/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java
branches/port1842/src/main/org/jboss/messaging/core/contract/ClusterNotification.java
branches/port1842/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
Log:
Fix client load balancing and sucker issues
Modified: branches/port1842/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java
===================================================================
--- branches/port1842/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java 2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java 2011-02-09 17:18:07 UTC (rev 8216)
@@ -61,8 +61,6 @@
private NodeRecovery nodeRecovery;
- private long stopTime;
-
private long shutdownDelay;
public void startService() throws Exception
@@ -117,8 +115,6 @@
nodeRecovery.start();
- stopTime = System.currentTimeMillis();
-
log.info("JBM node is stopped.");
}
@@ -126,7 +122,7 @@
{
try
{
- long nodeRefreshInterval = (Long)server.invoke(postOfficeServiceName, "getNodeStateRefreshInterval", new Object[0], new String[0]);
+ long nodeRefreshInterval = (Long)server.getAttribute(postOfficeServiceName, "NodeStateRefreshInterval");
shutdownDelay = nodeRefreshInterval * 2;
}
catch (Exception e)
@@ -246,17 +242,19 @@
private void makeSureDelay()
{
- long delay = System.currentTimeMillis() - stopTime;
- while (delay < shutdownDelay)
+ long delay = shutdownDelay;
+
+ while (delay > 0)
{
+ long stopTime = System.currentTimeMillis();
try
{
- Thread.sleep(shutdownDelay - delay);
+ Thread.sleep(delay);
}
catch (InterruptedException e)
{
}
- delay = System.currentTimeMillis() - stopTime;
+ delay = shutdownDelay - (System.currentTimeMillis() - stopTime);
}
}
@@ -283,6 +281,7 @@
public NodeRecovery()
{
+ this.setDaemon(true);
}
public synchronized void abandon()
Modified: branches/port1842/src/main/org/jboss/messaging/core/contract/ClusterNotification.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/contract/ClusterNotification.java 2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/contract/ClusterNotification.java 2011-02-09 17:18:07 UTC (rev 8216)
@@ -47,6 +47,8 @@
public static final int TYPE_REPLICATOR_PUT = 6;
public static final int TYPE_REPLICATOR_REMOVE = 7;
+
+ public static final int TYPE_NODE_FAILEDOVER = 8;
public int type;
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2011-02-09 17:18:07 UTC (rev 8216)
@@ -376,6 +376,15 @@
removeSucker(queueName, notification.nodeID);
}
}
+ else if (notification.type == ClusterNotification.TYPE_NODE_FAILEDOVER)
+ {
+ // clean up connections
+ ConnectionInfo conn = (ConnectionInfo)connections.remove(notification.nodeID);
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
catch (Exception e)
{
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java 2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java 2011-02-09 17:18:07 UTC (rev 8216)
@@ -65,6 +65,8 @@
public static final int ADD_ALL_REPLICATED_DELIVERIES_REQUEST = 12;
public static final int GET_REPLICATED_DELIVERIES_REQUEST = 13;
+
+ public static final int STATE_REQUEST = 14;
protected static final int NULL = 0;
@@ -148,6 +150,11 @@
request = new GetReplicatedDeliveriesRequest();
break;
}
+ case STATE_REQUEST:
+ {
+ request = new StateRequest();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + type);
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2011-02-09 17:18:07 UTC (rev 8216)
@@ -347,7 +347,33 @@
}
}
}
-
+
+ public Object unicastRequest(ClusterRequest request, Address address) throws Exception
+ {
+ if (!requestTarget.isAvailable())
+ {
+ if (trace)
+ {
+ log.trace(this + " the request target is not available");
+ }
+ }
+
+ Object response = null;
+
+ if (ready.get())
+ {
+ if (trace)
+ {
+ log.trace(this + " sending " + request + " to control channel");
+ }
+
+ Message message = new Message(address, null, writeRequest(request));
+
+ response = dispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
+ }
+ return response;
+ }
+
public void multicastData(ClusterRequest request) throws Exception
{
if (!requestTarget.isAvailable())
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-02-09 17:18:07 UTC (rev 8216)
@@ -261,6 +261,12 @@
private boolean keepOldFailoverModel = true;
private Object jgroupsLock = new Object();
+
+ private boolean autoRestarted = false;
+
+ private boolean jgroupsBack = false;
+
+ private Address justJoined;
// Constructors ---------------------------------------------------------------------------------
@@ -411,9 +417,20 @@
//Sanity check - we check there aren't any other nodes already in the cluster with the same node id
if (knowAboutNodeId(thisNodeID))
{
- throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
- "cluster with the same node id (" + thisNodeID + "). " +
- "Are you sure you have given each node a unique node id during installation?");
+ if (this.keepOldFailoverModel)
+ {
+ throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
+ "cluster with the same node id (" + thisNodeID + "). " +
+ "Are you sure you have given each node a unique node id during installation?");
+ }
+ else
+ {
+ log.info("The node id already in the state. This could happen in case of an auto-restart where JGroups is already" + " normal when post office restarts.");
+ synchronized (jgroupsLock)
+ {
+ jgroupsBack = true;
+ }
+ }
}
PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(), groupMember.getDataChannelAddress());
@@ -1410,6 +1427,15 @@
return;
Integer newNode = findNodeIDForAddress(address);
+ synchronized (jgroupsLock)
+ {
+ if (autoRestarted)
+ {
+ justJoined = address;
+ autoRestarted = false;
+ }
+ }
+
if (newNode == null)
{
// newly Joined node not added yet.
@@ -1445,7 +1471,29 @@
}.start();
}
}
-
+
+ private void requestState(final Address address) throws Exception
+ {
+ Thread requester = new Thread()
+ {
+ public void run()
+ {
+ StateRequest request = new StateRequest();
+ try
+ {
+ byte[] state = (byte[])groupMember.unicastRequest(request, address);
+ setState(state);
+ }
+ catch (Exception e)
+ {
+ log.error("Error getting state from " + address, e);
+ }
+ }
+ };
+ requester.start();
+ requester.join();
+ }
+
public void nodesLeft(List addresses) throws Throwable
{
if (trace)
@@ -3902,7 +3950,13 @@
{
cleanDataForNode(failedNodeID);
}
+ else
+ {
+ notification = new ClusterNotification(ClusterNotification.TYPE_NODE_FAILEDOVER, failedNodeID.intValue(), null);
+ clusterNotifier.sendNotification(notification);
+ }
+
log.debug(this + " announcing that failover procedure is complete");
notification = new ClusterNotification(ClusterNotification.TYPE_FAILOVER_END, failedNodeID.intValue(), null);
@@ -4271,17 +4325,75 @@
{
synchronized (jgroupsLock)
{
+ autoRestarted = true;
//if I am alone but there are still others there
while (isFirstNode() && (clusterState.nodeNum() > 1))
{
try
{
+ View v = groupMember.getCurrentView();
+
+ if ((v != null) && v.size() > 1)
+ break;
+
jgroupsLock.wait(5000);
}
catch (InterruptedException e)
{
}
}
+ log.info("JGroups starts to work again, waiting for state.");
+ while (autoRestarted && (!jgroupsBack))
+ {
+ try
+ {
+ jgroupsLock.wait(5000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ if (!jgroupsBack)
+ {
+ try
+ {
+ PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
+ groupMember.getDataChannelAddress());
+
+ nodeIDAddressMap.put(new Integer(thisNodeID), info);
+
+ String clientVMId = JMSClientVMIdentifier.instance;
+
+ log.info("putting replicat");
+
+ // add our vm identifier to the replicator
+ put(Replicator.JVM_ID_KEY, clientVMId);
+
+ log.info("multicast...");
+
+ groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+
+ log.info("requesting new state from failover node " + failoverNodeID + " address: " + justJoined);
+
+ if (justJoined != null)
+ {
+ requestState(justJoined);
+ }
+
+ // calculate the failover map
+ calculateFailoverMap();
+
+ log.info("new failover map: " + this.dumpFailoverMap(this.failoverMap));
+
+ log.info("state request got.");
+ }
+ catch (Exception e)
+ {
+ log.error("Error rejoining the cluster", e);
+ }
+ }
+ log.info("Now node is ready for work.");
}
}
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java 2011-02-09 16:25:26 UTC (rev 8215)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java 2011-02-09 17:18:07 UTC (rev 8216)
@@ -72,4 +72,6 @@
void handleNodeDead(int nodeId);
boolean isAvailable();
+
+ Object getState() throws Exception;
}
Added: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java (rev 0)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java 2011-02-09 17:18:07 UTC (rev 8216)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2011, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * A StateRequest
+ *
+ * @author howard
+ *
+ * Created Feb 8, 2011 12:00:55 AM
+ *
+ *
+ */
+public class StateRequest extends ClusterRequest
+{
+ public void write(DataOutputStream out) throws Exception
+ {
+ }
+
+ public void read(DataInputStream in) throws Exception
+ {
+ }
+
+ Object execute(RequestTarget office) throws Throwable
+ {
+ return office.getState();
+ }
+
+ byte getType()
+ {
+ return ClusterRequest.STATE_REQUEST;
+ }
+}
More information about the jboss-cvs-commits
mailing list