Author: jeff.yuchang
Date: 2010-09-21 01:31:35 -0400 (Tue, 21 Sep 2010)
New Revision: 965
Modified:
trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/JBossClusteringService.java
trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/ODEJobClusterListener.java
Log:
* Made the cluster membership listener push the updated node list to the SimpleScheduler service.
Modified:
trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/JBossClusteringService.java
===================================================================
---
trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/JBossClusteringService.java 2010-09-21
05:17:25 UTC (rev 964)
+++
trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/JBossClusteringService.java 2010-09-21
05:31:35 UTC (rev 965)
@@ -17,8 +17,6 @@
*/
package org.jboss.soa.bpel.clustering;
-import java.util.Iterator;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.ha.framework.interfaces.HAPartition;
@@ -43,23 +41,10 @@
public void start() {
engine = getBpelEngine();
- listener = new ODEJobClusterListener(engine.getSchedulerDAOConnectionFactory(),
engine.getTransactionManager());
+ listener = new ODEJobClusterListener(engine.getScheduler(),
engine.getSchedulerDAOConnectionFactory(), engine.getTransactionManager());
this.haPartition.registerMembershipListener(listener);
logger.debug("Registered ODEJobCluster Listener.");
-/* logger.info("==============CLUSTERING INFORMATION===================");
- logger.info("NodeName =>" + haPartition.getNodeName());
- logger.info("PartitionNode =>" + haPartition.getPartitionName());
- logger.info("Cluster Node, name is " + haPartition.getClusterNode().getName()
+
- " ip is: " + haPartition.getClusterNode().getIpAddress()
- + " port is: " + haPartition.getClusterNode().getPort());
-
- Iterator it = haPartition.getCurrentView().iterator();
- while (it.hasNext()) {
- Object o = it.next();
- logger.info("view -> " + o );
- }*/
-
}
public String getMasterNodeId() {
Modified:
trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/ODEJobClusterListener.java
===================================================================
---
trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/ODEJobClusterListener.java 2010-09-21
05:17:25 UTC (rev 964)
+++
trunk/runtime/clustering/src/main/java/org/jboss/soa/bpel/clustering/ODEJobClusterListener.java 2010-09-21
05:31:35 UTC (rev 965)
@@ -17,13 +17,17 @@
*/
package org.jboss.soa.bpel.clustering;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Vector;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.ClusterAware;
+import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.dao.scheduler.DatabaseException;
import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
@@ -41,9 +45,12 @@
private SchedulerDAOConnectionFactory schedulerCF;
+ private Scheduler scheduler;
+
private TransactionManager txm;
- public ODEJobClusterListener(SchedulerDAOConnectionFactory scf, TransactionManager tmgr)
{
+ public ODEJobClusterListener(Scheduler scheduler, SchedulerDAOConnectionFactory scf,
TransactionManager tmgr) {
+ this.scheduler = scheduler;
this.schedulerCF = scf;
this.txm = tmgr;
}
@@ -53,14 +60,19 @@
* Move jobs that assoicated with dead members to an active node.
*
*/
- public void membershipChanged(Vector deadMemebers, Vector newMembers, Vector
allMemebers) {
- if (allMemebers.isEmpty()) {
+ public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
{
+ if (allMembers.isEmpty()) {
throw new ClusteringException("There are no active nodes in this clustering
environment");
}
- if (!deadMemebers.isEmpty()) {
+
+ if (scheduler instanceof ClusterAware) {
+ setClusterNodeListsIntoScheduler(allMembers);
+ }
+
+ if (!deadMembers.isEmpty()) {
SchedulerDAOConnection conn = schedulerCF.getConnection();
- String activeNodeId = allMemebers.iterator().next().toString();
- Iterator<?> it = deadMemebers.iterator();
+ String activeNodeId = allMembers.iterator().next().toString();
+ Iterator<?> it = deadMembers.iterator();
try {
while (it.hasNext()) {
String deadNodeId = it.next().toString();
@@ -78,4 +90,15 @@
}
+
+ private void setClusterNodeListsIntoScheduler(Vector allMembers) {
+ List<String> nodeLists = new ArrayList<String>();
+ Iterator<?> nodes = allMembers.iterator();
+ while (nodes.hasNext()) {
+ String node = nodes.next().toString();
+ nodeLists.add(node);
+ }
+ ((ClusterAware)scheduler).setNodeList(nodeLists);
+ }
+
}