riftsaw SVN: r1363 - in branches/ODE/RiftSaw-ODE-trunk: dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler and 1 other directories.
by riftsaw-commits@lists.jboss.org
Author: jeff.yuchang
Date: 2011-06-01 12:07:46 -0400 (Wed, 01 Jun 2011)
New Revision: 1363
Modified:
branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnection.java
branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/JobDAOImpl.java
branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/SchedulerDAOConnectionImpl.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Log:
*RIFTSAW-382:
1. Use EntityManager.find() to load the managed job and copy the updated fields onto it, instead of calling persist() on the detached object.
2. Only add jobs that have finished and do not need to be retried to the _processedSinceLastLoadTask map.
3. Override toString() in JobDAOImpl so that jobs are readable in log messages.
Modified: branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnection.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnection.java 2011-05-09 06:47:07 UTC (rev 1362)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnection.java 2011-06-01 16:07:46 UTC (rev 1363)
@@ -49,7 +49,7 @@
boolean insertJob(JobDAO job, String nodeId, boolean loaded) throws DatabaseException ;
/**
- * Update the job in the database (only updates timestamp and retryCount)
+ * Update the job in the database (only updates timestamp, retryCount and the schedule status)
*
* @param job the job
* @throws DatabaseException in case of error
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/JobDAOImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/JobDAOImpl.java 2011-05-09 06:47:07 UTC (rev 1362)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/JobDAOImpl.java 2011-06-01 16:07:46 UTC (rev 1363)
@@ -254,4 +254,20 @@
}
}
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ builder.append("JobId: " + _jobId);
+ builder.append(",nodeId: " + _nodeId);
+ builder.append(",scheduled: " + _scheduled);
+ builder.append(",transacted: " + _transacted);
+ builder.append(",ts: " + _ts);
+ builder.append(",channel: " + _details.getChannel());
+ builder.append(",instaceId : " + _details.getInstanceId());
+ builder.append(",type: " + _details.getType());
+ builder.append(",retrycount: " + _details.getRetryCount());
+ builder.append("]");
+ return builder.toString();
+ }
}
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/SchedulerDAOConnectionImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/SchedulerDAOConnectionImpl.java 2011-05-09 06:47:07 UTC (rev 1362)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/SchedulerDAOConnectionImpl.java 2011-06-01 16:07:46 UTC (rev 1363)
@@ -157,7 +157,13 @@
public boolean updateJob(JobDAO job) throws DatabaseException {
_txCtx.begin();
- _em.persist(job);
+ JobDAOImpl theJob = _em.find(JobDAOImpl.class, job.getJobId());
+ if (theJob == null) {
+ throw new DatabaseException("the updated job is not existed! job detail is: " + job);
+ }
+ theJob.setScheduledDate(job.getScheduledDate());
+ theJob.getDetails().setRetryCount(job.getDetails().getRetryCount());
+ theJob.setScheduled(job.isScheduled());
_txCtx.commit();
return true;
}
@@ -186,7 +192,8 @@
public JobDAO createJob(boolean transacted, JobDetails jobDetails,
boolean persisted, long scheduledDate) {
- return createJob(new GUID().toString(), transacted, jobDetails, persisted, scheduledDate);
+ String jobId = new GUID().toString();
+ return createJob(jobId, transacted, jobDetails, persisted, scheduledDate);
}
}
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java 2011-05-09 06:47:07 UTC (rev 1362)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java 2011-06-01 16:07:46 UTC (rev 1363)
@@ -125,6 +125,11 @@
In such a case the job is no longer in the _outstandingJobs map, and so it's queued again. */
private ConcurrentHashMap<String, Long> _processedSinceLastLoadTask = new ConcurrentHashMap<String, Long>();
+ /**
+ * Set of jobs that needed to be retried.
+ */
+ private ConcurrentHashMap<String, Boolean> _retryJobList = new ConcurrentHashMap<String, Boolean>();
+
private boolean _running;
/** Time for next upgrade. */
@@ -139,7 +144,7 @@
/** Interval between immediate retries when the transaction fails **/
private long _immediateTransactionRetryInterval = 1000;
-
+
private List<String> _defaultNodeList = new ArrayList<String>();
private List<String> _nodeList = new ArrayList<String>();
@@ -450,8 +455,6 @@
public void setJobProcessor(JobProcessor processor) throws ContextException {
_jobProcessor = processor;
}
-
-
public List<String> getNodeList() {
if (this._nodeList == null || this._nodeList.size() == 0) {
@@ -484,6 +487,7 @@
_processedSinceLastLoadTask.clear();
_outstandingJobs.clear();
+ _retryJobList.clear();
long now = System.currentTimeMillis();
@@ -511,6 +515,7 @@
_processedSinceLastLoadTask.clear();
_outstandingJobs.clear();
+ _retryJobList.clear();
// disable because this is not the right way to do it
// will be fixed by ODE-595
@@ -545,7 +550,12 @@
return null;
} finally {
// the order of these 2 actions is crucial to avoid a race condition.
- _processedSinceLastLoadTask.put(job.getJobId(), job.getScheduledDate());
+ // if the transaction failed, and has retry mechanism, we will not put it to avoid being ignore.
+ if (_retryJobList.get(job.getJobId()) == null ) {
+ _processedSinceLastLoadTask.put(job.getJobId(), job.getScheduledDate());
+ } else {
+ _retryJobList.remove(job.getJobId());
+ }
_outstandingJobs.remove(job.getJobId());
}
}
@@ -626,8 +636,10 @@
job.setScheduled(false);
job.setScheduledDate(scheddate);
conn.updateJob(job);
-
- __log.error("Error while processing job, retrying in " + delay + "s");
+
+ _retryJobList.put(job.getJobId(), new Boolean(true));
+
+ __log.error("Error while processing job, retrying in " + delay + "s, the job is " + job);
}
}
@@ -637,9 +649,7 @@
/**
* Run a job in the current thread.
- *
- * @param job job to run.
- */
+ **/
protected void runJob(final JobDAO jobDao) {
_exec.submit(new RunJobCallable(jobDao, _jobProcessor));
}
@@ -664,7 +674,6 @@
* not persisted, and the same runnable may be tried again after system failure,
* the runnable that's used with this polling should be repeatable.
*
- * @param job job to run.
*/
protected void runPolledRunnable(final JobDAO jobDao) {
_exec.submit(new RunJobCallable(jobDao, _polledRunnableProcessor));
@@ -773,6 +782,7 @@
// clear only if the batch succeeded
_processedSinceLastLoadTask.clear();
+ _retryJobList.clear();
return true;
} catch (Exception ex) {
__log.error("Error loading immediate jobs from database.", ex);