[jboss-cvs] JBoss Messaging SVN: r8173 - in branches/JBM1842: src/main/org/jboss/messaging/core/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 19 10:09:53 EST 2011
Author: gaohoward
Date: 2011-01-19 10:09:52 -0500 (Wed, 19 Jan 2011)
New Revision: 8173
Modified:
branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/JBM1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
Log:
save work
Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-01-19 07:08:59 UTC (rev 8172)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-01-19 15:09:52 UTC (rev 8173)
@@ -130,7 +130,7 @@
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
-CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)))
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB
UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP = ? WHERE NODE_ID = ?
LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2011-01-19 07:08:59 UTC (rev 8172)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2011-01-19 15:09:52 UTC (rev 8173)
@@ -335,7 +335,6 @@
if (log.isTraceEnabled()) { log.trace("Executing: " + statement); }
st = conn.createStatement();
-
st.executeUpdate(statement);
}
catch (Exception e)
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-19 07:08:59 UTC (rev 8172)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-19 15:09:52 UTC (rev 8173)
@@ -126,6 +126,7 @@
private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
Executors.newCachedThreadPool(new JBMThreadFactory("msg-post-office")));
+
//End only used in testing
// Static ---------------------------------------------------------------------------------------
@@ -263,10 +264,9 @@
private ClusterState clusterState = new ClusterState();
- private long refreshPeriod = 30000;
+ private long nodeStateRefreshInterval = 30000;
- //default to true
- private boolean keepOldFailoverModel = false;
+ private boolean keepOldFailoverModel = true;
// Constructors ---------------------------------------------------------------------------------
@@ -299,8 +299,7 @@
ClusterNotifier clusterNotifier,
int maxRetry,
int retryInterval,
- boolean retryOnConnectionFailure,
- boolean keepOldFailoverModel)
+ boolean retryOnConnectionFailure)
throws Exception
{
super (ds, tm, sqlProperties, createTablesOnStartup, maxRetry, retryInterval, retryOnConnectionFailure);
@@ -324,8 +323,6 @@
this.channelIDManager = channelIDManager;
this.clusterNotifier = clusterNotifier;
-
- this.keepOldFailoverModel = keepOldFailoverModel;
lock = new ReentrantWriterPreferenceReadWriteLock();
@@ -357,11 +354,12 @@
int maxRetry,
int retryInterval,
boolean retryOnConnectionFailure,
- boolean keepOldFailoverModel)
+ boolean keepOldFailoverModel,
+ long nodeStateRefreshInterval)
throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
- filterFactory, conditionFactory, channelIDManager, clusterNotifier, maxRetry, retryInterval, retryOnConnectionFailure, keepOldFailoverModel);
+ filterFactory, conditionFactory, channelIDManager, clusterNotifier, maxRetry, retryInterval, retryOnConnectionFailure);
this.clustered = true;
@@ -373,6 +371,10 @@
this.supportsFailover = supportsFailover;
+ this.keepOldFailoverModel = keepOldFailoverModel;
+
+ this.nodeStateRefreshInterval = nodeStateRefreshInterval;
+
nbSupport = new NotificationBroadcasterSupport();
replicateSemaphore = new ClearableSemaphore(maxConcurrentReplications);
@@ -570,7 +572,7 @@
QuarantinedNode qNode = quarantinedNodes.get(qNodeID);
Integer fNodeID = qNode.getFailover();
- Integer foNodeID = (Integer)failoverMap.get(qNodeID);
+ Integer foNodeID = (Integer)failoverMap.get(fNodeID);
if (foNodeID == null)
{
throw new IllegalStateException("Failover node " + fNodeID + " for node " + qNode + " is not alive!");
@@ -1369,6 +1371,12 @@
log.debug(this + ": " + address + " joined");
Integer newNode = findNodeIDForAddress(address);
+ if (newNode == null)
+ {
+ //myself
+ return;
+ }
+
QuarantinedNode qNode = quarantinedNodes.remove(newNode);
if (qNode == null) return;
@@ -2047,6 +2055,20 @@
"ALL_NODES " +
"FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?");
+ map.put("UPDATE_STATE",
+ "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
+
+ map.put("UPDATE_STATE",
+ "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
+
+ map.put("UPDATE_TIMESTAMP",
+ "UPDATE JBM_CLUSTER SET PING_TIMESTAMP = ? WHERE NODE_ID = ?");
+
+ map.put("LOAD_CLUSTER_STATE",
+ "SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER");
+
+ map.put("INSERT_NODE_STATE",
+ "INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)");
return map;
}
@@ -2058,6 +2080,8 @@
"QUEUE_NAME VARCHAR(255), CONDITION VARCHAR(1023), " +
"SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, " +
"CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))");
+ map.put("CREATE_CLUSTER_STATE_TABLE", "CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, " +
+ "PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB");
return map;
}
@@ -3859,12 +3883,12 @@
}
catch (Exception e)
{
- log.error("Error refreshing state of node: " + thisNodeID);
+ log.error("Error refreshing state of node: " + thisNodeID, e);
}
try
{
- wait(refreshPeriod);
+ wait(nodeStateRefreshInterval);
}
catch (InterruptedException e)
{
@@ -3917,6 +3941,7 @@
public boolean isQuarantined(int qNodeID)
{
NodeState nState = states.get(qNodeID);
+ if (nState == null) return false;
return nState.isQurarntined();
}
@@ -3950,7 +3975,7 @@
{
long currentTime = System.currentTimeMillis();
long stampAge = currentTime - timestamp;
- if (stampAge > (2 * refreshPeriod))
+ if (stampAge > (2 * nodeStateRefreshInterval))
{
return true;
}
@@ -3972,5 +3997,15 @@
{
keepOldFailoverModel = isKeep;
}
+
+ public long getNodeStateRefreshInterval()
+ {
+ return nodeStateRefreshInterval;
+ }
+
+ public void setNodeStateRefreshInterval(long newValue)
+ {
+ nodeStateRefreshInterval = newValue;
+ }
}
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java 2011-01-19 07:08:59 UTC (rev 8172)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java 2011-01-19 15:09:52 UTC (rev 8173)
@@ -100,8 +100,10 @@
private boolean failoverOnNodeLeave;
- private boolean keepOldFailoverModel;
+ private boolean keepOldFailoverModel = true;
+ private long nodeStateRefreshInterval = 30000;
+
private MessagingPostOffice postOffice;
// Constructors --------------------------------------------------
@@ -367,8 +369,28 @@
keepOldFailoverModel = isKeep;
}
}
+
+ public long getNodeStateRefreshInterval()
+ {
+ if (started)
+ {
+ return postOffice.getNodeStateRefreshInterval();
+ }
+ return nodeStateRefreshInterval;
+ }
+
+ public void setNodeStateRefreshInterval(long newValue)
+ {
+ if (started)
+ {
+ postOffice.setNodeStateRefreshInterval(newValue);
+ }
+ else
+ {
+ nodeStateRefreshInterval = newValue;
+ }
+ }
-
public String listBindings()
{
return postOffice.printBindingInformation();
@@ -417,7 +439,7 @@
FilterFactory ff = new SelectorFactory();
if (clustered)
- {
+ {
ChannelFactory jChannelFactory = null;
if (channelFactoryName != null)
@@ -474,7 +496,8 @@
maxRetry,
retryInterval,
retryOnConnectionFailure,
- keepOldFailoverModel);
+ keepOldFailoverModel,
+ nodeStateRefreshInterval);
}
else
{
@@ -486,8 +509,7 @@
clusterNotifier,
maxRetry,
retryInterval,
- retryOnConnectionFailure,
- keepOldFailoverModel);
+ retryOnConnectionFailure);
}
postOffice.start();
More information about the jboss-cvs-commits mailing list