Author: jeff.yuchang
Date: 2010-09-20 22:20:54 -0400 (Mon, 20 Sep 2010)
New Revision: 961
Modified:
branches/ODE/RiftSaw-ODE-trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
Log:
* Remove stale-node recovery (CheckStaleNodes, recoverStaleNode, updateHeartBeat) from SimpleScheduler.
* Allow the cluster node list to be set on SimpleScheduler via ClusterAware.setNodeList().
* Make SimpleScheduler implement ClusterAware.
Modified:
branches/ODE/RiftSaw-ODE-trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java	2010-09-20 10:16:11 UTC (rev 960)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java	2010-09-21 02:20:54 UTC (rev 961)
@@ -18,6 +18,8 @@
*/
package org.apache.ode.bpel.iapi;
+import java.util.List;
+
/**
* The interface to implement for a custom Scheduler implementation to support
* Clustering.
@@ -33,4 +35,11 @@
* @return true when the node is coordinator
*/
boolean amICoordinator();
+
+ /**
+ * Sets the list of all available cluster nodes.
+ *
+ * @param nodeList the identifiers of all nodes in the cluster
+ */
+ void setNodeList(List<String> nodeList);
}
Modified:
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java	2010-09-20 10:16:11 UTC (rev 960)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java	2010-09-21 02:20:54 UTC (rev 961)
@@ -20,14 +20,12 @@
package org.apache.ode.scheduler.simple;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -42,6 +40,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.helpers.AbsoluteTimeDateFormat;
+import org.apache.ode.bpel.iapi.ClusterAware;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.dao.scheduler.DatabaseException;
@@ -70,7 +69,7 @@
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*
*/
-public class SimpleScheduler implements Scheduler, TaskRunner {
+public class SimpleScheduler implements Scheduler, TaskRunner, ClusterAware {
private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
/**
@@ -116,12 +115,6 @@
private SchedulerDAOConnectionFactory _dbcf;
- /** All the nodes we know about */
- private CopyOnWriteArraySet<String> _knownNodes = new
CopyOnWriteArraySet<String>();
-
- /** When we last heard from our nodes. */
- private ConcurrentHashMap<String, Long> _lastHeartBeat = new
ConcurrentHashMap<String, Long>();
-
/** Set of outstanding jobs, i.e., jobs that have been enqueued but not dequeued or
dispatched yet.
Used to avoid cases where a job would be dispatched twice if the server is under
high load and
does not fully process a job before it is reloaded from the database. */
@@ -146,6 +139,10 @@
/** Interval between immediate retries when the transaction fails **/
private long _immediateTransactionRetryInterval = 1000;
+
+ private List<String> _defaultNodeList = new ArrayList<String>();
+
+ private List<String> _nodeList = new ArrayList<String>();
public SimpleScheduler(String nodeId, SchedulerDAOConnectionFactory dbcf,
TransactionManager txm, Properties conf) {
_nodeId = nodeId;
@@ -163,6 +160,8 @@
_immediateTransactionRetryInterval = getLongProperty(conf,
"ode.scheduler.immediateTransactionRetryInterval",
_immediateTransactionRetryInterval);
_todo = new SchedulerThread(this);
+
+ _defaultNodeList.add(nodeId);
}
public void setPollIntervalForPolledRunnable(long pollIntervalForPolledRunnable) {
@@ -451,8 +450,21 @@
public void setJobProcessor(JobProcessor processor) throws ContextException {
_jobProcessor = processor;
}
+
+
- public void shutdown() {
+ public List<String> getNodeList() {
+ if (this._nodeList == null || this._nodeList.size() == 0) {
+ return _defaultNodeList;
+ }
+ return _nodeList;
+ }
+
+ public void setNodeList(List<String> nodeList) {
+ this._nodeList = nodeList;
+ }
+
+ public void shutdown() {
stop();
_jobProcessor = null;
_txm = null;
@@ -469,43 +481,15 @@
_todo.clearTasks(UpgradeJobsTask.class);
_todo.clearTasks(LoadImmediateTask.class);
- _todo.clearTasks(CheckStaleNodes.class);
+
_processedSinceLastLoadTask.clear();
_outstandingJobs.clear();
- _knownNodes.clear();
-
- try {
- execTransaction(new Callable<Void>() {
- public Void call() throws ContextException, DatabaseException {
- SchedulerDAOConnection conn = _dbcf.getConnection();
- List<String> ids = conn.getNodeIds();
- if (ids == null) {
- throw new ContextException("Error retrieving node
list.");
- }
- _knownNodes.addAll(ids);
- return null;
- }
-
- });
- } catch (Exception ex) {
- __log.error("Error retrieving node list.", ex);
- throw new ContextException("Error retrieving node list.", ex);
- }
-
long now = System.currentTimeMillis();
- // Pretend we got a heartbeat...
- for (String s : _knownNodes) {
- _lastHeartBeat.put(s, now);
- }
-
// schedule immediate job loading for now!
_todo.enqueue(new LoadImmediateTask(now));
- // schedule check for stale nodes, make it random so that the nodes don't
overlap.
- _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
-
// do the upgrade sometime (random) in the immediate interval.
_todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
@@ -524,7 +508,7 @@
_todo.stop();
_todo.clearTasks(UpgradeJobsTask.class);
_todo.clearTasks(LoadImmediateTask.class);
- _todo.clearTasks(CheckStaleNodes.class);
+
_processedSinceLastLoadTask.clear();
_outstandingJobs.clear();
@@ -735,17 +719,6 @@
}
}
- public void updateHeartBeat(String nodeId) {
- if (nodeId == null)
- return;
-
- if (_nodeId.equals(nodeId))
- return;
-
- _lastHeartBeat.put(nodeId, System.currentTimeMillis());
- _knownNodes.add(nodeId);
- }
-
boolean doLoadImmediate() {
__log.debug("LOAD IMMEDIATE started");
@@ -832,10 +805,6 @@
boolean doUpgrade() {
__log.debug("UPGRADE started");
- final ArrayList<String> knownNodes = new
ArrayList<String>(_knownNodes);
- // Don't forget about self.
- knownNodes.add(_nodeId);
- Collections.sort(knownNodes);
// We're going to try to upgrade near future jobs using the db only.
// We assume that the distribution of the trailing digits in the
@@ -848,9 +817,9 @@
public Boolean call() throws ContextException, DatabaseException {
SchedulerDAOConnection conn = _dbcf.getConnection();
- int numNodes = knownNodes.size();
+ int numNodes = getNodeList().size();
for (int i = 0; i < numNodes; ++i) {
- String node = knownNodes.get(i);
+ String node = getNodeList().get(i);
conn.updateAssignToNode(node, i, numNodes, maxtime);
}
return true;
@@ -867,38 +836,7 @@
}
- /**
- * Re-assign stale node's jobs to self.
- * @param nodeId
- */
- void recoverStaleNode(final String nodeId) {
- __log.debug("recovering stale node " + nodeId);
- try {
- int numrows = execTransaction(new Callable<Integer>() {
- public Integer call() throws ContextException, DatabaseException {
- SchedulerDAOConnection conn = _dbcf.getConnection();
- return conn.updateReassign(nodeId, _nodeId);
- }
- });
- __log.debug("reassigned " + numrows + " jobs to self.
");
-
- // We can now forget about this node, if we see it again, it will be
- // "new to us"
- _knownNodes.remove(nodeId);
- _lastHeartBeat.remove(nodeId);
-
- // Force a load-immediate to catch anything new from the recovered node.
- doLoadImmediate();
-
- } catch (Exception ex) {
- __log.error("Database error reassigning node.", ex);
- } finally {
- __log.debug("node recovery complete");
- }
- }
-
-
private abstract class SchedulerTask extends Task implements Runnable {
SchedulerTask(long schedDate) {
super(schedDate);
@@ -959,28 +897,14 @@
}
}
}
-
+
/**
- * Check if any of the nodes in our cluster are stale.
+ * For now, every node treats itself as the coordinator, so this always returns true.
+ *
*/
- private class CheckStaleNodes extends SchedulerTask {
- CheckStaleNodes(long schedDate) {
- super(schedDate);
- }
-
- public void run() {
- _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() +
_staleInterval));
- __log.debug("CHECK STALE NODES started");
- for (String nodeId : _knownNodes) {
- Long lastSeen = _lastHeartBeat.get(nodeId);
- if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) >
_staleInterval)
- && !_nodeId.equals(nodeId))
- {
- recoverStaleNode(nodeId);
- }
- }
- }
- }
+ public boolean amICoordinator() {
+ return true;
+ }
}
Modified:
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java	2010-09-20 10:16:11 UTC (rev 960)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java	2010-09-21 02:20:54 UTC (rev 961)
@@ -160,38 +160,6 @@
assertEquals(3, _jobs.size());
}
- public void testRecoverySuppressed() throws Exception {
- // speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(2000);
- _scheduler.setImmediateInterval(1000);
- _scheduler.setStaleInterval(500);
-
- _txm.begin();
- try {
- _scheduler.schedulePersistedJob(newDetail("immediate"), new
Date(System.currentTimeMillis()));
- _scheduler.schedulePersistedJob(newDetail("near"), new
Date(System.currentTimeMillis() + 1100));
- _scheduler.schedulePersistedJob(newDetail("far"), new
Date(System.currentTimeMillis() + 15000));
- } finally {
- _txm.commit();
- }
- _scheduler.stop();
-
- _scheduler = newScheduler("n3");
- _scheduler.setNearFutureInterval(2000);
- _scheduler.setImmediateInterval(1000);
- _scheduler.setStaleInterval(1000);
- _scheduler.start();
- for (int i = 0; i < 40; ++i) {
- _scheduler.updateHeartBeat("n1");
- Thread.sleep(100);
- }
-
- _scheduler.stop();
- Thread.sleep(1000);
-
- assertEquals(0, _jobs.size());
- }
-
public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
synchronized (_jobs) {
_jobs.add(jobInfo);