[riftsaw-commits] riftsaw SVN: r965 - trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering.

riftsaw-commits at lists.jboss.org riftsaw-commits at lists.jboss.org
Tue Sep 21 01:31:36 EDT 2010


Author: jeff.yuchang
Date: 2010-09-21 01:31:35 -0400 (Tue, 21 Sep 2010)
New Revision: 965

Modified:
   trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/JBossClusteringService.java
   trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/ODEJobClusterListener.java
Log:
* Asked Listener to update the nodelist to SimpleScheduler service.


Modified: trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/JBossClusteringService.java
===================================================================
--- trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/JBossClusteringService.java	2010-09-21 05:17:25 UTC (rev 964)
+++ trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/JBossClusteringService.java	2010-09-21 05:31:35 UTC (rev 965)
@@ -17,8 +17,6 @@
  */
 package org.jboss.soa.bpel.clustering;
 
-import java.util.Iterator;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.ha.framework.interfaces.HAPartition;
@@ -43,23 +41,10 @@
 	
 	public void start() {
 		engine = getBpelEngine();
-		listener = new ODEJobClusterListener(engine.getSchedulerDAOConnectionFactory(), engine.getTransactionManager());
+		listener = new ODEJobClusterListener(engine.getScheduler(), engine.getSchedulerDAOConnectionFactory(), engine.getTransactionManager());
 		this.haPartition.registerMembershipListener(listener);
 		logger.debug("Registered ODEJobCluster Listener.");
 		
-/*		logger.info("==============CLUSTERING INFORMATION===================");
-		logger.info("NodeName =>" + haPartition.getNodeName());
-		logger.info("PartitionNode =>" + haPartition.getPartitionName());
-		logger.info("Cluster Node, name is " + haPartition.getClusterNode().getName() + 
-				" ip is: " + haPartition.getClusterNode().getIpAddress()
-				+ " port is: " + haPartition.getClusterNode().getPort());
-		
-		Iterator it = haPartition.getCurrentView().iterator();
-		while (it.hasNext()) {
-			Object o = it.next();
-			logger.info("view -> " + o );
-		}*/
-		
 	}
 	
 	public String getMasterNodeId() {

Modified: trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/ODEJobClusterListener.java
===================================================================
--- trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/ODEJobClusterListener.java	2010-09-21 05:17:25 UTC (rev 964)
+++ trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/ODEJobClusterListener.java	2010-09-21 05:31:35 UTC (rev 965)
@@ -17,13 +17,17 @@
  */
 package org.jboss.soa.bpel.clustering;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Vector;
 
 import javax.transaction.TransactionManager;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.ClusterAware;
+import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.dao.scheduler.DatabaseException;
 import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
 import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
@@ -41,9 +45,12 @@
 	
 	private SchedulerDAOConnectionFactory schedulerCF;
 	
+	private Scheduler scheduler;
+	
 	private TransactionManager txm;
 	
-	public ODEJobClusterListener(SchedulerDAOConnectionFactory scf, TransactionManager tmgr) {
+	public ODEJobClusterListener(Scheduler scheduler, SchedulerDAOConnectionFactory scf, TransactionManager tmgr) {
+		this.scheduler = scheduler;
 		this.schedulerCF = scf;
 		this.txm = tmgr;
 	}
@@ -53,14 +60,19 @@
 	 * Move jobs that assoicated with dead members to an active node.
 	 * 
 	 */
-	public void membershipChanged(Vector deadMemebers, Vector newMembers, Vector allMemebers) {
-		if (allMemebers.isEmpty()) {
+	public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers) {
+		if (allMembers.isEmpty()) {
 			throw new ClusteringException("There are no active nodes in this clustering environment");
 		}
-		if (!deadMemebers.isEmpty()) {
+		
+		if (scheduler instanceof ClusterAware) {
+			setClusterNodeListsIntoScheduler(allMembers);
+		}
+		
+		if (!deadMembers.isEmpty()) {
 			SchedulerDAOConnection conn = schedulerCF.getConnection();
-			String activeNodeId = allMemebers.iterator().next().toString();
-			Iterator<?> it = deadMemebers.iterator();
+			String activeNodeId = allMembers.iterator().next().toString();
+			Iterator<?> it = deadMembers.iterator();
 			try {
 				while (it.hasNext()) {
 					String deadNodeId = it.next().toString();
@@ -78,4 +90,15 @@
 		
 	}
 
+	
+	private void setClusterNodeListsIntoScheduler(Vector allMembers) {
+		List<String> nodeLists = new ArrayList<String>();
+		Iterator<?> nodes = allMembers.iterator();
+		while (nodes.hasNext()) {
+			String node = nodes.next().toString();
+			nodeLists.add(node);
+		}
+		((ClusterAware)scheduler).setNodeList(nodeLists);
+	}
+
 }



More information about the riftsaw-commits mailing list