[riftsaw-commits] riftsaw SVN: r961 - in branches/ODE/RiftSaw-ODE-trunk: scheduler-simple/src/main/java/org/apache/ode/scheduler/simple and 1 other directories.

riftsaw-commits at lists.jboss.org riftsaw-commits at lists.jboss.org
Mon Sep 20 22:20:54 EDT 2010


Author: jeff.yuchang
Date: 2010-09-20 22:20:54 -0400 (Mon, 20 Sep 2010)
New Revision: 961

Modified:
   branches/ODE/RiftSaw-ODE-trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java
   branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
   branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
Log:
* Remove stale-node recovery (recoverStaleNode and the CheckStaleNodes task) from SimpleScheduler.
* Allow the cluster node list to be set on SimpleScheduler via ClusterAware.
* Make SimpleScheduler implement ClusterAware.


Modified: branches/ODE/RiftSaw-ODE-trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java	2010-09-20 10:16:11 UTC (rev 960)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java	2010-09-21 02:20:54 UTC (rev 961)
@@ -18,6 +18,8 @@
  */
 package org.apache.ode.bpel.iapi;
 
+import java.util.List;
+
 /**
  * The interface to implement for a custom Scheduler implementation to support
  * Clustering.
@@ -33,4 +35,11 @@
      * @return true when the node is coordinator
      */
     boolean amICoordinator();
+    
+    /**
+     * Set all available node lists
+     * 
+     * @param nodeList
+     */
+    void setNodeList(List<String> nodeList);
 }

Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java	2010-09-20 10:16:11 UTC (rev 960)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java	2010-09-21 02:20:54 UTC (rev 961)
@@ -20,14 +20,12 @@
 package org.apache.ode.scheduler.simple;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -42,6 +40,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.helpers.AbsoluteTimeDateFormat;
+import org.apache.ode.bpel.iapi.ClusterAware;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.dao.scheduler.DatabaseException;
@@ -70,7 +69,7 @@
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  *
  */
-public class SimpleScheduler implements Scheduler, TaskRunner {
+public class SimpleScheduler implements Scheduler, TaskRunner, ClusterAware {
     private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
 
     /**
@@ -116,12 +115,6 @@
 
     private SchedulerDAOConnectionFactory _dbcf;
 
-    /** All the nodes we know about */
-    private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
-
-    /** When we last heard from our nodes. */
-    private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
-
     /** Set of outstanding jobs, i.e., jobs that have been enqueued but not dequeued or dispatched yet.
         Used to avoid cases where a job would be dispatched twice if the server is under high load and
         does not fully process a job before it is reloaded from the database. */
@@ -146,6 +139,10 @@
 
     /** Interval between immediate retries when the transaction fails **/
     private long _immediateTransactionRetryInterval = 1000;
+    
+    private List<String> _defaultNodeList = new ArrayList<String>();
+    
+    private List<String> _nodeList = new ArrayList<String>();
 
     public SimpleScheduler(String nodeId, SchedulerDAOConnectionFactory dbcf, TransactionManager txm, Properties conf) {
         _nodeId = nodeId;
@@ -163,6 +160,8 @@
         _immediateTransactionRetryInterval = getLongProperty(conf, "ode.scheduler.immediateTransactionRetryInterval", _immediateTransactionRetryInterval);
 
         _todo = new  SchedulerThread(this);
+        
+        _defaultNodeList.add(nodeId);
     }
 
     public void setPollIntervalForPolledRunnable(long pollIntervalForPolledRunnable) {
@@ -451,8 +450,21 @@
     public void setJobProcessor(JobProcessor processor) throws ContextException {
         _jobProcessor = processor;
     }
+    
+    
 
-    public void shutdown() {
+    public List<String> getNodeList() {
+    	if (this._nodeList == null || this._nodeList.size() == 0) {
+    		return _defaultNodeList;
+    	}
+		return _nodeList;
+	}
+
+	public void setNodeList(List<String> nodeList) {
+		this._nodeList = nodeList;
+	}
+
+	public void shutdown() {
         stop();
         _jobProcessor = null;
         _txm = null;
@@ -469,43 +481,15 @@
         
         _todo.clearTasks(UpgradeJobsTask.class);
         _todo.clearTasks(LoadImmediateTask.class);
-        _todo.clearTasks(CheckStaleNodes.class);
+        
         _processedSinceLastLoadTask.clear();
         _outstandingJobs.clear();
 
-        _knownNodes.clear();
-
-        try {
-            execTransaction(new Callable<Void>() {
-                public Void call() throws ContextException, DatabaseException {
-                	SchedulerDAOConnection conn = _dbcf.getConnection();
-                    List<String> ids = conn.getNodeIds();
-                    if (ids == null) {
-                        throw new ContextException("Error retrieving node list.");
-                    }
-                	_knownNodes.addAll(ids);
-                    return null;
-                }
-
-            });
-        } catch (Exception ex) {
-            __log.error("Error retrieving node list.", ex);
-            throw new ContextException("Error retrieving node list.", ex);
-        }
-
         long now = System.currentTimeMillis();
 
-        // Pretend we got a heartbeat...
-        for (String s : _knownNodes) {
-        	_lastHeartBeat.put(s, now);
-        }
-
         // schedule immediate job loading for now!
         _todo.enqueue(new LoadImmediateTask(now));
 
-        // schedule check for stale nodes, make it random so that the nodes don't overlap.
-        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
-
         // do the upgrade sometime (random) in the immediate interval.
         _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
 
@@ -524,7 +508,7 @@
         _todo.stop();
         _todo.clearTasks(UpgradeJobsTask.class);
         _todo.clearTasks(LoadImmediateTask.class);
-        _todo.clearTasks(CheckStaleNodes.class);
+
         _processedSinceLastLoadTask.clear();
         _outstandingJobs.clear();
 
@@ -735,17 +719,6 @@
         }
     }
 
-    public void updateHeartBeat(String nodeId) {
-        if (nodeId == null)
-            return;
-
-        if (_nodeId.equals(nodeId))
-            return;
-
-        _lastHeartBeat.put(nodeId, System.currentTimeMillis());
-        _knownNodes.add(nodeId);
-    }
-
     boolean doLoadImmediate() {
         __log.debug("LOAD IMMEDIATE started");
 
@@ -832,10 +805,6 @@
 
     boolean doUpgrade() {
         __log.debug("UPGRADE started");
-        final ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
-        // Don't forget about self.
-        knownNodes.add(_nodeId);
-        Collections.sort(knownNodes);
 
         // We're going to try to upgrade near future jobs using the db only.
         // We assume that the distribution of the trailing digits in the
@@ -848,9 +817,9 @@
 
                 public Boolean call() throws ContextException, DatabaseException {
                 	SchedulerDAOConnection conn = _dbcf.getConnection();
-                    int numNodes = knownNodes.size();
+                    int numNodes = getNodeList().size();
                     for (int i = 0; i < numNodes; ++i) {
-                        String node = knownNodes.get(i);
+                        String node = getNodeList().get(i);
                         conn.updateAssignToNode(node, i, numNodes, maxtime);
                     }
                     return true;
@@ -867,38 +836,7 @@
 
     }
 
-    /**
-     * Re-assign stale node's jobs to self.
-     * @param nodeId
-     */
-    void recoverStaleNode(final String nodeId) {
-        __log.debug("recovering stale node " + nodeId);
-        try {
-            int numrows = execTransaction(new Callable<Integer>() {
-                public Integer call() throws ContextException, DatabaseException {
-                	SchedulerDAOConnection conn = _dbcf.getConnection();
-                    return conn.updateReassign(nodeId, _nodeId);
-                }
-            });
 
-            __log.debug("reassigned " + numrows + " jobs to self. ");
-
-            // We can now forget about this node, if we see it again, it will be
-            // "new to us"
-            _knownNodes.remove(nodeId);
-            _lastHeartBeat.remove(nodeId);
-
-            // Force a load-immediate to catch anything new from the recovered node.
-            doLoadImmediate();
-
-        } catch (Exception ex) {
-            __log.error("Database error reassigning node.", ex);
-        } finally {
-            __log.debug("node recovery complete");
-        }
-    }
-
-
     private abstract class SchedulerTask extends Task implements Runnable {
         SchedulerTask(long schedDate) {
             super(schedDate);
@@ -959,28 +897,14 @@
             }
         }
     }
-
+    
     /**
-     * Check if any of the nodes in our cluster are stale.
+     * Right now, just assume all of nodes are coordinator for now.
+     * 
      */
-    private class CheckStaleNodes extends SchedulerTask {
-        CheckStaleNodes(long schedDate) {
-            super(schedDate);
-        }
-
-        public void run() {
-            _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
-            __log.debug("CHECK STALE NODES started");
-            for (String nodeId : _knownNodes) {
-                Long lastSeen = _lastHeartBeat.get(nodeId);
-                if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
-                    && !_nodeId.equals(nodeId))
-                {
-                    recoverStaleNode(nodeId);
-                }
-            }
-        }
-    }
+	public boolean amICoordinator() {
+		return true;
+	}
     
 }
 

Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java	2010-09-20 10:16:11 UTC (rev 960)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java	2010-09-21 02:20:54 UTC (rev 961)
@@ -160,38 +160,6 @@
         assertEquals(3, _jobs.size());
     }
 
-    public void testRecoverySuppressed() throws Exception {
-        // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(500);
-
-        _txm.begin();
-        try {
-            _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
-            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 1100));
-            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 15000));
-        } finally {
-            _txm.commit();
-        }
-        _scheduler.stop();
-
-        _scheduler = newScheduler("n3");
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(1000);
-        _scheduler.start();
-        for (int i = 0; i < 40; ++i) {
-            _scheduler.updateHeartBeat("n1");
-            Thread.sleep(100);
-        }
-
-        _scheduler.stop();
-        Thread.sleep(1000);
-
-        assertEquals(0, _jobs.size());
-    }
-
     public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
         synchronized (_jobs) {
             _jobs.add(jobInfo);



More information about the riftsaw-commits mailing list