[jboss-cvs] JBoss Messaging SVN: r8173 - in branches/JBM1842: src/main/org/jboss/messaging/core/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 19 10:09:53 EST 2011


Author: gaohoward
Date: 2011-01-19 10:09:52 -0500 (Wed, 19 Jan 2011)
New Revision: 8173

Modified:
   branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/JBM1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
Log:
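save work: make the cluster node-state refresh interval configurable (nodeStateRefreshInterval, default 30000), default keepOldFailoverModel to true, add default JBM_CLUSTER SQL statements, fix the MySQL CREATE_CLUSTER_STATE_TABLE statement, and guard against null node lookups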


Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-01-19 07:08:59 UTC (rev 8172)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-01-19 15:09:52 UTC (rev 8173)
@@ -130,7 +130,7 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
-CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)))
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB
 UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
 UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP = ? WHERE NODE_ID = ?
 LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-01-19 07:08:59 UTC (rev 8172)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-01-19 15:09:52 UTC (rev 8173)
@@ -335,7 +335,6 @@
                   if (log.isTraceEnabled()) { log.trace("Executing: " + statement); }
                       
                   st = conn.createStatement();
-                  
                   st.executeUpdate(statement);
                }
                catch (Exception e) 

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-19 07:08:59 UTC (rev 8172)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-19 15:09:52 UTC (rev 8173)
@@ -126,6 +126,7 @@
    
    private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
            Executors.newCachedThreadPool(new JBMThreadFactory("msg-post-office")));
+
    //End only used in testing
 
    // Static ---------------------------------------------------------------------------------------
@@ -263,10 +264,9 @@
 
    private ClusterState clusterState = new ClusterState();
    
-   private long refreshPeriod = 30000;
+   private long nodeStateRefreshInterval = 30000;
    
-   //default to true
-   private boolean keepOldFailoverModel = false;
+   private boolean keepOldFailoverModel = true;
 
    
    // Constructors ---------------------------------------------------------------------------------
@@ -299,8 +299,7 @@
                               ClusterNotifier clusterNotifier,
                               int maxRetry,
                               int retryInterval,
-                              boolean retryOnConnectionFailure,
-                              boolean keepOldFailoverModel)
+                              boolean retryOnConnectionFailure)
       throws Exception
    {
    	super (ds, tm, sqlProperties, createTablesOnStartup, maxRetry, retryInterval, retryOnConnectionFailure);
@@ -324,8 +323,6 @@
       this.channelIDManager = channelIDManager;
       
       this.clusterNotifier = clusterNotifier;
-      
-      this.keepOldFailoverModel = keepOldFailoverModel;
 
       lock = new ReentrantWriterPreferenceReadWriteLock();
        
@@ -357,11 +354,12 @@
                               int maxRetry,
                               int retryInterval,
                               boolean retryOnConnectionFailure,
-                              boolean keepOldFailoverModel)
+                              boolean keepOldFailoverModel,
+                              long nodeStateRefreshInterval)
       throws Exception
    {
    	this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
-   		  filterFactory, conditionFactory, channelIDManager, clusterNotifier, maxRetry, retryInterval, retryOnConnectionFailure, keepOldFailoverModel);
+   		  filterFactory, conditionFactory, channelIDManager, clusterNotifier, maxRetry, retryInterval, retryOnConnectionFailure);
      
       this.clustered = true;
       
@@ -373,6 +371,10 @@
 
       this.supportsFailover = supportsFailover;
       
+      this.keepOldFailoverModel = keepOldFailoverModel;
+      
+      this.nodeStateRefreshInterval = nodeStateRefreshInterval;
+      
       nbSupport = new NotificationBroadcasterSupport();
       
       replicateSemaphore = new ClearableSemaphore(maxConcurrentReplications);
@@ -570,7 +572,7 @@
             QuarantinedNode qNode = quarantinedNodes.get(qNodeID);
             Integer fNodeID = qNode.getFailover();
             
-            Integer foNodeID = (Integer)failoverMap.get(qNodeID);
+            Integer foNodeID = (Integer)failoverMap.get(fNodeID);
             if (foNodeID == null)
             {
                throw new IllegalStateException("Failover node " + fNodeID + " for node " + qNode + " is not alive!");
@@ -1369,6 +1371,12 @@
       log.debug(this + ": " + address + " joined");
       Integer newNode = findNodeIDForAddress(address);
       
+      if (newNode == null)
+      {
+         //myself
+         return;
+      }
+      
       QuarantinedNode qNode = quarantinedNodes.remove(newNode);
       
       if (qNode == null) return;
@@ -2047,6 +2055,20 @@
                  "ALL_NODES " +
                  "FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?");
 
+      map.put("UPDATE_STATE",
+              "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
+
+      map.put("UPDATE_STATE",
+              "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
+
+      map.put("UPDATE_TIMESTAMP",
+              "UPDATE JBM_CLUSTER SET PING_TIMESTAMP = ? WHERE NODE_ID = ?");
+
+      map.put("LOAD_CLUSTER_STATE",
+              "SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER");
+
+      map.put("INSERT_NODE_STATE",
+              "INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)");
       return map;
    }
 
@@ -2058,6 +2080,8 @@
               "QUEUE_NAME VARCHAR(255), CONDITION VARCHAR(1023), " +
               "SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, " +
               "CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))");
+      map.put("CREATE_CLUSTER_STATE_TABLE", "CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, " + 
+              "PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB");
       return map;
    }
 
@@ -3859,12 +3883,12 @@
             }
             catch (Exception e)
             {
-               log.error("Error refreshing state of node: " + thisNodeID);
+               log.error("Error refreshing state of node: " + thisNodeID, e);
             }
 
             try
             {
-               wait(refreshPeriod);
+               wait(nodeStateRefreshInterval);
             }
             catch (InterruptedException e)
             {
@@ -3917,6 +3941,7 @@
       public boolean isQuarantined(int qNodeID)
       {
          NodeState nState = states.get(qNodeID);
+         if (nState == null) return false;
          return nState.isQurarntined();
       }
 
@@ -3950,7 +3975,7 @@
       {
          long currentTime = System.currentTimeMillis();
          long stampAge = currentTime - timestamp;
-         if (stampAge > (2 * refreshPeriod))
+         if (stampAge > (2 * nodeStateRefreshInterval))
          {
             return true;
          }
@@ -3972,5 +3997,15 @@
    {
       keepOldFailoverModel = isKeep;
    }
+   
+   public long getNodeStateRefreshInterval()
+   {
+      return nodeStateRefreshInterval;
+   }
+   
+   public void setNodeStateRefreshInterval(long newValue)
+   {
+      nodeStateRefreshInterval = newValue;
+   }
 
 }

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2011-01-19 07:08:59 UTC (rev 8172)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2011-01-19 15:09:52 UTC (rev 8173)
@@ -100,8 +100,10 @@
    
    private boolean failoverOnNodeLeave;
    
-   private boolean keepOldFailoverModel;
+   private boolean keepOldFailoverModel = true;
 
+   private long nodeStateRefreshInterval = 30000;
+
    private MessagingPostOffice postOffice;
 
    // Constructors --------------------------------------------------
@@ -367,8 +369,28 @@
          keepOldFailoverModel = isKeep;
       }
 	}
+	
+	public long getNodeStateRefreshInterval()
+	{
+      if (started)
+      {
+         return postOffice.getNodeStateRefreshInterval();
+      }
+      return nodeStateRefreshInterval;
+	}
+   
+   public void setNodeStateRefreshInterval(long newValue)
+   {
+      if (started)
+      {
+         postOffice.setNodeStateRefreshInterval(newValue);
+      }
+      else
+      {
+         nodeStateRefreshInterval = newValue;
+      }
+   }
 
-   
    public String listBindings()
    {
       return postOffice.printBindingInformation();
@@ -417,7 +439,7 @@
          FilterFactory ff = new SelectorFactory();
          
          if (clustered)
-         {        
+         {
             ChannelFactory jChannelFactory = null;
 
          	if (channelFactoryName != null)
@@ -474,7 +496,8 @@
                                                   maxRetry,
                                                   retryInterval,
                                                   retryOnConnectionFailure,
-                                                  keepOldFailoverModel);
+                                                  keepOldFailoverModel,
+                                                  nodeStateRefreshInterval);
          }
          else
          {
@@ -486,8 +509,7 @@
 											                 clusterNotifier,
 											                 maxRetry,
 											                 retryInterval,
-											                 retryOnConnectionFailure,
-											                 keepOldFailoverModel);
+											                 retryOnConnectionFailure);
          }
 
          postOffice.start();



More information about the jboss-cvs-commits mailing list