[jboss-cvs] JBoss Messaging SVN: r8197 - in branches/port1842: integration/EAP4/etc/xmdesc and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 26 02:03:20 EST 2011
Author: gaohoward
Date: 2011-01-26 02:03:19 -0500 (Wed, 26 Jan 2011)
New Revision: 8197
Added:
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java
Modified:
branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
branches/port1842/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
branches/port1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
branches/port1842/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
Log:
1842
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml 2011-01-26 07:03:19 UTC (rev 8197)
@@ -134,6 +134,12 @@
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP BIGINT, STATE SMALLINT, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
<!-- This post office is non clustered. If you want a clustered post office then set to true -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml 2011-01-26 07:03:19 UTC (rev 8197)
@@ -130,6 +130,12 @@
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP BIGINT, STATE SMALLINT, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
<!-- This post office is non clustered. If you want a clustered post office then set to true -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-01-26 07:03:19 UTC (rev 8197)
@@ -130,6 +130,12 @@
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP = ? WHERE NODE_ID = ?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID = ?
]]></attribute>
<!-- This post office is non clustered. If you want a clustered post office then set to true -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml 2011-01-26 07:03:19 UTC (rev 8197)
@@ -132,6 +132,12 @@
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = NDBCLUSTER
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
<!-- This post office is clustered. If you don't want a clustered post office then set to false -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml 2011-01-26 07:03:19 UTC (rev 8197)
@@ -134,6 +134,12 @@
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP INTEGER, STATE INTEGER, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
<!-- This post office is non clustered. If you want a clustered post office then set to true -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml 2011-01-26 07:03:19 UTC (rev 8197)
@@ -130,6 +130,12 @@
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
<!-- This post office is non clustered. If you want a clustered post office then set to true -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml 2011-01-26 07:03:19 UTC (rev 8197)
@@ -135,6 +135,12 @@
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DECIMAL(19, 0), STATE SMALLINT, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
<!-- This post office is non clustered. If you want a clustered post office then set to true -->
Modified: branches/port1842/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml 2011-01-26 07:03:19 UTC (rev 8197)
@@ -155,6 +155,18 @@
<type>boolean</type>
</attribute>
+ <attribute access="read-write" getMethod="getKeepOldFailoverModel" setMethod="setKeepOldFailoverModel">
+ <description>Whether JBM cluster failover should use the old-style failover model, default true</description>
+ <name>KeepOldFailoverModel</name>
+ <type>boolean</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getNodeStateRefreshInterval" setMethod="setNodeStateRefreshInterval">
+ <description>Interval in milliseconds between two consecutive timestamp updates of this node's state, default 30000</description>
+ <name>NodeStateRefreshInterval</name>
+ <type>long</type>
+ </attribute>
+
<!-- Managed operations -->
<operation>
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2011-01-26 07:03:19 UTC (rev 8197)
@@ -27,6 +27,7 @@
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
+import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
@@ -37,6 +38,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -46,6 +48,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
/**
*
* This class handles the interface with JGroups
@@ -58,6 +63,10 @@
*/
public class GroupMember
{
+ public static final String JGROUPS_JMX_DOMAIN = "jboss.jgroups";
+ public static final String CHANNEL_JMX_ATTRIBUTES = "type=channel,cluster=";
+ public static final String PROTOCOL_JMX_ATTRIBUTES = "type=protocol,cluster=";
+
public static final String DATA_SUFFIX = "-DATA";
public static final String CONTROL_SUFFIX = "-CTRL";
@@ -93,6 +102,10 @@
private CountDownLatch latch;
private volatile boolean starting;
+
+ //jmx
+ protected boolean channelRegistered;
+ protected boolean protocolsRegistered;
//We need to process view changes on a different thread, since if we have more than one node running
//in the same VM then the thread that sends the leave message ends up executing the view change on the other node
@@ -264,6 +277,11 @@
public void multicastControl(ClusterRequest request, boolean sync) throws Exception
{
+ if (!requestTarget.isAvailable())
+ {
+ if (trace) { log.trace(this + " the request target is not available"); }
+ }
+
if (ready.get())
{
if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
@@ -296,6 +314,11 @@
public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
{
+ if (!requestTarget.isAvailable())
+ {
+ if (trace) { log.trace(this + " the request target is not available"); }
+ }
+
if (ready.get())
{
if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
@@ -327,6 +350,11 @@
public void multicastData(ClusterRequest request) throws Exception
{
+ if (!requestTarget.isAvailable())
+ {
+ if (trace) { log.trace(this + " the request target is not available"); }
+ }
+
if (ready.get())
{
if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
@@ -339,7 +367,12 @@
public void unicastData(ClusterRequest request, Address address) throws Exception
{
- if (ready.get())
+ if (!requestTarget.isAvailable())
+ {
+ if (trace) { log.trace(this + " the request target is not available"); }
+ }
+
+ if (ready.get())
{
if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
@@ -658,4 +691,71 @@
}
}
}
+
+   /**
+    * Registers the control channel and its protocols as JGroups MBeans.
+    * <p>
+    * The protocols are registered first, then the channel itself. The
+    * {@code protocolsRegistered} and {@code channelRegistered} flags record which
+    * step succeeded so that {@code unregisterChannelInJmx} only removes what was
+    * actually registered. Any failure is logged and not propagated to the caller.
+    *
+    * @param server the MBean server to register with
+    * @param channelPartitionName the cluster name appended to the JMX object names
+    */
+   public void registerChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      try
+      {
+         String protocolPrefix = JGROUPS_JMX_DOMAIN + ":" + PROTOCOL_JMX_ATTRIBUTES + channelPartitionName;
+         JmxConfigurator.registerProtocols(server, (JChannel)controlChannel, protocolPrefix);
+         protocolsRegistered = true;
+
+         log.debug("Registered protocol: " + protocolPrefix);
+
+         String name = JGROUPS_JMX_DOMAIN + ":" + CHANNEL_JMX_ATTRIBUTES + channelPartitionName;
+         JmxConfigurator.registerChannel((JChannel)controlChannel, server, name);
+         channelRegistered = true;
+
+         log.debug("Registered channel: " + name);
+      }
+      catch (Exception e)
+      {
+         // Registration is best effort: report the problem but do not fail the caller.
+         log.error("Caught exception registering channel in JMX", e);
+      }
+   }
+
+   /**
+    * Unregisters the MBeans previously registered by {@code registerChannelInJmx}.
+    * <p>
+    * The channel MBean and the protocol MBeans are handled independently, each only
+    * if its registration flag is set. A flag is cleared once the corresponding
+    * MBeans have been removed, so calling this method again does not try to
+    * unregister MBeans that no longer exist. Failures are logged and not propagated.
+    *
+    * @param server the MBean server the channel was registered with
+    * @param channelPartitionName the cluster name that was used at registration time
+    */
+   public void unregisterChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      ObjectName on = null;
+      if (channelRegistered)
+      {
+         // Unregister the channel itself
+         try
+         {
+            on = new ObjectName(JGROUPS_JMX_DOMAIN + ":" + CHANNEL_JMX_ATTRIBUTES + channelPartitionName);
+            server.unregisterMBean(on);
+            channelRegistered = false;
+         }
+         catch (Exception e)
+         {
+            if (on != null)
+            {
+               log.error("Caught exception unregistering channel at " + on, e);
+            }
+            else
+            {
+               log.error("Caught exception unregistering channel", e);
+            }
+         }
+      }
+
+      if (protocolsRegistered)
+      {
+         // Reset so the protocol error message never reports the channel's object name.
+         on = null;
+
+         // Unregister the protocols
+         try
+         {
+            on = new ObjectName(JGROUPS_JMX_DOMAIN + ":*," + PROTOCOL_JMX_ATTRIBUTES + channelPartitionName);
+            Set mbeans = server.queryNames(on, null);
+            if (mbeans != null)
+            {
+               for (Iterator it = mbeans.iterator(); it.hasNext();)
+               {
+                  server.unregisterMBean((ObjectName)it.next());
+               }
+            }
+            protocolsRegistered = false;
+         }
+         catch (Exception e)
+         {
+            if (on != null)
+            {
+               log.error("Caught exception unregistering protocols at " + on, e);
+            }
+            else
+            {
+               log.error("Caught exception unregistering protocols", e);
+            }
+         }
+      }
+   }
}
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-26 07:03:19 UTC (rev 8197)
@@ -41,6 +41,7 @@
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
@@ -108,6 +109,14 @@
private static final Logger log = Logger.getLogger(MessagingPostOffice.class);
+ public static final int STATE_STANDALONE = 1;
+
+ public static final int STATE_CLUSTERED = 2;
+
+ public static final int STATE_QUARANTINED = 3;
+
+ public static final int STATE_DEAD = 4;
+
//This are only used in testing
public static final String VIEW_CHANGED_NOTIFICATION = "VIEW_CHANGED";
@@ -240,7 +249,17 @@
private boolean useJGroupsWorkaround;
private boolean failoverOnNodeLeave;
-
+
+ private StateMonitor stateMonitor = null;
+
+ private Map<Integer, QuarantinedNode> suspectedNodes = new java.util.concurrent.ConcurrentHashMap<Integer, QuarantinedNode>();
+
+ private ClusterState clusterState = new ClusterState();
+
+ private long nodeStateRefreshInterval = 30000;
+
+ private boolean keepOldFailoverModel = true;
+
// Constructors ---------------------------------------------------------------------------------
public boolean isFailoverOnNodeLeave()
@@ -325,7 +344,9 @@
boolean failoverOnNodeLeave,
int maxRetry,
int retryInterval,
- boolean retryOnConnectionFailure)
+ boolean retryOnConnectionFailure,
+ boolean keepOldFailoverModel,
+ long nodeStateRefreshInterval)
throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
@@ -339,6 +360,10 @@
this.supportsFailover = supportsFailover;
+ this.keepOldFailoverModel = keepOldFailoverModel;
+
+ this.nodeStateRefreshInterval = nodeStateRefreshInterval;
+
nbSupport = new NotificationBroadcasterSupport();
replicateSemaphore = new ClearableSemaphore(maxConcurrentReplications);
@@ -402,6 +427,8 @@
put(Replicator.JVM_ID_KEY, clientVMId);
groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+
+ initClusterState();
}
//Now load the bindings for this node
@@ -413,8 +440,260 @@
log.debug(this + " started");
}
+   // Records this node's state in storage and starts the dedicated thread that
+   // writes the time stamp. Does nothing when the old failover model is kept.
+   private synchronized void initClusterState() throws Exception
+   {
+      if (!keepOldFailoverModel)
+      {
+         addThisNodeStateInStorage();
+
+         stateMonitor = new StateMonitor();
+         stateMonitor.start();
+      }
+   }
+
+ private void addThisNodeStateInStorage() throws Exception
+ {
+ if (ds == null)
+ {
+ return;
+ }
+
+ class AddNodeState extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+ PreparedStatement ps1 = null;
+
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
+
+ ps.setInt(1, STATE_CLUSTERED);
+ ps.setInt(2, thisNodeID);
+
+ int row = ps.executeUpdate();
+
+ if (row == 0)
+ {
+ ps1 = conn.prepareStatement(getSQLStatement("INSERT_NODE_STATE"));
+ ps1.setInt(1, thisNodeID);
+ ps1.setLong(2, System.currentTimeMillis());
+ ps1.setInt(3, STATE_CLUSTERED);
+
+ ps1.executeUpdate();
+ }
+ }
+ finally
+ {
+ closeStatement(ps);
+ closeStatement(ps1);
+ }
+ return null;
+ }
+ }
+ new AddNodeState().executeWithRetry();
+ }
+
+   /**
+    * Writes a new state for the given node using the UPDATE_STATE statement.
+    *
+    * @param nID the id of the node whose state is updated
+    * @param newState the state value to store
+    * @return the number of rows updated, or 0 when there is no data source
+    * @throws Exception if the transaction fails
+    */
+   private Integer updateStateInStorage(final int nID, final int newState) throws Exception
+   {
+      if (ds == null)
+      {
+         return Integer.valueOf(0);
+      }
+
+      class UpdateState extends JDBCTxRunner
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            int row = 0;
+
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
+
+               ps.setInt(1, newState);
+               ps.setInt(2, nID);
+
+               row = ps.executeUpdate();
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+            // Integer.valueOf avoids allocating a new wrapper for each call.
+            return Integer.valueOf(row);
+         }
+      }
+      return (Integer)new UpdateState().executeWithRetry();
+   }
+
+ private void cleanUpSuspectedNode(final Integer qNode) throws Exception
+ {
+ QuarantinedNode node = suspectedNodes.remove(qNode);
+ if (node == null)
+ {
+ log.warn("Cannot find the suspected node.");
+ }
+
+ //clean from DB
+ if (ds != null)
+ {
+ class CleanupDeadNode extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("DELETE_DEAD_NODE"));
+ ps.setInt(1, qNode);
+
+ ps.executeUpdate();
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+ return null;
+ }
+ }
+ new CleanupDeadNode().executeWithRetry();
+ }
+ }
+
+   /*
+    * Walks the suspected (quarantined) nodes against a copy of the cluster state:
+    *
+    * 1. A node the cluster state no longer knows (isNodeDead returns null) has been
+    *    handled by someone else: its data is cleaned and it is dropped.
+    * 2. A dead node whose recorded failover node is this node triggers a node-leave
+    *    notification and, if it should fail over and failover is supported, is marked
+    *    STATE_DEAD in storage, failed over and cleaned up.
+    * 3. A dead node with a different failover node only has its data cleaned here.
+    *
+    * Nodes that are not dead are left untouched. An IllegalStateException is thrown
+    * when the recorded failover node has no entry in the current failover map.
+    */
+   private void processClusterState() throws Exception
+   {
+      Iterator<Integer> iter = suspectedNodes.keySet().iterator();
+
+      ClusterState clusterStateCopy = clusterState.copy();
+
+      while (iter.hasNext())
+      {
+         Integer qNodeID = iter.next();
+
+         Boolean isDead = clusterStateCopy.isNodeDead(qNodeID);
+
+         if (isDead == null)
+         {
+            //someone else done the job.
+            cleanDataForNode(qNodeID);
+            suspectedNodes.remove(qNodeID);
+         }
+         else if (isDead)
+         {
+            // Look the entry up once. Other methods (nodeJoined, handleNodeDead) also
+            // remove entries from suspectedNodes, so it may be gone by now; in that
+            // case there is nothing left to do for this node.
+            QuarantinedNode qNode = suspectedNodes.get(qNodeID);
+            if (qNode == null)
+            {
+               continue;
+            }
+
+            Integer fNodeID = qNode.getFailover();
+
+            Integer foNodeID = (Integer)failoverMap.get(fNodeID);
+            if (foNodeID == null)
+            {
+               throw new IllegalStateException("Failover node " + fNodeID + " for node " + qNode + " is not alive!");
+            }
+
+            //every suspected node must have a one and only failover node.
+            //however, each dead node may have multiple failover nodes that are not in one 'brain'
+            //if a node is quarantined, it suspects all other nodes which could be still in the cluster.
+            //so this left node will be failover node for all of them, at the same time cluster nodes
+            //have a strict failover map. If a node in cluster dies, this left node and its 'legitimate' failover
+            //node all have chance to do the failover.
+            if (fNodeID.intValue() == thisNodeID)
+            {
+               ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, qNodeID.intValue(), null);
+
+               clusterNotifier.sendNotification(notification);
+
+               // I am the failover node for the dead, perform failover now
+               if (qNode.shouldFailover() && isSupportsFailover())
+               {
+                  log.debug(this + ": I am the failover node for node " + qNodeID + " that crashed");
+
+                  //update node status. this definitely not me!
+                  this.updateStateInStorage(qNodeID, STATE_DEAD);
+
+                  performFailover(qNodeID);
+
+                  // now clean up the quarantined set
+                  cleanUpSuspectedNode(qNodeID);
+               }
+            }
+            else
+            {
+               //now clean up the node from this node.
+               cleanDataForNode(qNodeID);
+               suspectedNodes.remove(qNodeID);
+            }
+         }
+      }
+   }
+
+   /**
+    * Writes this node's current time stamp to storage (UPDATE_TIMESTAMP) and then
+    * reloads {@code clusterState} from the rows returned by LOAD_CLUSTER_STATE.
+    * <p>
+    * Both steps run in one transaction. The in-memory cluster state is cleared and
+    * repopulated while holding the {@code clusterState} monitor. Nothing is done
+    * when there is no data source.
+    *
+    * @throws Exception if the transaction fails
+    */
+   private void refreshNodeState() throws Exception
+   {
+      if (ds == null)
+      {
+         return;
+      }
+
+      class RefreshNodeState extends JDBCTxRunner
+      {
+         public Object doTransaction() throws Exception
+         {
+            // Two separate statements so that each one is closed in the finally
+            // block; reusing a single variable would leave the first one open.
+            PreparedStatement timestampPs = null;
+            PreparedStatement loadPs = null;
+
+            try
+            {
+               //writing timestamp
+               timestampPs = conn.prepareStatement(getSQLStatement("UPDATE_TIMESTAMP"));
+
+               timestampPs.setLong(1, System.currentTimeMillis());
+               timestampPs.setInt(2, thisNodeID);
+
+               timestampPs.executeUpdate();
+
+               synchronized (clusterState)
+               {
+                  clusterState.clear();
+                  // collect states
+                  loadPs = conn.prepareStatement(getSQLStatement("LOAD_CLUSTER_STATE"));
+                  // The result set is released when loadPs is closed below.
+                  ResultSet result = loadPs.executeQuery();
+
+                  while (result.next())
+                  {
+                     int nodeID = result.getInt(1);
+                     long timestamp = result.getLong(2);
+                     int nodeState = result.getInt(3);
+
+                     clusterState.addNode(nodeID, timestamp, nodeState);
+                  }
+               }
+            }
+            finally
+            {
+               closeStatement(timestampPs);
+               closeStatement(loadPs);
+            }
+            return null;
+         }
+      }
+      new RefreshNodeState().executeWithRetry();
+   }
+
+
public synchronized void stop() throws Exception
- {
+ {
+ if (this.stateMonitor != null)
+ {
+ stateMonitor.shutdown();
+ stateMonitor = null;
+ }
+
if (!started)
{
log.warn(this + " is not started");
@@ -1058,97 +1337,264 @@
*/
public void nodeJoined(Address address) throws Exception
{
- log.debug(this + ": " + address + " joined");
+ log.debug(this + ": " + address + " joined");
+ if (this.keepOldFailoverModel)
+ return;
+ Integer newNode = findNodeIDForAddress(address);
+
+ if (newNode == null)
+ {
+ // newly Joined node not added yet.
+ return;
+ }
+
+ QuarantinedNode qNode = suspectedNodes.remove(newNode);
+
+ if (qNode == null)
+ return;
+
+ log.debug("A quarantined node " + qNode + " re-joined cluster.");
+
+ // if I am the quarantined node, I do rejoin.
+ if (clusterState.isQuarantined(thisNodeID))
+ {
+ updateStateInStorage(thisNodeID, STATE_CLUSTERED);
+ new Thread()
+ {
+ public void run()
+ {
+ PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
+ groupMember.getDataChannelAddress());
+ try
+ {
+ groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+ }
+ catch (Exception e)
+ {
+ log.error("error sending node join request!", e);
+ }
+ }
+ }.start();
+ }
}
public void nodesLeft(List addresses) throws Throwable
{
- if (trace) { log.trace("Nodes left " + addresses.size()); }
-
- Map oldFailoverMap = new HashMap(this.failoverMap);
-
- int oldFailoverNodeID = failoverNodeID;
-
- if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
-
- calculateFailoverMap();
-
- if (trace) { log.trace("First node is now " + firstNode); }
-
+ if (trace)
+ {
+ log.trace("Nodes left " + addresses.size());
+ }
+
+ if (!keepOldFailoverModel)
+ {
+ // quarantine the nodes
+ quarantine(addresses);
+ }
+ else
+ {
+ Map oldFailoverMap = new HashMap(this.failoverMap);
+
+ int oldFailoverNodeID = failoverNodeID;
+
+ if (trace)
+ {
+ log.trace("Old failover node id: " + oldFailoverNodeID);
+ }
+
+ calculateFailoverMap();
+
+ if (trace)
+ {
+ log.trace("First node is now " + firstNode);
+ }
+
+ if (firstNode && this.useJGroupsWorkaround)
+ {
+ // If we are now the first node in the cluster then any outstanding replication requests will not get
+ // responses
+ // so we must release these and we have no more need of a semaphore until another node joins
+ replicateSemaphore.disable();
+ }
+
+ Iterator iter = addresses.iterator();
+
+ while (iter.hasNext())
+ {
+ Address address = (Address)iter.next();
+
+ log.debug(this + ": " + address + " left");
+
+ Integer leftNodeID = getNodeIDForSyncAddress(address);
+
+ if (leftNodeID == null)
+ {
+ throw new IllegalStateException(this + " cannot find node ID for address " + address);
+ }
+
+ boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+
+ log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+
+ Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
+
+ log.debug(this + " the failover node for the crashed node is " + fnodeID);
+
+ boolean doneFailover = false;
+
+ if (crashed && isSupportsFailover())
+ {
+ if (fnodeID == null)
+ {
+ throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+ }
+
+ if (fnodeID.intValue() == thisNodeID)
+ {
+ // The node crashed and we are the failover node so let's perform failover
+
+ log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+
+ performFailover(leftNodeID);
+
+ doneFailover = true;
+ }
+ }
+
+ if (!doneFailover)
+ {
+ // Remove any replicant data and non durable bindings for the node - This will notify any listeners which
+ // will
+ // recalculate the connection factory delegates and failover delegates.
+
+ cleanDataForNode(leftNodeID);
+ }
+
+ if (trace)
+ {
+ log.trace("First node: " + firstNode +
+ " oldFailoverNodeID: " +
+ oldFailoverNodeID +
+ " failoverNodeID: " +
+ failoverNodeID);
+ }
+
+ if (oldFailoverNodeID != failoverNodeID)
+ {
+ // Failover node for this node has changed
+
+ failoverNodeChanged(oldFailoverNodeID, firstNode, false);
+ }
+ }
+ }
+ sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+ }
+
+ /**
+ * For each node, update its state to be STATE_QUARANTINED
+ * then store the information for later failover.
+ * @throws Exception
+ */
+ private void quarantine(List addresses) throws Exception
+ {
+ Map oldFailoverMap = new HashMap(this.failoverMap);
+
+ int oldFailoverNodeID = failoverNodeID;
+
+ if (trace)
+ {
+ log.trace("Old failover node id: " + oldFailoverNodeID);
+ }
+
+ calculateFailoverMap();
+
+ if (trace)
+ {
+ log.trace("First node is now " + firstNode);
+ }
+
if (firstNode && this.useJGroupsWorkaround)
{
- //If we are now the first node in the cluster then any outstanding replication requests will not get responses
- //so we must release these and we have no more need of a semaphore until another node joins
- replicateSemaphore.disable();
+ // If we are now the first node in the cluster then any outstanding replication requests will not get responses
+ // so we must release these and we have no more need of a semaphore until another node joins
+ replicateSemaphore.disable();
}
-
- Iterator iter = addresses.iterator();
-
- while (iter.hasNext())
- {
- Address address = (Address)iter.next();
- log.debug(this + ": " + address + " left");
+ Iterator iter = addresses.iterator();
+ while (iter.hasNext())
+ {
+ Address addr = (Address)iter.next();
- Integer leftNodeID = getNodeIDForSyncAddress(address);
-
- if (leftNodeID == null)
- {
- throw new IllegalStateException(this + " cannot find node ID for address " + address);
- }
-
- boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
-
- log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
-
- Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
-
+ Integer leftNodeID = getNodeIDForSyncAddress(addr);
+
+ if (leftNodeID == null)
+ {
+ throw new IllegalStateException(this + " cannot find node ID for address " + addr);
+ }
+
+ boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+
+ log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+
+ Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
+
log.debug(this + " the failover node for the crashed node is " + fnodeID);
-
- boolean doneFailover = false;
-
- ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
-
- clusterNotifier.sendNotification(notification);
-
- if (crashed && isSupportsFailover())
- {
- if (fnodeID == null)
- {
- throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
- }
-
- if (fnodeID.intValue() == thisNodeID)
- {
- // The node crashed and we are the failover node so let's perform failover
-
- log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-
- performFailover(leftNodeID);
-
- doneFailover = true;
- }
- }
-
- if (!doneFailover)
- {
- // Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
- // recalculate the connection factory delegates and failover delegates.
-
- cleanDataForNode(leftNodeID);
- }
-
- if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
-
- if (oldFailoverNodeID != failoverNodeID)
- {
- //Failover node for this node has changed
-
- failoverNodeChanged(oldFailoverNodeID, firstNode, false);
- }
- }
-
- sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+
+ // if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
+ Integer ffNode = (Integer)failoverMap.get(fnodeID);
+ Integer currentFNode;
+ while (ffNode == null)
+ {
+ // Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
+ currentFNode = fnodeID;
+ // search old map
+ fnodeID = (Integer)oldFailoverMap.get(currentFNode);
+ // then new map
+ ffNode = (Integer)failoverMap.get(fnodeID);
+ }
+
+ // now the fnodeID is an active node
+ QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+
+ // now we need take care of those already quarantined nodes
+ Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
+ while (iterQ.hasNext())
+ {
+ QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
+ if (aNode.getFailover().equals(leftNodeID))
+ {
+ // We need to transfer the failover responsibility to the new active one
+ aNode.setFailover(fnodeID);
+ }
+ }
+
+ suspectedNodes.put(leftNodeID, qNode);
+
+ if (isFirstNode())
+ {
+ clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+ }
+
+ if (trace)
+ {
+ log.trace("Quarantined node: " + qNode);
+ }
+ }
+
+ if (trace)
+ {
+ log.trace("First node: " + firstNode +
+ " oldFailoverNodeID: " +
+ oldFailoverNodeID +
+ " failoverNodeID: " +
+ failoverNodeID);
+ }
+
+ if (oldFailoverNodeID != failoverNodeID)
+ {
+ // Failover node for this node has changed
+ failoverNodeChanged(oldFailoverNodeID, firstNode, false);
+ }
+ stateMonitor.newQuarantined();
}
// RequestTarget implementation ------------------------------------------------------------
@@ -1318,6 +1764,14 @@
sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
}
+ /**
+  * Handles the notification that a node is finally dead and has been failed over,
+  * so it is safe to drop its entry from the suspected (quarantined) node table.
+  *
+  * @param nodeId the ID of the node reported dead
+  */
+ public void handleNodeDead(int nodeId)
+ {
+    QuarantinedNode node = suspectedNodes.remove(nodeId);
+    if (node == null)
+    {
+       // Nothing was removed, so there is no quarantined node to report;
+       // avoid the misleading "Quarantined node null is finally dead." message.
+       log.debug("Node " + nodeId + " is reported dead but was not quarantined.");
+       return;
+    }
+    log.info("Quarantined node " + node + " is finally dead.");
+ }
+
/**
* @param originatorNodeID - the ID of the node that initiated the modification.
*/
@@ -1688,7 +2142,24 @@
"CLUSTERED, " +
"ALL_NODES " +
"FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?");
+ map.put("UPDATE_STATE",
+ "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
+ map.put("UPDATE_STATE",
+ "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
+
+ map.put("UPDATE_TIMESTAMP",
+ "UPDATE JBM_CLUSTER SET PING_TIMESTAMP = ? WHERE NODE_ID = ?");
+
+ map.put("LOAD_CLUSTER_STATE",
+ "SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER");
+
+ map.put("INSERT_NODE_STATE",
+ "INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)");
+
+ map.put("DELETE_DEAD_NODE",
+ "DELETE FROM JBM_CLUSTER WHERE NODE_ID = ?");
+
return map;
}
@@ -1700,6 +2171,9 @@
"QUEUE_NAME VARCHAR(255), CONDITION VARCHAR(1023), " +
"SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, " +
"CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))");
+ map.put("CREATE_CLUSTER_STATE_TABLE", "CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, " +
+ "PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB");
+
return map;
}
@@ -3341,7 +3815,10 @@
//Now clean the data for the failed node
//TODO - does this need to be inside the lock above?
- cleanDataForNode(failedNodeID);
+ if (keepOldFailoverModel)
+ {
+ cleanDataForNode(failedNodeID);
+ }
log.debug(this + " announcing that failover procedure is complete");
@@ -3476,4 +3953,205 @@
}
+ /*
+  * Background thread that runs refreshNodeState() followed by
+  * processClusterState() once per nodeStateRefreshInterval milliseconds, or
+  * sooner when woken by newQuarantined().
+  *
+  * Intent, as stated by the original author:
+  * - periodically update the timestamp of this node and monitor its status; if
+  *   this node becomes quarantined, it makes itself standalone;
+  * - monitor its buddy's state; if the buddy is quarantined, check whether it
+  *   really dies and, when it does, trigger failover.
+  */
+ private class StateMonitor extends Thread
+ {
+    // Only read and written inside synchronized methods of this thread object.
+    private boolean working = true;
+
+    /**
+     * Runs refresh rounds until shutdown() clears the working flag. The monitor of
+     * this thread object is held for the whole round and released only in wait().
+     */
+    public synchronized void run()
+    {
+       do
+       {
+          try
+          {
+             refreshNodeState();
+             processClusterState();
+          }
+          catch (Exception e)
+          {
+             // A failed round must not kill the monitor; try again next interval.
+             log.error("Error refreshing state of node: " + thisNodeID, e);
+          }
+
+          // shutdown() may already have been requested (before this thread entered
+          // run(), or from this thread during the round above). Its notify() found
+          // no waiter then, so waiting here would delay the exit by a full interval.
+          if (working)
+          {
+             try
+             {
+                wait(nodeStateRefreshInterval);
+             }
+             catch (InterruptedException ignored)
+             {
+                // Deliberately ignored: only the working flag decides when to stop.
+             }
+          }
+       }
+       while (working);
+       log.debug("Stop monitoring the stats at node " + thisNodeID);
+    }
+
+    /** Asks the loop in run() to stop and wakes it if it is currently waiting. */
+    public synchronized void shutdown()
+    {
+       working = false;
+       notify();
+    }
+
+    /** Wakes the loop in run() so the next round starts without waiting out the interval. */
+    public synchronized void newQuarantined()
+    {
+       notify();
+    }
+ }
+
+ // Holds the NodeState entries of the cluster, keyed by node ID.
+ // Not for concurrent use!
+ private class ClusterState
+ {
+    Map<Integer, NodeState> states;
+
+    /** Creates an empty cluster state. */
+    public ClusterState()
+    {
+       states = new java.util.concurrent.ConcurrentHashMap<Integer, NodeState>();
+    }
+
+    /** Creates a state whose map is a shallow copy of the given one (NodeState objects are shared). */
+    private ClusterState(Map<Integer, NodeState> copy)
+    {
+       states = new HashMap<Integer, NodeState>(copy);
+    }
+
+    /**
+     * Records a new state for a node, in memory and via updateStateInStorage().
+     *
+     * @param nodeID the node whose state changes
+     * @param newState the new state code
+     * @throws Exception if updateStateInStorage() fails
+     */
+    public void updateNodeState(int nodeID, int newState) throws Exception
+    {
+       NodeState node = states.get(nodeID);
+       if (node != null)
+       {
+          node.setState(newState);
+       }
+       // A node missing from the in-memory map used to cause a NullPointerException
+       // before the storage update was reached. The state is now still persisted.
+       // NOTE(review): presumably a later refresh reloads it from storage - confirm.
+       updateStateInStorage(nodeID, newState);
+    }
+
+    /**
+     * @param nodeID the node to leave out of the check
+     * @return true if every other known node reports isDead(); also true when no other node is known
+     */
+    public boolean allDeadButMe(int nodeID)
+    {
+       for (NodeState node : states.values())
+       {
+          if (!node.isDead() && (node.getID() != nodeID))
+          {
+             return false;
+          }
+       }
+       return true;
+    }
+
+    /** Forgets all known nodes. */
+    public void clear()
+    {
+       states.clear();
+    }
+
+    /** Adds a node, replacing any existing entry for the same ID. */
+    public void addNode(int nodeID, long timestamp, int nodeState)
+    {
+       states.put(nodeID, new NodeState(nodeID, timestamp, nodeState));
+    }
+
+    /** @return a new ClusterState backed by a shallow copy of the current map */
+    public ClusterState copy()
+    {
+       return new ClusterState(states);
+    }
+
+    /** @return true only if the node is known and its state is quarantined */
+    public boolean isQuarantined(int qNodeID)
+    {
+       NodeState nState = states.get(qNodeID);
+       if (nState == null)
+       {
+          return false;
+       }
+       return nState.isQurarntined();
+    }
+
+    /**
+     * @return the node's isDead() value, or null when the node is not known here
+     */
+    public Boolean isNodeDead(Integer qNodeID)
+    {
+       NodeState nState = states.get(qNodeID);
+
+       // Unknown node: the answer is null, not true or false.
+       // NOTE(review): the original comment said a missing node "is already dead",
+       // so callers presumably treat null that way - confirm against the callers.
+       if (nState == null)
+       {
+          return null;
+       }
+
+       return nState.isDead();
+    }
+ }
+
+ // State record of a single node: its ID, a timestamp and a state code.
+ private class NodeState
+ {
+    private int nodeID;
+
+    private long timestamp;
+
+    private int state;
+
+    public NodeState(int nodeID, long timestamp, int state)
+    {
+       this.state = state;
+       this.timestamp = timestamp;
+       this.nodeID = nodeID;
+    }
+
+    /** Replaces the state code; the timestamp is left untouched. */
+    public void setState(int newState)
+    {
+       this.state = newState;
+    }
+
+    public int getID()
+    {
+       return this.nodeID;
+    }
+
+    /** @return true if the state code equals STATE_QUARANTINED */
+    public boolean isQurarntined()
+    {
+       return STATE_QUARANTINED == this.state;
+    }
+
+    /**
+     * Liveness is judged from the timestamp only, never from the state code:
+     * the node counts as dead once its timestamp is more than two refresh
+     * intervals older than the current time.
+     */
+    public boolean isDead()
+    {
+       long stampAge = System.currentTimeMillis() - this.timestamp;
+       return stampAge > (2 * nodeStateRefreshInterval);
+    }
+ }
+
+ /**
+  * @return false if the cluster state marks this node as quarantined, true otherwise
+  */
+ public boolean isAvailable()
+ {
+    boolean quarantined = clusterState.isQuarantined(thisNodeID);
+    return !quarantined;
+ }
+
+ /**
+  * @return the current value of the keepOldFailoverModel flag
+  */
+ public boolean isKeepOldFailoverModel()
+ {
+    return this.keepOldFailoverModel;
+ }
+
+ /**
+  * @param isKeep the new value of the keepOldFailoverModel flag
+  */
+ public void setKeepOldFailoverModel(boolean isKeep)
+ {
+    this.keepOldFailoverModel = isKeep;
+ }
+
+ /**
+  * @return the node state refresh interval, in milliseconds
+  */
+ public long getNodeStateRefreshInterval()
+ {
+    return this.nodeStateRefreshInterval;
+ }
+
+ /**
+  * Sets the node state refresh interval.
+  *
+  * The value is handed to Object.wait(long) by the state monitor thread, where 0
+  * means "wait until notified" and a negative value throws an exception the
+  * monitor loop does not catch. It is also doubled to form the staleness
+  * threshold in NodeState.isDead(). Non-positive values are therefore rejected.
+  *
+  * @param newValue the new interval in milliseconds; must be positive
+  * @throws IllegalArgumentException if newValue is zero or negative
+  */
+ public void setNodeStateRefreshInterval(long newValue)
+ {
+    if (newValue <= 0)
+    {
+       throw new IllegalArgumentException("nodeStateRefreshInterval must be positive: " + newValue);
+    }
+    nodeStateRefreshInterval = newValue;
+ }
+
+ /**
+  * Delegates to the group member to register its channel in JMX.
+  *
+  * @param server the MBean server passed through to the group member
+  * @param channelPartitionName the partition name passed through to the group member
+  */
+ public void registerChannelInJmx(MBeanServer server, String channelPartitionName)
+ {
+    this.groupMember.registerChannelInJmx(server, channelPartitionName);
+ }
+
+ /**
+  * Delegates to the group member to unregister its channel from JMX.
+  *
+  * @param server the MBean server passed through to the group member
+  * @param channelPartitionName the partition name passed through to the group member
+  */
+ public void unregisterChannelInJmx(MBeanServer server, String channelPartitionName)
+ {
+    this.groupMember.unregisterChannelInJmx(server, channelPartitionName);
+ }
+
}
Added: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java (rev 0)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java 2011-01-26 07:03:19 UTC (rev 8197)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2011, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl.postoffice;
+
+/**
+ * Describes a quarantined node: its own ID, the ID of the node responsible for
+ * failing it over, and whether it crashed.
+ *
+ * @author howard
+ *
+ * Created Jan 18, 2011 11:02:55 AM
+ */
+public class QuarantinedNode
+{
+   private Integer nodeID;
+
+   // Must be an active node at the time of being quarantined; may be reassigned later.
+   private Integer failoverID;
+
+   private boolean crashed;
+
+   /**
+    * @param leftNodeID the ID of the quarantined node
+    * @param failoverID the ID of the node that is to take over
+    * @param crashed whether the quarantined node crashed
+    */
+   public QuarantinedNode(Integer leftNodeID, Integer failoverID, boolean crashed)
+   {
+      this.nodeID = leftNodeID;
+      this.failoverID = failoverID;
+      this.crashed = crashed;
+   }
+
+   public String toString()
+   {
+      StringBuilder text = new StringBuilder("Quarantined Node[");
+      text.append(nodeID).append("], failover[");
+      text.append(failoverID).append("], crashed[");
+      text.append(crashed).append("]");
+      return text.toString();
+   }
+
+   /** @return the ID of the node currently responsible for failover */
+   public Integer getFailover()
+   {
+      return this.failoverID;
+   }
+
+   /** Hands the failover responsibility to another node. */
+   public void setFailover(Integer newFailover)
+   {
+      failoverID = newFailover;
+   }
+
+   /** @return true if this node was recorded as crashed */
+   public boolean shouldFailover()
+   {
+      return this.crashed;
+   }
+
+}
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java 2011-01-26 07:03:19 UTC (rev 8197)
@@ -68,4 +68,8 @@
void handleAddAllReplicatedDeliveries(int nodeID, Map deliveries) throws Exception;
void handleGetReplicatedDeliveries(String queueName, Address returnAddress) throws Exception;
+
+ void handleNodeDead(int nodeId);
+
+ boolean isAvailable();
}
Modified: branches/port1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java 2011-01-26 07:03:19 UTC (rev 8197)
@@ -100,6 +100,10 @@
private boolean failoverOnNodeLeave;
+ private boolean keepOldFailoverModel = true;
+
+ private long nodeStateRefreshInterval = 30000;
+
private MessagingPostOffice postOffice;
// Constructors --------------------------------------------------
@@ -345,7 +349,48 @@
}
}
-
+ /**
+  * @return the post office's flag once the service is started, otherwise the
+  *         locally configured value
+  */
+ public boolean isKeepOldFailoverModel()
+ {
+    return started ? postOffice.isKeepOldFailoverModel() : keepOldFailoverModel;
+ }
+
+ /**
+  * Stores the flag locally while the service is not started; once started, the
+  * value is forwarded to the post office instead.
+  *
+  * @param isKeep the new value of the flag
+  */
+ public void setKeepOldFailoverModel(boolean isKeep)
+ {
+    if (!started)
+    {
+       keepOldFailoverModel = isKeep;
+       return;
+    }
+    postOffice.setKeepOldFailoverModel(isKeep);
+ }
+
+ /**
+  * @return the post office's interval once the service is started, otherwise
+  *         the locally configured value
+  */
+ public long getNodeStateRefreshInterval()
+ {
+    return started ? postOffice.getNodeStateRefreshInterval() : nodeStateRefreshInterval;
+ }
+
+ /**
+  * Stores the interval locally while the service is not started; once started,
+  * the value is forwarded to the post office instead.
+  *
+  * @param newValue the new interval
+  */
+ public void setNodeStateRefreshInterval(long newValue)
+ {
+    if (!started)
+    {
+       nodeStateRefreshInterval = newValue;
+       return;
+    }
+    postOffice.setNodeStateRefreshInterval(newValue);
+ }
+
public String listBindings()
{
return postOffice.printBindingInformation();
@@ -450,7 +495,9 @@
failoverOnNodeLeave,
maxRetry,
retryInterval,
- retryOnConnectionFailure);
+ retryOnConnectionFailure,
+ keepOldFailoverModel,
+ nodeStateRefreshInterval);
}
else
{
@@ -468,6 +515,18 @@
postOffice.start();
started = true;
+
+ if (clustered)
+ {
+ try
+ {
+ postOffice.registerChannelInJmx(server, channelPartitionName);
+ }
+ catch (Exception e)
+ {
+ log.error("Caught exception registering channel in JXM", e);
+ }
+ }
}
catch (Throwable t)
{
@@ -482,6 +541,11 @@
throw new IllegalStateException("Service is not started");
}
+ if (clustered)
+ {
+ postOffice.unregisterChannelInJmx(server, channelPartitionName);
+ }
+
super.stopService();
try
Modified: branches/port1842/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- branches/port1842/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java 2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java 2011-01-26 07:03:19 UTC (rev 8197)
@@ -95,7 +95,7 @@
sc.getPostOfficeSQLProperties(), true, nodeID,
"Clustered", ms, pm, tr, ff, cf, idm, cn,
groupName, jChannelFactory,
- stateTimeout, castTimeout, true, 100, false, 25, 1000, false);
+ stateTimeout, castTimeout, true, 100, false, 25, 1000, false, true, 30000);
postOffice.start();
More information about the jboss-cvs-commits
mailing list