riftsaw SVN: r932 - trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws and 1 other directory.
by riftsaw-commits@lists.jboss.org
Author: objectiser
Date: 2010-09-02 07:08:33 -0400 (Thu, 02 Sep 2010)
New Revision: 932
Modified:
branches/RiftSaw-2.1.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java
trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java
Log:
RIFTSAW-279 - Added SOAPAction on request as required by .NET
Modified: branches/RiftSaw-2.1.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java
===================================================================
--- branches/RiftSaw-2.1.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java 2010-09-01 11:41:53 UTC (rev 931)
+++ branches/RiftSaw-2.1.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java 2010-09-02 11:08:33 UTC (rev 932)
@@ -135,6 +135,18 @@
wsdlOperation.getName()
);
+ // Add SOAPAction
+ for (Object extension : bop.getExtensibilityElements()) {
+ if (extension instanceof javax.wsdl.extensions.soap.SOAPOperation) {
+ javax.wsdl.extensions.soap.SOAPOperation soapop=
+ (javax.wsdl.extensions.soap.SOAPOperation)extension;
+
+ if (soapop.getSoapActionURI() != null) {
+ MimeHeaders hd = soapMessage.getMimeHeaders();
+ hd.addHeader("SOAPAction", soapop.getSoapActionURI());
+ }
+ }
+ }
}
public boolean isRPC()
Modified: trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java
===================================================================
--- trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java 2010-09-01 11:41:53 UTC (rev 931)
+++ trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java 2010-09-02 11:08:33 UTC (rev 932)
@@ -135,6 +135,18 @@
wsdlOperation.getName()
);
+ // Add SOAPAction
+ for (Object extension : bop.getExtensibilityElements()) {
+ if (extension instanceof javax.wsdl.extensions.soap.SOAPOperation) {
+ javax.wsdl.extensions.soap.SOAPOperation soapop=
+ (javax.wsdl.extensions.soap.SOAPOperation)extension;
+
+ if (soapop.getSoapActionURI() != null) {
+ MimeHeaders hd = soapMessage.getMimeHeaders();
+ hd.addHeader("SOAPAction", soapop.getSoapActionURI());
+ }
+ }
+ }
}
public boolean isRPC()
14 years, 6 months
riftsaw SVN: r931 - trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/engine/ode.
by riftsaw-commits@lists.jboss.org
Author: jeff.yuchang
Date: 2010-09-01 07:41:53 -0400 (Wed, 01 Sep 2010)
New Revision: 931
Modified:
trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/engine/ode/BPELEngineImpl.java
Log:
* Fixed the compilation error for the updated simple scheduler API changes.
Modified: trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/engine/ode/BPELEngineImpl.java
===================================================================
--- trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/engine/ode/BPELEngineImpl.java 2010-09-01 10:19:22 UTC (rev 930)
+++ trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/engine/ode/BPELEngineImpl.java 2010-09-01 11:41:53 UTC (rev 931)
@@ -29,10 +29,10 @@
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.dao.bpel.BpelDAOConnectionFactory;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
import org.apache.ode.dao.store.ConfStoreDAOConnectionFactory;
import org.apache.ode.il.config.OdeConfigProperties;
import org.apache.ode.il.dbutil.Database;
-import org.apache.ode.scheduler.simple.JdbcDelegate;
import org.apache.ode.scheduler.simple.SimpleScheduler;
import org.apache.ode.store.ProcessStoreImpl;
import org.apache.ode.store.RiftSawProcessStore;
@@ -67,6 +67,7 @@
protected TransactionManager _txMgr;
protected BpelDAOConnectionFactory _daoCF;
protected ConfStoreDAOConnectionFactory _storeCF;
+ protected SchedulerDAOConnectionFactory _schedulerDaoCF;
protected Scheduler _scheduler;
protected Database _db;
protected ExecutorService _executorService;
@@ -375,6 +376,15 @@
} finally {
_storeCF = null;
}
+
+ if (_schedulerDaoCF != null)
+ try {
+ _schedulerDaoCF.shutdown();
+ } catch (Throwable ex) {
+ __log.debug("Scheduler DAO shutdown failed.", ex);
+ } finally {
+ _schedulerDaoCF = null;
+ }
if (_db != null)
try {
@@ -431,12 +441,15 @@
}
protected void initDAO() throws Exception {
- __log.debug("USING DAO: "+_odeConfig.getDAOConnectionFactory());
+ __log.debug("USING DAO: "+_odeConfig.getDAOConnectionFactory() + ", " + _odeConfig.getDAOConfStoreConnectionFactory()
+ + ", " + _odeConfig.getDAOConfScheduleConnectionFactory());
try {
_daoCF = _db.createDaoCF();
_storeCF = _db.createDaoStoreCF();
+ _schedulerDaoCF = _db.createDaoSchedulerCF();
} catch (Exception ex) {
- String errmsg = "DAO INSTANTIATION FAILED: "+_odeConfig.getDAOConnectionFactory() + " and " + _odeConfig.getDAOConfStoreConnectionFactory();
+ String errmsg = "DAO INSTANTIATION FAILED: "+_odeConfig.getDAOConnectionFactory() + " , " + _odeConfig.getDAOConfStoreConnectionFactory()
+ + " and " + _odeConfig.getDAOConfScheduleConnectionFactory();
__log.error(errmsg, ex);
throw new Exception(errmsg, ex);
@@ -453,7 +466,7 @@
}
protected Scheduler createScheduler() {
- SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
+ SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(), _schedulerDaoCF, _txMgr, _odeConfig.getProperties());
scheduler.setExecutorService(_executorService);
scheduler.setTransactionManager(_txMgr);
14 years, 6 months
riftsaw SVN: r930 - branches/RiftSaw-2.1.x/runtime/engine/src/test/java and 4 other directories.
by riftsaw-commits@lists.jboss.org
Author: objectiser
Date: 2010-09-01 06:19:22 -0400 (Wed, 01 Sep 2010)
New Revision: 930
Added:
branches/RiftSaw-2.1.x/runtime/engine/src/test/java/riftsaw277/
branches/RiftSaw-2.1.x/runtime/engine/src/test/java/riftsaw277/Riftsaw277TestCase.java
trunk/runtime/engine/src/test/java/riftsaw277/
trunk/runtime/engine/src/test/java/riftsaw277/Riftsaw277TestCase.java
Modified:
branches/RiftSaw-2.1.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java
trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java
Log:
RIFTSAW-277 - protected against null pointer, so now no fault is returned, and ODE handles this case by internally throwing a failure.
Modified: branches/RiftSaw-2.1.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java
===================================================================
--- branches/RiftSaw-2.1.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java 2010-09-01 06:42:40 UTC (rev 929)
+++ branches/RiftSaw-2.1.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java 2010-09-01 10:19:22 UTC (rev 930)
@@ -538,7 +538,7 @@
SOAPHeader.class);
}
- public Fault parseSoapFault(
+ public static Fault parseSoapFault(
Element odeMessage,
SOAPMessage soapMessage,
javax.wsdl.Operation operation)
@@ -574,7 +574,7 @@
return fdef;
}
- public Fault parseSoapFault(
+ public static Fault parseSoapFault(
Element odeMessage,
SOAPFault flt,
javax.wsdl.Operation operation)
@@ -602,12 +602,17 @@
return fault;
}
- private Fault inferFault(Operation operation, SOAPFault flt) {
+ private static Fault inferFault(Operation operation, SOAPFault flt) {
if (!flt.hasDetail())
return null;
// The detail is a dummy <detail> node containing the interesting fault element
Element element = DOMUtils.getFirstChildElement(flt.getDetail());
- QName elName = new QName(element.getNamespaceURI(), element.getLocalName());
+
+ if (element == null) {
+ return(null);
+ }
+
+ QName elName=new QName(element.getNamespaceURI(), element.getLocalName());
return WsdlUtils.inferFault(operation, elName);
}
}
Added: branches/RiftSaw-2.1.x/runtime/engine/src/test/java/riftsaw277/Riftsaw277TestCase.java
===================================================================
--- branches/RiftSaw-2.1.x/runtime/engine/src/test/java/riftsaw277/Riftsaw277TestCase.java (rev 0)
+++ branches/RiftSaw-2.1.x/runtime/engine/src/test/java/riftsaw277/Riftsaw277TestCase.java 2010-09-01 10:19:22 UTC (rev 930)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package riftsaw277;
+
+import javax.xml.soap.SOAPMessage;
+
+import org.jboss.soa.bpel.runtime.ws.SOAPMessageAdapter;
+
+import junit.framework.TestCase;
+
+/**
+ * https://jira.jboss.org/jira/browse/RIFTSAW-277
+ */
+public class Riftsaw277TestCase extends TestCase {
+
+ public Riftsaw277TestCase() {
+ }
+
+ public void testSOAPFaultWithNoDetails() {
+ try {
+ org.w3c.dom.Document doc = javax.xml.parsers.DocumentBuilderFactory.
+ newInstance().newDocumentBuilder().newDocument();
+
+ org.w3c.dom.Element odeMessage=doc.createElement("message");
+
+ doc.appendChild(odeMessage);
+
+ String str="<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\">"+
+ "<soap:Body><soap:Fault><faultcode>soap:Client</faultcode>"+
+ "<faultstring>Server did not recognize the value of HTTP Header SOAPAction: .</faultstring>"+
+ "<detail/></soap:Fault></soap:Body></soap:Envelope>";
+
+ java.io.InputStream is=new java.io.ByteArrayInputStream(str.getBytes());
+
+ SOAPMessage soapMessage=javax.xml.soap.MessageFactory.newInstance().createMessage(null,
+ is);
+
+ is.close();
+
+ javax.wsdl.factory.WSDLFactory fact=
+ javax.wsdl.factory.WSDLFactory.newInstance();
+ javax.wsdl.Definition defn=fact.newDefinition();
+ javax.wsdl.Operation operation=defn.createOperation();
+
+ javax.wsdl.Fault fault=SOAPMessageAdapter.parseSoapFault(odeMessage,
+ soapMessage, operation);
+
+ if (fault != null) {
+ fail("No fault should be returned");
+ }
+ } catch(Exception e) {
+ e.printStackTrace();
+ fail("Failed to run test: "+e);
+ }
+ }
+}
Modified: trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java
===================================================================
--- trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java 2010-09-01 06:42:40 UTC (rev 929)
+++ trunk/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/ws/SOAPMessageAdapter.java 2010-09-01 10:19:22 UTC (rev 930)
@@ -538,7 +538,7 @@
SOAPHeader.class);
}
- public Fault parseSoapFault(
+ public static Fault parseSoapFault(
Element odeMessage,
SOAPMessage soapMessage,
javax.wsdl.Operation operation)
@@ -574,7 +574,7 @@
return fdef;
}
- public Fault parseSoapFault(
+ public static Fault parseSoapFault(
Element odeMessage,
SOAPFault flt,
javax.wsdl.Operation operation)
@@ -602,12 +602,17 @@
return fault;
}
- private Fault inferFault(Operation operation, SOAPFault flt) {
+ private static Fault inferFault(Operation operation, SOAPFault flt) {
if (!flt.hasDetail())
return null;
// The detail is a dummy <detail> node containing the interesting fault element
Element element = DOMUtils.getFirstChildElement(flt.getDetail());
- QName elName = new QName(element.getNamespaceURI(), element.getLocalName());
+
+ if (element == null) {
+ return(null);
+ }
+
+ QName elName=new QName(element.getNamespaceURI(), element.getLocalName());
return WsdlUtils.inferFault(operation, elName);
}
}
Added: trunk/runtime/engine/src/test/java/riftsaw277/Riftsaw277TestCase.java
===================================================================
--- trunk/runtime/engine/src/test/java/riftsaw277/Riftsaw277TestCase.java (rev 0)
+++ trunk/runtime/engine/src/test/java/riftsaw277/Riftsaw277TestCase.java 2010-09-01 10:19:22 UTC (rev 930)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package riftsaw277;
+
+import javax.xml.soap.SOAPMessage;
+
+import org.jboss.soa.bpel.runtime.ws.SOAPMessageAdapter;
+
+import junit.framework.TestCase;
+
+/**
+ * https://jira.jboss.org/jira/browse/RIFTSAW-277
+ */
+public class Riftsaw277TestCase extends TestCase {
+
+ public Riftsaw277TestCase() {
+ }
+
+ public void testSOAPFaultWithNoDetails() {
+ try {
+ org.w3c.dom.Document doc = javax.xml.parsers.DocumentBuilderFactory.
+ newInstance().newDocumentBuilder().newDocument();
+
+ org.w3c.dom.Element odeMessage=doc.createElement("message");
+
+ doc.appendChild(odeMessage);
+
+ String str="<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\">"+
+ "<soap:Body><soap:Fault><faultcode>soap:Client</faultcode>"+
+ "<faultstring>Server did not recognize the value of HTTP Header SOAPAction: .</faultstring>"+
+ "<detail/></soap:Fault></soap:Body></soap:Envelope>";
+
+ java.io.InputStream is=new java.io.ByteArrayInputStream(str.getBytes());
+
+ SOAPMessage soapMessage=javax.xml.soap.MessageFactory.newInstance().createMessage(null,
+ is);
+
+ is.close();
+
+ javax.wsdl.factory.WSDLFactory fact=
+ javax.wsdl.factory.WSDLFactory.newInstance();
+ javax.wsdl.Definition defn=fact.newDefinition();
+ javax.wsdl.Operation operation=defn.createOperation();
+
+ javax.wsdl.Fault fault=SOAPMessageAdapter.parseSoapFault(odeMessage,
+ soapMessage, operation);
+
+ if (fault != null) {
+ fail("No fault should be returned");
+ }
+ } catch(Exception e) {
+ e.printStackTrace();
+ fail("Failed to run test: "+e);
+ }
+ }
+}
14 years, 6 months
riftsaw SVN: r929 - in branches/ODE/RiftSaw-ODE-trunk: axis2/src/main/java/org/apache/ode/axis2 and 21 other directories.
by riftsaw-commits@lists.jboss.org
Author: jeff.yuchang
Date: 2010-09-01 02:42:40 -0400 (Wed, 01 Sep 2010)
New Revision: 929
Added:
branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/
branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/DatabaseException.java
branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/JobDAO.java
branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnection.java
branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnectionFactory.java
branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/Task.java
branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/java/org/apache/ode/dao/jpa/hibernate/SchedulerDAOConnectionFactoryImpl.java
branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/java/org/apache/ode/dao/jpa/openjpa/SchedulerDAOConnectionFactoryImpl.java
branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/
branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/JobDAOImpl.java
branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/SchedulerDAOConnectionImpl.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java
Removed:
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseException.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Task.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/GeronimoDelegateSupport.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
Modified:
branches/ODE/RiftSaw-ODE-trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/dbutil/Database.java
branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/dbutil/H2Database.java
branches/ODE/RiftSaw-ODE-trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
branches/ODE/RiftSaw-ODE-trunk/bpel-test/pom.xml
branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/descriptors/persistence.db.xml
branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/java/org/apache/ode/dao/jpa/hibernate/ConfStoreDAOConnectionFactoryImpl.java
branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/resources/META-INF/persistence.xml
branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.derby.xml
branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.mysql.xml
branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.oracle.xml
branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.postgres.xml
branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/resources/META-INF/persistence.xml
branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/riftsaw/riftsaw-persistence.xml
branches/ODE/RiftSaw-ODE-trunk/pom.xml
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/pom.xml
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobComparatorByDate.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/resources/sched_schema.sql
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/resources/log4j.properties
Log:
* Refactored scheduler-simple module, also added jpa based DAO implementation.
* RetriesTest passes with the JDBC-based implementation in Eclipse, but due to the TransactionManager in the test setup it causes later tests to fail, so it is excluded for now.
Modified: branches/ODE/RiftSaw-ODE-trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -73,9 +73,9 @@
import org.apache.ode.bpel.pmapi.InstanceManagement;
import org.apache.ode.bpel.pmapi.ProcessManagement;
import org.apache.ode.dao.bpel.BpelDAOConnectionFactory;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
import org.apache.ode.dao.store.ConfStoreDAOConnectionFactory;
import org.apache.ode.il.dbutil.Database;
-import org.apache.ode.scheduler.simple.JdbcDelegate;
import org.apache.ode.scheduler.simple.SimpleScheduler;
import org.apache.ode.store.ProcessStoreImpl;
import org.apache.ode.utils.GUID;
@@ -112,6 +112,8 @@
protected BpelDAOConnectionFactory _daoCF;
protected ConfStoreDAOConnectionFactory _storeDaoCF;
+
+ protected SchedulerDAOConnectionFactory _schedDaoCF;
protected ExecutorService _executorService;
@@ -436,11 +438,14 @@
protected void initDAO() throws ServletException {
__log.info(__msgs.msgOdeUsingDAOImpl(_odeConfig.getDAOConnectionFactory()));
__log.info(__msgs.msgOdeUsingDAOImpl(_odeConfig.getDAOConfStoreConnectionFactory()));
+ __log.info(__msgs.msgOdeUsingDAOImpl(_odeConfig.getDAOConfScheduleConnectionFactory()));
try {
_daoCF = _db.createDaoCF();
_storeDaoCF = _db.createDaoStoreCF();
+ _schedDaoCF = _db.createDaoSchedulerCF();
} catch (Exception ex) {
- String errmsg = __msgs.msgDAOInstantiationFailed(_odeConfig.getDAOConnectionFactory());
+ String errmsg = __msgs.msgDAOInstantiationFailed(_odeConfig.getDAOConnectionFactory() + ": " + _odeConfig.getDAOConfStoreConnectionFactory()
+ + _odeConfig.getDAOConfScheduleConnectionFactory());
__log.error(errmsg, ex);
throw new ServletException(errmsg, ex);
@@ -462,10 +467,8 @@
}
protected Scheduler createScheduler() {
- SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),
- new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
+ SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(), _schedDaoCF, _txMgr, _odeConfig.getProperties());
scheduler.setExecutorService(_executorService);
- scheduler.setTransactionManager(_txMgr);
return scheduler;
}
Added: branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/DatabaseException.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/DatabaseException.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/DatabaseException.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.scheduler;
+
+
+/**
+ * Exception class thrown by {@link SchedulerDAOConnection} implementations.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ *
+ */
+public class DatabaseException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public DatabaseException(String message) {
+ super(message);
+ }
+
+ public DatabaseException(Exception ex) {
+ super(ex);
+ }
+
+ public DatabaseException(String message, Exception ex) {
+ super(message, ex);
+ }
+}
Added: branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/JobDAO.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/JobDAO.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/JobDAO.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.scheduler;
+
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+
+/**
+ * Like a task, but a little bit better.
+ *
+ * @author jeffyu
+ *
+ */
+public interface JobDAO {
+
+ public String getJobId();
+
+ public boolean isTransacted();
+
+ public JobDetails getDetails();
+
+ public boolean isPersisted();
+
+ public long getScheduledDate();
+
+ public void setScheduledDate(long scheduledDate);
+
+ public boolean isScheduled();
+
+ public void setScheduled(boolean scheduled);
+
+}
Added: branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnection.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnection.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnection.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.scheduler;
+
+import java.util.List;
+
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.dao.DAOConnection;
+
+
+/**
+ * Database abstraction; provides all database access for the simple scheduler.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ *
+ */
+public interface SchedulerDAOConnection extends DAOConnection{
+
+
+ public JobDAO createJob(boolean transacted, JobDetails jobDetails, boolean persisted, long scheduledDate) ;
+
+ public JobDAO createJob(String jobid, boolean transacted, JobDetails jobDetails, boolean persisted, long scheduledDate);
+
+
+ /**
+ * Save the job in the database.
+ * @param job the job
+ * @param nodeId node assigned to the job (or null if no node has been assigned)
+ * @param loaded whether the job has been loaded into memory (i.e. in preparation for execution)
+ * @throws DatabaseException in case of error
+ */
+ boolean insertJob(JobDAO job, String nodeId, boolean loaded) throws DatabaseException ;
+
+ /**
+ * Update the job in the database (only updates timestamp and retryCount)
+ *
+ * @param job the job
+ * @throws DatabaseException in case of error
+ */
+ boolean updateJob(JobDAO job) throws DatabaseException;
+
+ /**
+ * Delete a job from the database.
+ * @param jobid job identifier
+ * @param nodeId node identifier
+ * @throws DatabaseException in case of error
+ */
+ boolean deleteJob(String jobid, String nodeId) throws DatabaseException;
+
+ /**
+ * Return a list of unique nodes identifiers found in the database. This is used
+ * to initialize the list of known nodes when a new node starts up.
+ * @return list of unique node identifiers found in the database
+ */
+ List<String> getNodeIds() throws DatabaseException;
+
+ /**
+ * "Dequeue" jobs from the database that are ready for immediate execution; this basically
+ * is a select/delete operation with constraints on the nodeId and scheduled time.
+ *
+ * @param nodeId node identifier of the jobs
+ * @param maxtime only jobs with scheduled time earlier than this will be dequeued
+ * @param maxjobs maximum number of jobs to dequeue
+ * @return list of jobs that met the criteria and were deleted from the database
+ * @throws DatabaseException in case of error
+ */
+ List<JobDAO> dequeueImmediate(String nodeId, long maxtime, int maxjobs) throws DatabaseException ;
+
+ /**
+ * Assign a particular node identifier to a fraction of jobs in the database that do not have one,
+ * and are up for execution within a certain time. Only a fraction of the jobs found are assigned
+ * the node identifier. This fraction is determined by the "y" parameter, while membership in the
+ * group (of jobs that get the nodeId) is determined by the "x" parameter. Essentially the logic is:
+ * <code>
+ * UPDATE jobs AS job
+ * WHERE job.scheduledTime before :maxtime
+ * AND job.nodeId is null
+ * AND job.scheduledTime MOD :y == :x
+ * SET job.nodeId = :nodeId
+ * </code>
+ *
+ * @param nodeId node identifier to assign to jobs
+ * @param x the result of the mod-division
+ * @param y the dividend of the mod-division
+ * @param maxtime only jobs with scheduled time earlier than this will be updated
+ * @return number of jobs updated
+ * @throws DatabaseException in case of error
+ */
+ int updateAssignToNode(String nodeId, int x, int y, long maxtime) throws DatabaseException;
+
+ /**
+ * Reassign jobs from one node to another.
+ *
+ * @param oldnode node assigning from
+ * @param newnode new node assigning to
+ * @return number of rows changed
+ * @throws DatabaseException
+ */
+ int updateReassign(String oldnode, String newnode) throws DatabaseException;
+}
Added: branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnectionFactory.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnectionFactory.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/SchedulerDAOConnectionFactory.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,9 @@
+package org.apache.ode.dao.scheduler;
+
+import org.apache.ode.dao.DAOConnectionFactory;
+
+/**
+ * Factory for {@link SchedulerDAOConnection} instances used to persist and
+ * query scheduler jobs. Implementations are selected via configuration
+ * (see OdeConfigProperties) and initialized through the inherited
+ * {@code init(Properties, TransactionManager, Object)} contract.
+ */
+public interface SchedulerDAOConnectionFactory extends DAOConnectionFactory<SchedulerDAOConnection>{
+
+ /**
+  * Covariant redeclaration of the inherited factory method, narrowing the
+  * return type to {@link SchedulerDAOConnection}.
+  *
+  * @return a connection for scheduler job persistence
+  */
+ public SchedulerDAOConnection getConnection();
+
+}
Added: branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/Task.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/Task.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-dao/src/main/java/org/apache/ode/dao/scheduler/Task.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.scheduler;
+
+/**
+ * The thing that we schedule.
+ *
+ * Simple value object carrying only the scheduled execution time; subclasses
+ * or callers attach further job details elsewhere.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ *
+ */
+public class Task {
+ /** Scheduled date/time. */
+ private long schedDate;
+
+ public Task(long schedDate) {
+ this.schedDate = schedDate;
+ }
+
+ /**
+  * @return the scheduled date/time (presumably epoch milliseconds, as
+  *         compared against System.currentTimeMillis() by schedulers —
+  *         TODO confirm)
+  */
+ public long getScheduledDate() {
+ return schedDate;
+ }
+}
Modified: branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -125,7 +125,8 @@
public static String DEFAULT_DB_EMB_TYPE = "h2";
public static String DEFAULT_DAOCF_CLASS = "org.apache.ode.dao.jpa.hibernate.BpelDAOConnectionFactoryImpl";
public static String DEFAULT_DAOCF_STORE_CLASS = "org.apache.ode.dao.jpa.hibernate.ConfStoreDAOConnectionFactoryImpl";
-
+ public static String DEFAULT_DAOCF_SCHEDULER_CLASS = "org.apache.ode.scheduler.simple.jdbc.SchedulerDAOConnectionFactoryImpl";
+
static {
String odep = System.getProperty("ode.persistence");
if (odep != null &&
@@ -234,8 +235,11 @@
public String getDAOConfStoreConnectionFactory() {
return getProperty(PROP_DAOCF_STORE, DEFAULT_DAOCF_STORE_CLASS);
}
+
+ public String getDAOConfScheduleConnectionFactory() {
+ return getProperty(PROP_DAOCF_SCHEDULER, DEFAULT_DAOCF_SCHEDULER_CLASS);
+ }
-
public String getDbDataSource() {
return getProperty(OdeConfigProperties.PROP_DB_EXTERNAL_DS, "java:comp/env/jdbc/ode-ds");
}
Modified: branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/dbutil/Database.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/dbutil/Database.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/dbutil/Database.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -27,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.dao.bpel.BpelDAOConnectionFactory;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
import org.apache.ode.dao.store.ConfStoreDAOConnectionFactory;
import org.apache.ode.il.config.OdeConfigProperties;
import org.apache.ode.utils.LoggingInterceptor;
@@ -233,5 +234,23 @@
cf.init(_odeConfig.getProperties(),_txm,getDataSource());
return cf;
}
+
+ public SchedulerDAOConnectionFactory createDaoSchedulerCF() throws DatabaseConfigException {
+ String pClassName = _odeConfig.getDAOConfScheduleConnectionFactory();
+ __log.debug(__msgs.msgOdeUsingDAOImpl(pClassName));
+
+ SchedulerDAOConnectionFactory sdcf;
+ try {
+ sdcf = (SchedulerDAOConnectionFactory) Class.forName(pClassName).newInstance();
+ } catch (Exception ex) {
+ String errmsg = __msgs.msgDAOInstantiationFailed(pClassName);
+ __log.error(errmsg, ex);
+ throw new DatabaseConfigException(errmsg, ex);
+ }
+
+ sdcf.init(_odeConfig.getProperties(), _txm, getDataSource());
+ return sdcf;
+ }
+
}
Modified: branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/dbutil/H2Database.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/dbutil/H2Database.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-epr/src/main/java/org/apache/ode/il/dbutil/H2Database.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.il.config.OdeConfigProperties;
+import org.apache.ode.utils.GUID;
import org.h2.jdbcx.JdbcDataSource;
public class H2Database implements EmbeddedDatabase {
@@ -35,28 +36,41 @@
private DataSource _dataSource = null;
public void init(File workRoot, OdeConfigProperties props, TransactionManager txm) {
- String db = props.getDbEmbeddedName();
+ String db = props.getDbEmbeddedName() ;
+ String rollbackedDS = props.getProperties().getProperty("needed.Rollback");
+ if ("true".equals(rollbackedDS) || workRoot != null) {
+ db = db + new GUID().toString();
+ }
+
if (workRoot == null) {
_dbUrl = "jdbc:h2:mem:" + db + ";DB_CLOSE_DELAY=-1";
- JdbcDataSource hds = new JdbcDataSource();
- hds.setURL(_dbUrl);
- hds.setUser("sa");
- _dataSource = hds;
+
} else {
_dbUrl = "jdbc:h2:" + workRoot + File.separator + db;
if (!props.isDbEmbeddedCreate()) {
_dbUrl += ";IFEXISTS=TRUE";
}
- String clazz = org.h2.Driver.class.getName();
- _connectionManager = new DatabaseConnectionManager(txm, props);
- try {
- _connectionManager.init(_dbUrl, clazz, "sa", null);
- } catch (DatabaseConfigException ex) {
- __log.error("Unable to initialize connection pool", ex);
- }
- _dataSource = _connectionManager.getDataSource();
}
- __log.debug("Using Embedded Database: " + _dbUrl);
+
+ __log.info("The db url is: " + _dbUrl);
+ __log.info("The rollbackedDS: " + rollbackedDS + ":workRoot ->" + workRoot );
+ if (workRoot != null || "true".equals(rollbackedDS)) {
+ String clazz = org.h2.Driver.class.getName();
+ _connectionManager = new DatabaseConnectionManager(txm, props);
+ try {
+ _connectionManager.init(_dbUrl, clazz, "sa", null);
+ } catch (DatabaseConfigException ex) {
+ __log.error("Unable to initialize connection pool", ex);
+ }
+ _dataSource = _connectionManager.getDataSource();
+ } else {
+ JdbcDataSource hds = new JdbcDataSource();
+ hds.setURL(_dbUrl);
+ hds.setUser("sa");
+ _dataSource = hds;
+ }
+
+ __log.info("Using Embedded Database: " + _dbUrl);
}
public void shutdown() {
@@ -73,6 +87,7 @@
} catch (Throwable ex) {
__log.debug("Error shutting down H2.", ex);
}
+
}
public DataSource getDataSource() {
Modified: branches/ODE/RiftSaw-ODE-trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -87,8 +87,7 @@
_txm = new GeronimoTransactionManager();
mexDao = new Mock(MessageExchangeDAO.class);
- SimpleScheduler scheduler = new SimpleScheduler("node", null, new Properties());
- scheduler.setTransactionManager(_txm);
+ SimpleScheduler scheduler = new SimpleScheduler("node", null, _txm, new Properties());
contexts = new Contexts();
contexts.scheduler = scheduler;
Modified: branches/ODE/RiftSaw-ODE-trunk/bpel-test/pom.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/bpel-test/pom.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/bpel-test/pom.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -174,14 +174,16 @@
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <!-- FIXME: temporary skip it, 3 failed test cases (2 in JDK6, 3 in JDK5): testIMA, testRetireOld, testIsolated**-->
<excludes>
<exclude>**/MessageRouting20Test.java</exclude>
- <exclude>**/VersionedRedeployTest.java</exclude>
- <exclude>**/StructuredActivities20Test.java</exclude>
<exclude>**/ProcessManagementDaoTest.java</exclude>
+ <exclude>**/VersionedRedeployTest.java</exclude>
<exclude>**/SelectObjectTest.java</exclude>
</excludes>
+ <systemPropertyVariables>
+ <org.apache.ode.autoRetireProcess>true</org.apache.ode.autoRetireProcess>
+ </systemPropertyVariables>
+ <argLine>-Xms512M -Xmx1024M -XX:MaxPermSize=512M</argLine>
</configuration>
<executions>
<execution>
@@ -192,19 +194,12 @@
</goals>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports/hibernate-jpa</reportsDirectory>
- <systemProperties>
- <property>
- <name>dao.factory</name>
- <value>org.apache.ode.dao.jpa.hibernate.BpelDAOConnectionFactoryImpl</value>
- </property>
- <property>
- <name>dao.factory.store</name>
- <value>org.apache.ode.dao.jpa.hibernate.ConfStoreDAOConnectionFactoryImpl</value>
- </property>
- </systemProperties>
+ <systemPropertyVariables>
+ <dao.factory>org.apache.ode.dao.jpa.hibernate.BpelDAOConnectionFactoryImpl</dao.factory>
+ <dao.factory.store>org.apache.ode.dao.jpa.hibernate.ConfStoreDAOConnectionFactoryImpl</dao.factory.store>
+ </systemPropertyVariables>
<includes>
<include>**/dao/bpel/*Test.java</include>
- <include>**/dao/bpel/*Test.java</include>
</includes>
</configuration>
</execution>
@@ -216,19 +211,12 @@
</goals>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports/hibernate</reportsDirectory>
- <systemProperties>
- <property>
- <name>dao.factory</name>
- <value>org.apache.ode.dao.hib.bpel.BpelDAOConnectionFactoryImpl</value>
- </property>
- <property>
- <name>dao.factory.store</name>
- <value>org.apache.ode.dao.hib.store.ConfStoreDAOConnectionFactoryImpl</value>
- </property>
- </systemProperties>
+ <systemPropertyVariables>
+ <dao.factory>org.apache.ode.dao.hib.bpel.BpelDAOConnectionFactoryImpl</dao.factory>
+ <dao.factory.store>org.apache.ode.dao.hib.store.ConfStoreDAOConnectionFactoryImpl</dao.factory.store>
+ </systemPropertyVariables>
<includes>
<include>**/dao/bpel/*Test.java</include>
- <include>**/dao/bpel/*Test.java</include>
</includes>
</configuration>
</execution>
Added: branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/JobDAOImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/JobDAOImpl.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/JobDAOImpl.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.scheduler;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.Transient;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.dao.scheduler.JobDAO;
+
+/**
+ * JPA entity for a persisted scheduler job, mapped to table ODE_JOB.
+ *
+ * Job metadata is stored twice for compatibility: as discrete columns
+ * (instanceId, mexId, processId, ...) delegating to the in-memory
+ * {@link JobDetails}, and as a serialized map blob in the detailsExt column.
+ * SchedulerDAOConnectionImpl.dequeueImmediate() prefers blob entries when
+ * present.
+ *
+ * @author jeffyu
+ *
+ */
+@Entity
+@Table(name = "ODE_JOB")
+@NamedQueries ({
+ @NamedQuery (name = "deleteJobs", query = "DELETE FROM JobDAOImpl AS j WHERE j.jobId = :job AND j.nodeId = :node"),
+ @NamedQuery(name = "nodeIds", query = "SELECT DISTINCT j.nodeId FROM JobDAOImpl AS j WHERE j.nodeId IS NOT NULL"),
+ @NamedQuery(name = "dequeueImmediate", query = "SELECT j FROM JobDAOImpl AS j WHERE j.nodeId = :node AND j.scheduled = false AND j.timestamp < :time ORDER BY j.timestamp"),
+ @NamedQuery(name = "updateScheduled", query = "UPDATE JobDAOImpl AS j SET j.scheduled = true WHERE j.jobId in (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)"),
+ @NamedQuery(name = "updateAssignToNode", query = "UPDATE JobDAOImpl AS j SET j.nodeId = :node WHERE j.nodeId IS NULL AND j.scheduled = false AND mod(j.timestamp,:numNode) = :i AND j.timestamp < :maxTime"),
+ @NamedQuery(name = "updateReassign", query = "UPDATE JobDAOImpl AS j SET j.nodeId = :newNode, j.scheduled = false WHERE j.nodeId = :oldNode")
+})
+public class JobDAOImpl implements JobDAO, Serializable {
+
+ private static final Log __log = LogFactory.getLog(JobDAOImpl.class);
+ // Primary key (column "jobid").
+ private String _jobId;
+ // Scheduled time (column "ts"); also exposed as getScheduledDate().
+ private long _ts;
+ // Owning node, null while unassigned (column "nodeid").
+ private String _nodeId;
+ // True once the job has been handed out by dequeueImmediate.
+ private boolean _scheduled;
+ // Whether the job runs inside a transaction (column "transacted").
+ private boolean _transacted;
+ // Not persisted (transient); defaults to true for entity instances.
+ private boolean _persisted = true;
+ // In-memory details; individual fields are mapped via the getters below.
+ private JobDetails _details = new Scheduler.JobDetails();
+
+
+ // Not a column itself — its fields are persisted through the delegating
+ // getters/setters (getInstanceId(), getMexId(), ...) and the blob below.
+ @Transient
+ public JobDetails getDetails() {
+ return _details;
+ }
+
+ @Id
+ @Column(name="jobid", length = 64)
+ public String getJobId() {
+ return _jobId;
+ }
+
+ public void setJobId(String jobId) {
+ _jobId = jobId;
+ }
+
+ // Alias for the "ts" column required by the JobDAO interface.
+ @Transient
+ public long getScheduledDate() {
+ return getTimestamp();
+ }
+
+ public void setScheduledDate(long scheduledDate) {
+ this.setTimestamp(scheduledDate);
+ }
+
+ @Transient
+ public boolean isPersisted() {
+ return _persisted;
+ }
+
+ @Column(name = "transacted", nullable=false)
+ public boolean isTransacted() {
+ return _transacted;
+ }
+
+ @Column(name = "ts", nullable = false)
+ public long getTimestamp() {
+ return _ts;
+ }
+
+ public void setTimestamp(long ts) {
+ _ts = ts;
+ }
+
+ @Column(name = "nodeid", length = 64)
+ public String getNodeId() {
+ return _nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ _nodeId = nodeId;
+ }
+
+ @Column(name = "scheduled", nullable = false)
+ public boolean isScheduled() {
+ return _scheduled;
+ }
+
+ public void setScheduled(boolean scheduled) {
+ this._scheduled = scheduled;
+ }
+
+ public void setTransacted(boolean transacted) {
+ this._transacted = transacted;
+ }
+
+ public void setPersisted(boolean persisted) {
+ this._persisted = persisted;
+ }
+
+ public void setDetails(JobDetails details) {
+ _details = details;
+ }
+
+ //JPA JobDetails getters/setters
+ // Each delegates to _details so the column values and the in-memory
+ // JobDetails stay in sync; null boxed values map to 0/false defaults.
+ @Column(name = "instanceId")
+ public long getInstanceId() {
+ return _details.instanceId == null ? 0L : _details.instanceId.longValue();
+ }
+
+ public void setInstanceId(long instanceId) {
+ _details.instanceId = instanceId;
+ }
+
+ @Column(name = "mexId")
+ public String getMexId() {
+ return _details.mexId;
+ }
+
+ public void setMexId(String mexId) {
+ _details.mexId = mexId;
+ }
+
+ @Column(name = "processId")
+ public String getProcessId() {
+ return _details.processId;
+ }
+
+ public void setProcessId(String processId) {
+ _details.processId = processId;
+ }
+
+ @Column(name = "type")
+ public String getType() {
+ return _details.type;
+ }
+
+ // Normalizes null to "" so the column is never null-valued.
+ public void setType(String type) {
+ _details.type = type!=null?type:"";
+ }
+
+ @Column(name = "channel")
+ public String getChannel() {
+ return _details.channel;
+ }
+
+ public void setChannel(String channel) {
+ _details.channel = channel;
+ }
+
+ @Column(name = "correlatorId")
+ public String getCorrelatorId() {
+ return _details.correlatorId;
+ }
+
+ public void setCorrelatorId(String correlatorId) {
+ _details.correlatorId = correlatorId;
+ }
+
+ // NOTE(review): getCorrelationKeySet() will NPE if details has no key set —
+ // confirm _details.getCorrelationKeySet() never returns null when persisted.
+ @Column(name = "correlationKeySet")
+ public String getCorrelationKeySet() {
+ return _details.getCorrelationKeySet().toCanonicalString();
+ }
+
+ public void setCorrelationKeySet(String correlationKey) {
+ _details.setCorrelationKeySet(new CorrelationKeySet(correlationKey));
+ }
+
+ @Column(name = "retryCount")
+ public int getRetryCount() {
+ return _details.retryCount == null ? 0 : _details.retryCount.intValue();
+ }
+
+ public void setRetryCount(int retryCount) {
+ _details.retryCount = retryCount;
+ }
+
+ @Column(name = "inMem")
+ public boolean isInMem() {
+ return _details.inMem == null ? false : _details.inMem.booleanValue();
+ }
+
+ public void setInMem(boolean inMem) {
+ _details.inMem = inMem;
+ }
+
+ //should not lazy load, it is possible getDetails() called before this
+ // Serializes the detailsExt map to a byte[] blob via Java serialization;
+ // returns null when there is nothing to store. Serialization failures are
+ // logged and swallowed (the column is then left null).
+ @Lob
+ @Column(name = "detailsExt")
+ public byte[] getDetailsExt() {
+ if (_details.detailsExt != null) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream os = new ObjectOutputStream(bos);
+ os.writeObject(_details.detailsExt);
+ os.close();
+ return bos.toByteArray();
+ } catch (Exception e) {
+ __log.error("Error in getDetailsExt ", e);
+ }
+ }
+ return null;
+ }
+
+ // Inverse of getDetailsExt(); deserialization failures are logged and
+ // swallowed, leaving the previous in-memory map untouched.
+ // NOTE(review): unchecked cast to Map<String, Object>; safe only as long as
+ // the blob was written by getDetailsExt() above.
+ public void setDetailsExt(byte[] detailsExt) {
+ if (detailsExt != null) {
+ try {
+ ByteArrayInputStream bis = new ByteArrayInputStream(detailsExt);
+ ObjectInputStream is = new ObjectInputStream(bis);
+ _details.detailsExt = (Map<String, Object>) is.readObject();
+ is.close();
+ } catch (Exception e) {
+ __log.error("Error in setDetailsExt ", e);
+ }
+ }
+ }
+
+}
Added: branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/SchedulerDAOConnectionImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/SchedulerDAOConnectionImpl.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/scheduler/SchedulerDAOConnectionImpl.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,192 @@
+/**
+ *
+ */
+package org.apache.ode.dao.jpa.scheduler;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.dao.jpa.JpaConnection;
+import org.apache.ode.dao.jpa.JpaOperator;
+import org.apache.ode.dao.scheduler.DatabaseException;
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.utils.GUID;
+
+/**
+ * JPA implementation of {@link SchedulerDAOConnection}: persists, assigns and
+ * dequeues scheduler jobs using the named queries declared on JobDAOImpl.
+ * Each public operation runs inside a _txCtx.begin()/commit() bracket
+ * inherited from JpaConnection.
+ *
+ * @author jeffyu
+ *
+ */
+public class SchedulerDAOConnectionImpl extends JpaConnection implements SchedulerDAOConnection {
+
+ private static final Log __log = LogFactory.getLog(SchedulerDAOConnectionImpl.class);
+
+ // Per-thread connection cache; populated by the factory's getConnection().
+ static final ThreadLocal<SchedulerDAOConnectionImpl> _connections = new ThreadLocal<SchedulerDAOConnectionImpl>();
+
+ // Batch width of the "updateScheduled" named query (it has 10 positional slots).
+ private static final int UPDATE_SCHEDULED_SLOTS = 10;
+
+ public SchedulerDAOConnectionImpl(EntityManager mgr, TransactionManager txMgr, JpaOperator operator) {
+ super(mgr, txMgr, operator);
+ }
+
+ public static ThreadLocal<SchedulerDAOConnectionImpl> getThreadLocal() {
+ return _connections;
+ }
+
+
+ // Deletes the job only if it is owned by the given node; returns true iff
+ // exactly one row was removed.
+ public boolean deleteJob(String jobid, String nodeId) throws DatabaseException {
+ _txCtx.begin();
+ Query q = _em.createNamedQuery("deleteJobs");
+ q.setParameter("job", jobid);
+ q.setParameter("node", nodeId);
+ boolean ret = q.executeUpdate() == 1 ? true : false;
+ _txCtx.commit();
+ return ret;
+ }
+
+
+ // Fetches up to maxjobs unscheduled jobs for this node due before maxtime
+ // (oldest first), back-fills JobDetails fields from the legacy detailsExt
+ // blob, and marks the returned jobs as scheduled.
+ // Returns null if marking failed to cover every fetched job.
+ public List<JobDAO> dequeueImmediate(String nodeId, long maxtime, int maxjobs) throws DatabaseException {
+ _txCtx.begin();
+ Query q = _em.createNamedQuery("dequeueImmediate");
+ q.setParameter("node", nodeId);
+ q.setParameter("time", maxtime);
+ q.setMaxResults(maxjobs);
+ List<JobDAO> ret = (List<JobDAO>) q.getResultList();
+
+ //For compatibility reasons, we check whether there are entries inside
+ //jobDetailsExt blob, which correspond to extracted entries. If so, we
+ //use them.
+ for (JobDAO dao : ret) {
+
+ JobDAOImpl daoImpl = (JobDAOImpl) dao;
+ Map<String, Object> detailsExt = daoImpl.getDetails().getDetailsExt();
+ if (detailsExt.get("type") != null) {
+ daoImpl.setType((String) detailsExt.get("type"));
+ }
+ if (detailsExt.get("iid") != null) {
+ daoImpl.setInstanceId((Long) detailsExt.get("iid"));
+ }
+ if (detailsExt.get("pid") != null) {
+ daoImpl.setProcessId((String) detailsExt.get("pid"));
+ }
+ if (detailsExt.get("inmem") != null) {
+ daoImpl.setInMem((Boolean) detailsExt.get("inmem"));
+ }
+ if (detailsExt.get("ckey") != null) {
+ daoImpl.setCorrelationKeySet((String) detailsExt.get("ckey"));
+ }
+ if (detailsExt.get("channel") != null) {
+ daoImpl.setChannel((String) detailsExt.get("channel"));
+ }
+ if (detailsExt.get("mexid") != null) {
+ daoImpl.setMexId((String) detailsExt.get("mexid"));
+ }
+ if (detailsExt.get("correlatorId") != null) {
+ daoImpl.setCorrelatorId((String) detailsExt.get("correlatorId"));
+ }
+ if (detailsExt.get("retryCount") != null) {
+ daoImpl.setRetryCount(Integer.parseInt((String) detailsExt.get("retryCount")));
+ }
+ }
+
+ // mark jobs as scheduled, UPDATE_SCHEDULED_SLOTS at a time
+ // Unused trailing slots in the final batch are padded with "" (no job id
+ // is empty, so the padding matches nothing).
+ int j = 0;
+ int updateCount = 0;
+ q = _em.createNamedQuery("updateScheduled");
+
+ for (int updates = 1; updates <= (ret.size() / UPDATE_SCHEDULED_SLOTS) + 1; updates++) {
+ for (int i = 1; i <= UPDATE_SCHEDULED_SLOTS; i++) {
+ q.setParameter(i, j < ret.size() ? ret.get(j).getJobId() : "");
+ j++;
+ }
+ updateCount += q.executeUpdate();
+ }
+ if (updateCount != ret.size()) {
+ __log.error("Updating scheduled jobs failed to update all jobs; expected=" + ret.size()
+ + " actual=" + updateCount);
+ // NOTE(review): this path returns WITHOUT _txCtx.commit(), leaving the
+ // transaction bracket open — confirm the caller (or JpaConnection)
+ // rolls back/cleans up in this case.
+ return null;
+
+ }
+ _txCtx.commit();
+ return ret;
+
+ }
+
+
+ // Distinct, non-null node ids currently present in the job table.
+ public List<String> getNodeIds() throws DatabaseException {
+ _txCtx.begin();
+ Query q = _em.createNamedQuery("nodeIds");
+ List<String> ret = (List<String>) q.getResultList();
+ _txCtx.commit();
+ return ret;
+ }
+
+
+ // Persists a new job, stamping it with the owning node id and the
+ // "loaded" flag (stored in the scheduled column). Always returns true.
+ public boolean insertJob(JobDAO job, String nodeId, boolean loaded) throws DatabaseException {
+ _txCtx.begin();
+ JobDAOImpl theJob = (JobDAOImpl)job;
+ theJob.setNodeId(nodeId);
+ theJob.setScheduled(loaded);
+ _em.persist(theJob);
+ _txCtx.commit();
+ return true;
+ }
+
+
+ // Claims unassigned, unscheduled jobs due before maxtime for nodeId, taking
+ // the slice where mod(timestamp, numNodes) == i. Returns rows updated.
+ public int updateAssignToNode(String nodeId, int i, int numNodes, long maxtime) throws DatabaseException {
+ _txCtx.begin();
+ Query q = _em.createNamedQuery("updateAssignToNode");
+ q.setParameter("node", nodeId);
+ q.setParameter("numNode", numNodes);
+ q.setParameter("i", i);
+ q.setParameter("maxTime", maxtime);
+
+ int ret = q.executeUpdate();
+
+ _txCtx.commit();
+ return ret;
+ }
+
+
+ // NOTE(review): uses persist() rather than merge(); presumably the job is
+ // still managed by this EntityManager when updated — confirm, since
+ // persist() on a detached instance with an existing id will fail.
+ public boolean updateJob(JobDAO job) throws DatabaseException {
+ _txCtx.begin();
+ _em.persist(job);
+ _txCtx.commit();
+ return true;
+ }
+
+
+ // Moves all of oldnode's jobs to newnode and clears their scheduled flag
+ // so they are re-dequeued. Returns rows updated.
+ public int updateReassign(String oldnode, String newnode) throws DatabaseException {
+ _txCtx.begin();
+ Query q = _em.createNamedQuery("updateReassign");
+ q.setParameter("newNode", newnode);
+ q.setParameter("oldNode", oldnode);
+ int ret = q.executeUpdate();
+ _txCtx.commit();
+ return ret;
+ }
+
+ // Builds a transient (not yet persisted) JobDAOImpl with the given id.
+ public JobDAO createJob(String jobId, boolean transacted, JobDetails jobDetails,
+ boolean persisted, long scheduledDate) {
+ JobDAOImpl theJob = new JobDAOImpl();
+ theJob.setJobId(jobId);
+ theJob.setTransacted(transacted);
+ theJob.setDetails(jobDetails);
+ theJob.setPersisted(persisted);
+ theJob.setTimestamp(scheduledDate);
+ return theJob;
+ }
+
+ // Convenience overload: generates a fresh GUID as the job id.
+ public JobDAO createJob(boolean transacted, JobDetails jobDetails,
+ boolean persisted, long scheduledDate) {
+ return createJob(new GUID().toString(), transacted, jobDetails, persisted, scheduledDate);
+ }
+
+}
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/resources/META-INF/persistence.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/resources/META-INF/persistence.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/resources/META-INF/persistence.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -45,5 +45,9 @@
<class>org.apache.ode.dao.jpa.store.DeploymentUnitDaoImpl</class>
<class>org.apache.ode.dao.jpa.store.VersionTrackerDAOImpl</class>
</persistence-unit>
+ <persistence-unit name="ode-scheduler" transaction-type="JTA">
+ <class>org.apache.ode.dao.jpa.scheduler.JobDAOImpl</class>
+ </persistence-unit>
+
</persistence>
\ No newline at end of file
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/riftsaw/riftsaw-persistence.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/riftsaw/riftsaw-persistence.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa/src/main/riftsaw/riftsaw-persistence.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -47,5 +47,10 @@
<class>org.apache.ode.dao.jpa.store.DeploymentUnitDaoImpl</class>
<class>org.apache.ode.dao.jpa.store.VersionTrackerDAOImpl</class>
</persistence-unit>
+ <persistence-unit name="ode-scheduler" transaction-type="JTA">
+ <mapping-file>META-INF/riftsaw-orm.xml</mapping-file>
+ <class>org.apache.ode.dao.jpa.scheduler.JobDAOImpl</class>
+ </persistence-unit>
+
</persistence>
\ No newline at end of file
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/descriptors/persistence.db.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/descriptors/persistence.db.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/descriptors/persistence.db.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -44,6 +44,8 @@
<class>org.apache.ode.dao.jpa.store.ProcessConfPropertyDaoImpl</class>
<class>org.apache.ode.dao.jpa.store.DeploymentUnitDaoImpl</class>
<class>org.apache.ode.dao.jpa.store.VersionTrackerDAOImpl</class>
+
+ <class>org.apache.ode.dao.jpa.scheduler.JobDAOImpl</class>
<properties>
<property name="hibernate.show_sql" value="false" />
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/java/org/apache/ode/dao/jpa/hibernate/ConfStoreDAOConnectionFactoryImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/java/org/apache/ode/dao/jpa/hibernate/ConfStoreDAOConnectionFactoryImpl.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/java/org/apache/ode/dao/jpa/hibernate/ConfStoreDAOConnectionFactoryImpl.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -49,7 +49,7 @@
public void init(Properties odeConfig,TransactionManager txm, Object env) {
this._txm = txm;
this._ds = (DataSource) env;
- Map emfProperties = HibernateUtil.buildConfig(OdeConfigProperties.PROP_DAOCF + ".", odeConfig, _txm, _ds);
+ Map emfProperties = HibernateUtil.buildConfig(OdeConfigProperties.PROP_DAOCF_STORE + ".", odeConfig, _txm, _ds);
_emf = Persistence.createEntityManagerFactory("ode-store", emfProperties);
}
Added: branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/java/org/apache/ode/dao/jpa/hibernate/SchedulerDAOConnectionFactoryImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/java/org/apache/ode/dao/jpa/hibernate/SchedulerDAOConnectionFactoryImpl.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/java/org/apache/ode/dao/jpa/hibernate/SchedulerDAOConnectionFactoryImpl.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.jpa.hibernate;
+
+import java.util.Map;
+import java.util.Properties;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.jpa.JpaOperator;
+import org.apache.ode.dao.jpa.scheduler.SchedulerDAOConnectionImpl;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
+import org.apache.ode.il.config.OdeConfigProperties;
+
+/**
+ * Hibernate-JPA backed {@link SchedulerDAOConnectionFactory}. Builds an
+ * EntityManagerFactory for the "ode-scheduler" persistence unit and hands out
+ * thread-cached {@link SchedulerDAOConnection}s.
+ */
+public class SchedulerDAOConnectionFactoryImpl implements SchedulerDAOConnectionFactory {
+
+ static final Log __log = LogFactory.getLog(SchedulerDAOConnectionFactoryImpl.class);
+ EntityManagerFactory _emf;
+ TransactionManager _txm;
+ DataSource _ds;
+ JpaOperator _operator = new JpaOperatorImpl();
+
+ /**
+  * Initializes the factory: builds Hibernate config from properties scoped
+  * by PROP_DAOCF_SCHEDULER and creates the "ode-scheduler" EMF.
+  * env is expected to be the DataSource to use.
+  */
+ public void init(Properties odeConfig, TransactionManager txm, Object env) {
+ this._txm = txm;
+ this._ds = (DataSource) env;
+ Map emfProperties = HibernateUtil.buildConfig(OdeConfigProperties.PROP_DAOCF_SCHEDULER + ".", odeConfig, _txm, _ds);
+ _emf = Persistence.createEntityManagerFactory("ode-scheduler", emfProperties);
+
+ }
+
+ /**
+  * Returns the current thread's cached connection when it is still open;
+  * otherwise creates a new EntityManager-backed connection and caches it.
+  */
+ public SchedulerDAOConnection getConnection() {
+ final ThreadLocal<SchedulerDAOConnectionImpl> currentConnection = SchedulerDAOConnectionImpl.getThreadLocal();
+
+ SchedulerDAOConnectionImpl conn = (SchedulerDAOConnectionImpl) currentConnection.get();
+ if (conn != null && HibernateUtil.isOpen(conn)) {
+ return conn;
+ } else {
+ EntityManager em = _emf.createEntityManager();
+ conn = new SchedulerDAOConnectionImpl(em, _txm, _operator);
+ currentConnection.set(conn);
+ return conn;
+ }
+ }
+
+ /** Closes the underlying EntityManagerFactory. */
+ public void shutdown() {
+ _emf.close();
+ }
+}
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/resources/META-INF/persistence.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/resources/META-INF/persistence.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa-hibernate/src/main/resources/META-INF/persistence.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -44,6 +44,8 @@
<class>org.apache.ode.dao.jpa.store.ProcessConfPropertyDaoImpl</class>
<class>org.apache.ode.dao.jpa.store.DeploymentUnitDaoImpl</class>
<class>org.apache.ode.dao.jpa.store.VersionTrackerDAOImpl</class>
+
+ <class>org.apache.ode.dao.jpa.scheduler.JobDAOImpl</class>
<properties>
<property name="hibernate.show_sql" value="false" />
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.derby.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.derby.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.derby.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -40,6 +40,8 @@
<class>org.apache.ode.store.jpa.ProcessConfPropertyDaoImpl</class>
<class>org.apache.ode.store.jpa.DeploymentUnitDaoImpl</class>
<class>org.apache.ode.store.jpa.VersionTrackerDAOImpl</class>
+
+ <class>org.apache.ode.dao.jpa.scheduler.JobDAOImpl</class>
<properties>
<!-- Properties for an embedded Derby connection -->
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.mysql.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.mysql.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.mysql.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -40,6 +40,8 @@
<class>org.apache.ode.store.jpa.ProcessConfPropertyDaoImpl</class>
<class>org.apache.ode.store.jpa.DeploymentUnitDaoImpl</class>
<class>org.apache.ode.store.jpa.VersionTrackerDAOImpl</class>
+
+ <class>org.apache.ode.dao.jpa.scheduler.JobDAOImpl</class>
<properties>
<!-- Properties for an embedded Derby connection -->
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.oracle.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.oracle.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.oracle.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -40,6 +40,8 @@
<class>org.apache.ode.store.jpa.ProcessConfPropertyDaoImpl</class>
<class>org.apache.ode.store.jpa.DeploymentUnitDaoImpl</class>
<class>org.apache.ode.store.jpa.VersionTrackerDAOImpl</class>
+
+ <class>org.apache.ode.dao.jpa.scheduler.JobDAOImpl</class>
<properties>
<!-- Properties for an embedded Derby connection -->
Modified: branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.postgres.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.postgres.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/descriptors/persistence.postgres.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -40,6 +40,8 @@
<class>org.apache.ode.store.jpa.ProcessConfPropertyDaoImpl</class>
<class>org.apache.ode.store.jpa.DeploymentUnitDaoImpl</class>
<class>org.apache.ode.store.jpa.VersionTrackerDAOImpl</class>
+
+ <class>org.apache.ode.dao.jpa.scheduler.JobDAOImpl</class>
<properties>
<!-- Properties for an embedded Derby connection -->
Added: branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/java/org/apache/ode/dao/jpa/openjpa/SchedulerDAOConnectionFactoryImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/java/org/apache/ode/dao/jpa/openjpa/SchedulerDAOConnectionFactoryImpl.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/dao-jpa-ojpa/src/main/java/org/apache/ode/dao/jpa/openjpa/SchedulerDAOConnectionFactoryImpl.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.jpa.openjpa;
+
+import java.util.Map;
+import java.util.Properties;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import org.apache.ode.dao.jpa.scheduler.SchedulerDAOConnectionImpl;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
+import org.apache.ode.il.config.OdeConfigProperties;
+import static org.apache.ode.dao.jpa.openjpa.BpelDAOConnectionFactoryImpl.buildConfig;
+import static org.apache.ode.dao.jpa.openjpa.BpelDAOConnectionFactoryImpl._operator;
+
+/**
+ * OpenJPA-based implementation of {@link SchedulerDAOConnectionFactory}.
+ * Mirrors the Hibernate variant: hands out thread-bound
+ * {@link SchedulerDAOConnection}s backed by the "ode-scheduler" persistence unit.
+ */
+public class SchedulerDAOConnectionFactoryImpl implements SchedulerDAOConnectionFactory {
+
+  // Created in init(), closed in shutdown(); source of all scheduler EntityManagers.
+  EntityManagerFactory _emf;
+  TransactionManager _txm;
+  DataSource _ds;
+
+  /**
+   * Builds the OpenJPA configuration and creates the "ode-scheduler"
+   * EntityManagerFactory.
+   *
+   * @param odeConfig ODE configuration properties; scheduler DAO settings are
+   *                  read under the {@code OdeConfigProperties.PROP_DAOCF_SCHEDULER} prefix
+   * @param txm       JTA transaction manager to integrate with
+   * @param env       expected to be the scheduler {@link DataSource} -- the
+   *                  unchecked cast below will fail otherwise
+   */
+  public void init(Properties odeConfig, TransactionManager txm, Object env) {
+    this._txm = txm;
+    this._ds = (DataSource) env;
+    // NOTE(review): raw Map -- consider parameterizing to match buildConfig's return type.
+    Map emfProperties = buildConfig(OdeConfigProperties.PROP_DAOCF_SCHEDULER + ".", odeConfig, _txm, _ds);
+    _emf = Persistence.createEntityManagerFactory("ode-scheduler", emfProperties);
+
+  }
+
+  /**
+   * Returns the scheduler DAO connection bound to the calling thread,
+   * creating (and caching in the thread-local) a new one when none exists
+   * or the cached one has been closed.
+   */
+  public SchedulerDAOConnection getConnection() {
+    final ThreadLocal<SchedulerDAOConnectionImpl> currentConnection = SchedulerDAOConnectionImpl.getThreadLocal();
+    SchedulerDAOConnectionImpl conn = (SchedulerDAOConnectionImpl) currentConnection.get();
+    // NOTE(review): Hibernate variant uses HibernateUtil.isOpen(conn) here -- confirm equivalence.
+    if (conn != null && !conn.isClosed()) {
+      return conn;
+    } else {
+      EntityManager em = _emf.createEntityManager();
+      // _operator is statically imported from BpelDAOConnectionFactoryImpl (shared JpaOperator instance).
+      conn = new SchedulerDAOConnectionImpl(em, _txm, _operator);
+      currentConnection.set(conn);
+      return conn;
+    }
+  }
+
+  /** Closes the EntityManagerFactory; connections handed out earlier become unusable. */
+  public void shutdown() {
+    _emf.close();
+  }
+}
Modified: branches/ODE/RiftSaw-ODE-trunk/pom.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/pom.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/pom.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -120,9 +120,8 @@
<module>dao-jpa</module>
<module>jca-ra</module>
<module>jca-server</module>
- <module>scheduler-simple</module>
+ <module>bpel-epr</module>
<module>bpel-compiler</module>
- <module>bpel-epr</module>
<module>bpel-ql</module>
<module>dao-hibernate</module>
<module>tools</module>
@@ -130,6 +129,7 @@
<module>dao-jpa-ojpa</module>
<module>dao-jpa-db</module>
<module>dao-hibernate-db</module>
+ <module>scheduler-simple</module>
<module>bpel-store</module>
<module>bpel-runtime</module>
<module>bpel-connector</module>
@@ -164,7 +164,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.4.3</version>
+ <version>2.5</version>
</plugin>
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/pom.xml
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/pom.xml 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/pom.xml 2010-09-01 06:42:40 UTC (rev 929)
@@ -35,10 +35,18 @@
<artifactId>riftsaw-bpel-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.jboss.soa.bpel</groupId>
+ <artifactId>riftsaw-bpel-dao</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.jboss.soa.bpel</groupId>
<artifactId>riftsaw-utils</artifactId>
</dependency>
<dependency>
+ <groupId>org.jboss.soa.bpel</groupId>
+ <artifactId>riftsaw-bpel-epr</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
@@ -54,51 +62,58 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>backport-util-concurrent</groupId>
<artifactId>backport-util-concurrent</artifactId>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.geronimo.modules</groupId>
<artifactId>geronimo-kernel</artifactId>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-transaction</artifactId>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-connector</artifactId>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
- <dependency>
- <groupId>tranql</groupId>
- <artifactId>tranql-connector</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-ejb_2.1_spec</artifactId>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+    <!-- for the integration tests - seems to work with the OpenJPA-enhanced entities -->
+ <dependency>
+ <groupId>org.jboss.soa.bpel</groupId>
+ <artifactId>riftsaw-dao-jpa-hibernate</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.soa.bpel</groupId>
+ <artifactId>riftsaw-dao-jpa</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -107,9 +122,28 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
+ <exclude>**/RetriesTest.java</exclude>
<exclude>**/SchedulerThreadTest.java</exclude>
</excludes>
</configuration>
+ <executions>
+ <execution>
+ <id>hibernate-jpa</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports/hibernate-jpa</reportsDirectory>
+ <systemProperties>
+ <property>
+ <name>dao.factory.scheduler</name>
+ <value>org.apache.ode.dao.jpa.hibernate.SchedulerDAOConnectionFactoryImpl</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
</plugins>
</build>
Deleted: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ode.scheduler.simple;
-
-import java.util.List;
-
-/**
- * Database abstraction; provides all database access for the simple scheduler.
- *
- * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- *
- */
-public interface DatabaseDelegate {
- /**
- * Save the job in the database.
- * @param job the job
- * @param nodeId node assigned to the job (or null if no node has been asssigned)
- * @param loaded whether the job has been loaded into memory (i.e. in preperation for execution)
- * @throws DatabaseException in case of error
- */
- boolean insertJob(Job job, String nodeId, boolean loaded) throws DatabaseException ;
-
- /**
- * Update the job in the database (only updates timestamp and retryCount)
- *
- * @param job the job
- * @throws DatabaseException in case of error
- */
- boolean updateJob(Job job) throws DatabaseException;
-
- /**
- * Delete a job from the database.
- * @param jobid job identifier
- * @param nodeId node identifier
- * @throws DatabaseException in case of error
- */
- boolean deleteJob(String jobid, String nodeId) throws DatabaseException;
-
- /**
- * Return a list of unique nodes identifiers found in the database. This is used
- * to initialize the list of known nodes when a new node starts up.
- * @return list of unique node identfiers found in the databaseuniqu
- */
- List<String> getNodeIds() throws DatabaseException;
-
- /**
- * "Dequeue" jobs from the database that are ready for immediate execution; this basically
- * is a select/delete operation with constraints on the nodeId and scheduled time.
- *
- * @param nodeId node identifier of the jobs
- * @param maxtime only jobs with scheduled time earlier than this will be dequeued
- * @param maxjobs maximum number of jobs to deqeue
- * @return list of jobs that met the criteria and were deleted from the database
- * @throws DatabaseException in case of error
- */
- List<Job> dequeueImmediate(String nodeId, long maxtime, int maxjobs) throws DatabaseException ;
-
- /**
- * Assign a particular node identifier to a fraction of jobs in the database that do not have one,
- * and are up for execution within a certain time. Only a fraction of the jobs found are assigned
- * the node identifier. This fraction is determined by the "y" parameter, while membership in the
- * group (of jobs that get the nodeId) is determined by the "x" parameter. Essentially the logic is:
- * <code>
- * UPDATE jobs AS job
- * WHERE job.scheduledTime before :maxtime
- * AND job.nodeId is null
- * AND job.scheduledTime MOD :y == :x
- * SET job.nodeId = :nodeId
- * </code>
- *
- * @param nodeId node identifier to assign to jobs
- * @param x the result of the mod-division
- * @param y the dividend of the mod-division
- * @param maxtime only jobs with scheduled time earlier than this will be updated
- * @return number of jobs updated
- * @throws DatabaseException in case of error
- */
- int updateAssignToNode(String nodeId, int x, int y, long maxtime) throws DatabaseException;
-
- /**
- * Reassign jobs from one node to another.
- *
- * @param oldnode node assigning from
- * @param newnode new node asssigning to
- * @return number of rows changed
- * @throws DatabaseException
- */
- int updateReassign(String oldnode, String newnode) throws DatabaseException;
-}
Deleted: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseException.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseException.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseException.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ode.scheduler.simple;
-
-/**
- * Exception class thrown by {@link DatabaseDelegate} implementations.
- *
- * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- *
- */
-public class DatabaseException extends Exception {
-
- private static final long serialVersionUID = 1L;
-
- public DatabaseException(String message) {
- super(message);
- }
-
- public DatabaseException(Exception ex) {
- super(ex);
- }
-
- public DatabaseException(String message, Exception ex) {
- super(message, ex);
- }
-}
Deleted: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -1,483 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ode.scheduler.simple;
-
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.math.BigDecimal;
-
-import javax.sql.DataSource;
-import javax.xml.namespace.QName;
-
-import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
-import org.apache.ode.utils.DbIsolation;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.utils.DbIsolation;
-import org.apache.ode.utils.StreamUtils;
-
-/**
- * JDBC-based implementation of the {@link DatabaseDelegate} interface. Should work with most
- * reasonably behaved databases.
- *
- * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- */
-public class JdbcDelegate implements DatabaseDelegate {
-
- private static final Log __log = LogFactory.getLog(JdbcDelegate.class);
-
- private static final String DELETE_JOB = "delete from ODE_JOB where jobid = ? and nodeid = ?";
-
- private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?";
-
- private static final String UPDATE_JOB = "update ODE_JOB set ts = ?, retryCount = ? where jobid = ?";
-
- private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null "
- + "and mod(ts,?) = ? and ts < ?";
-
- private static final String UPGRADE_JOB_DB2 = "update ODE_JOB set nodeid = ? where nodeid is null "
- + "and mod(ts,CAST(? AS BIGINT)) = ? and ts < ?";
-
- private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set nodeid = ? where nodeid is null "
- + "and (ts % ?) = ? and ts < ?";
-
- private static final String UPGRADE_JOB_SYBASE = "update ODE_JOB set nodeid = ? where nodeid is null "
- + "and convert(int, ts) % ? = ? and ts < ?";
-
- private static final String UPGRADE_JOB_SYBASE12 = "update ODE_JOB set nodeid = ? where nodeid is null "
- + "and -1 <> ? and -1 <> ? and ts < ?";
-
- private static final String SAVE_JOB = "insert into ODE_JOB "
- + " (jobid, nodeid, ts, scheduled, transacted, "
- + "instanceId,"
- + "mexId,"
- + "processId,"
- + "type,"
- + "channel,"
- + "correlatorId,"
- + "correlationKeySet,"
- + "retryCount,"
- + "inMem,"
- + "detailsExt"
- + ") values(?, ?, ?, ?, ?,"
- + "?,"
- + "?,"
- + "?,"
- + "?,"
- + "?,"
- + "?,"
- + "?,"
- + "?,"
- + "?,"
- + "?"
- + ")";
-
- private static final String GET_NODEIDS = "select distinct nodeid from ODE_JOB";
-
- private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, transacted, scheduled, "
- + "instanceId,"
- + "mexId,"
- + "processId,"
- + "type,"
- + "channel,"
- + "correlatorId,"
- + "correlationKeySet,"
- + "retryCount,"
- + "inMem,"
- + "detailsExt"
- + " from ODE_JOB "
- + "where nodeid = ? and ts < ? order by ts";
-
-// public Long instanceId;
-// public String mexId;
-// public String processId;
-// public String type;
-// public String channel;
-// public String correlatorId;
-// public String correlationKeySet;
-// public Integer retryCount;
-// public Boolean inMem;
-// public Map<String, Object> detailsExt = new HashMap<String, Object>();
-
- private static final String UPDATE_SCHEDULED = "update ODE_JOB set scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
- private static final int UPDATE_SCHEDULED_SLOTS = 10;
-
- private DataSource _ds;
-
- private Dialect _dialect;
-
- public JdbcDelegate(DataSource ds) {
- _ds = ds;
- _dialect = guessDialect();
- }
-
- public boolean deleteJob(String jobid, String nodeId) throws DatabaseException {
- if (__log.isDebugEnabled())
- __log.debug("deleteJob " + jobid + " on node " + nodeId);
-
- Connection con = null;
- PreparedStatement ps = null;
- try {
- con = getConnection();
- ps = con.prepareStatement(DELETE_JOB);
- ps.setString(1, jobid);
- ps.setString(2, nodeId);
- return ps.executeUpdate() == 1;
- } catch (SQLException se) {
- throw new DatabaseException(se);
- } finally {
- close(ps);
- close(con);
- }
- }
-
- public List<String> getNodeIds() throws DatabaseException {
- Connection con = null;
- PreparedStatement ps = null;
- try {
- con = getConnection();
- ps = con.prepareStatement(GET_NODEIDS, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- ResultSet rs = ps.executeQuery();
- ArrayList<String> nodes = new ArrayList<String>();
- while (rs.next()) {
- String nodeId = rs.getString(1);
- if (nodeId != null)
- nodes.add(rs.getString(1));
- }
- if (__log.isDebugEnabled())
- __log.debug("getNodeIds: " + nodes);
- return nodes;
- } catch (SQLException se) {
- throw new DatabaseException(se);
- } finally {
- close(ps);
- close(con);
- }
- }
-
- public boolean insertJob(Job job, String nodeId, boolean loaded) throws DatabaseException {
- if (__log.isDebugEnabled())
- __log.debug("insertJob " + job.jobId + " on node " + nodeId + " loaded=" + loaded);
-
- Connection con = null;
- PreparedStatement ps = null;
- try {
- int i = 1;
- con = getConnection();
- ps = con.prepareStatement(SAVE_JOB);
- ps.setString(i++, job.jobId);
- ps.setString(i++, nodeId);
- ps.setLong(i++, job.schedDate);
- ps.setInt(i++, asInteger(loaded));
- ps.setInt(i++, asInteger(job.transacted));
-
- JobDetails details = job.detail;
- ps.setObject(i++, details.instanceId, Types.BIGINT);
- ps.setObject(i++, details.mexId, Types.VARCHAR);
- ps.setObject(i++, details.processId, Types.VARCHAR);
- ps.setObject(i++, details.type, Types.VARCHAR);
- ps.setObject(i++, details.channel, Types.VARCHAR);
- ps.setObject(i++, details.correlatorId, Types.VARCHAR);
- ps.setObject(i++, details.correlationKeySet, Types.VARCHAR);
- ps.setObject(i++, details.retryCount, Types.INTEGER);
- ps.setObject(i++, details.inMem, Types.INTEGER);
-
- if (details.detailsExt == null || details.detailsExt.size() == 0) {
- ps.setBytes(i++, null);
- } else {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try {
- StreamUtils.write(bos, (Serializable) details.detailsExt);
- } catch (Exception ex) {
- __log.error("Error serializing job detail: " + job.detail);
- throw new DatabaseException(ex);
- }
- ps.setBytes(i++, bos.toByteArray());
- }
-
- return ps.executeUpdate() == 1;
- } catch (SQLException se) {
- throw new DatabaseException(se);
- } finally {
- close(ps);
- close(con);
- }
- }
-
- public boolean updateJob(Job job) throws DatabaseException {
- if (__log.isDebugEnabled())
- __log.debug("updateJob " + job.jobId + " retryCount=" + job.detail.getRetryCount());
-
- Connection con = null;
- PreparedStatement ps = null;
- try {
- con = getConnection();
- ps = con.prepareStatement(UPDATE_JOB);
- ps.setLong(1, job.schedDate);
- ps.setInt(2, job.detail.getRetryCount());
- ps.setString(3, job.jobId);
- return ps.executeUpdate() == 1;
- } catch (SQLException se) {
- throw new DatabaseException(se);
- } finally {
- close(ps);
- close(con);
- }
- }
-
- private Long asLong(Object o) {
- if (o == null) return null;
- else if (o instanceof BigDecimal) return ((BigDecimal) o).longValue();
- else if (o instanceof Long) return (Long) o;
- else if (o instanceof Integer) return ((Integer) o).longValue();
- else throw new IllegalStateException("Can't convert to long " + o.getClass());
- }
-
- private Integer asInteger(Object o) {
- if (o == null) return null;
- else if (o instanceof BigDecimal) return ((BigDecimal) o).intValue();
- else if (o instanceof Integer) return (Integer) o;
- else throw new IllegalStateException("Can't convert to integer " + o.getClass());
- }
-
- @SuppressWarnings("unchecked")
- public List<Job> dequeueImmediate(String nodeId, long maxtime, int maxjobs) throws DatabaseException {
- ArrayList<Job> ret = new ArrayList<Job>(maxjobs);
- Connection con = null;
- PreparedStatement ps = null;
- try {
- con = getConnection();
- ps = con.prepareStatement(SCHEDULE_IMMEDIATE);
- ps.setString(1, nodeId);
- ps.setLong(2, maxtime);
- ps.setMaxRows(maxjobs);
-
- ResultSet rs = ps.executeQuery();
- while (rs.next()) {
- Scheduler.JobDetails details = new Scheduler.JobDetails();
- details.instanceId = asLong(rs.getObject("instanceId"));
- details.mexId = (String) rs.getObject("mexId");
- details.processId = (String) rs.getObject("processId");
- details.type = (String) rs.getObject("type");
- details.channel = (String) rs.getObject("channel");
- details.correlatorId = (String) rs.getObject("correlatorId");
- details.correlationKeySet = (String) rs.getObject("correlationKeySet");
- details.retryCount = asInteger(rs.getObject("retryCount"));
- details.inMem = asBoolean(rs.getInt("inMem"));
- if (rs.getObject("detailsExt") != null) {
- try {
- ObjectInputStream is = new ObjectInputStream(rs.getBinaryStream("detailsExt"));
- details.detailsExt = (Map<String, Object>) is.readObject();
- is.close();
- } catch (Exception e) {
- throw new DatabaseException("Error deserializing job detailsExt", e);
- }
- }
-
- {
- //For compatibility reasons, we check whether there are entries inside
- //jobDetailsExt blob, which correspond to extracted entries. If so, we
- //use them.
-
- Map<String, Object> detailsExt = details.getDetailsExt();
- if (detailsExt.get("type") != null) {
- details.type = (String) detailsExt.get("type");
- }
- if (detailsExt.get("iid") != null) {
- details.instanceId = (Long) detailsExt.get("iid");
- }
- if (detailsExt.get("pid") != null) {
- details.processId = (String) detailsExt.get("pid");
- }
- if (detailsExt.get("inmem") != null) {
- details.inMem = (Boolean) detailsExt.get("inmem");
- }
- if (detailsExt.get("ckey") != null) {
- details.correlationKeySet = (String) detailsExt.get("ckey");
- }
- if (detailsExt.get("channel") != null) {
- details.channel = (String) detailsExt.get("channel");
- }
- if (detailsExt.get("mexid") != null) {
- details.mexId = (String) detailsExt.get("mexid");
- }
- if (detailsExt.get("correlatorId") != null) {
- details.correlatorId = (String) detailsExt.get("correlatorId");
- }
- if (detailsExt.get("retryCount") != null) {
- details.retryCount = Integer.parseInt((String) detailsExt.get("retryCount"));
- }
- }
-
- Job job = new Job(rs.getLong("ts"), rs.getString("jobid"), asBoolean(rs.getInt("transacted")), details);
- ret.add(job);
- }
- rs.close();
- ps.close();
- } catch (SQLException se) {
- throw new DatabaseException(se);
- } finally {
- close(ps);
- close(con);
- }
- return ret;
- }
-
- public int updateReassign(String oldnode, String newnode) throws DatabaseException {
- if (__log.isDebugEnabled())
- __log.debug("updateReassign from " + oldnode + " ---> " + newnode);
- Connection con = null;
- PreparedStatement ps = null;
- try {
- con = getConnection();
- ps = con.prepareStatement(UPDATE_REASSIGN);
- ps.setString(1, newnode);
- ps.setString(2, oldnode);
- return ps.executeUpdate();
- } catch (SQLException se) {
- throw new DatabaseException(se);
- } finally {
- close(ps);
- close(con);
- }
- }
-
- public int updateAssignToNode(String node, int i, int numNodes, long maxtime) throws DatabaseException {
- if (__log.isDebugEnabled())
- __log.debug("updateAsssignToNode node=" + node + " " + i + "/" + numNodes + " maxtime=" + maxtime);
- Connection con = null;
- PreparedStatement ps = null;
- try {
- con = getConnection();
- if (_dialect == Dialect.SQLSERVER) {
- ps = con.prepareStatement(UPGRADE_JOB_SQLSERVER);
- } else if (_dialect == Dialect.DB2) {
- ps = con.prepareStatement(UPGRADE_JOB_DB2);
- } else if (_dialect == Dialect.SYBASE) {
- ps = con.prepareStatement(UPGRADE_JOB_SYBASE);
- } else if (_dialect == Dialect.SYBASE12) {
- ps = con.prepareStatement(UPGRADE_JOB_SYBASE12);
- } else {
- ps = con.prepareStatement(UPGRADE_JOB_DEFAULT);
- }
- ps.setString(1, node);
- ps.setInt(2, numNodes);
- ps.setInt(3, i);
- ps.setLong(4, maxtime);
- return ps.executeUpdate();
- } catch (SQLException se) {
- throw new DatabaseException(se);
- } finally {
- close(ps);
- close(con);
- }
- }
-
- private Connection getConnection() throws SQLException {
- Connection c = _ds.getConnection();
- DbIsolation.setIsolationLevel(c);
- return c;
- }
-
- private int asInteger(boolean value) {
- return (value ? 1 : 0);
- }
-
- private boolean asBoolean(int value) {
- return (value != 0);
- }
-
- private void close(PreparedStatement ps) {
- if (ps != null) {
- try {
- ps.close();
- } catch (Exception e) {
- __log.warn("Exception while closing prepared statement", e);
- }
- }
- }
-
- private void close(Connection con) {
- if (con != null) {
- try {
- con.close();
- } catch (Exception e) {
- __log.warn("Exception while closing connection", e);
- }
- }
- }
-
- private Dialect guessDialect() {
- Dialect d = Dialect.UNKNOWN;
- Connection con = null;
- try {
- con = getConnection();
- DatabaseMetaData metaData = con.getMetaData();
- if (metaData != null) {
- String dbProductName = metaData.getDatabaseProductName();
- int dbMajorVer = metaData.getDatabaseMajorVersion();
- __log.info("Using database " + dbProductName + " major version " + dbMajorVer);
- if (dbProductName.indexOf("DB2") >= 0) {
- d = Dialect.DB2;
- } else if (dbProductName.indexOf("Derby") >= 0) {
- d = Dialect.DERBY;
- } else if (dbProductName.indexOf("Firebird") >= 0) {
- d = Dialect.FIREBIRD;
- } else if (dbProductName.indexOf("HSQL") >= 0) {
- d = Dialect.HSQL;
- } else if (dbProductName.indexOf("Microsoft SQL") >= 0) {
- d = Dialect.SQLSERVER;
- } else if (dbProductName.indexOf("MySQL") >= 0) {
- d = Dialect.MYSQL;
- } else if (dbProductName.indexOf("Sybase") >= 0 || dbProductName.indexOf("Adaptive") >= 0) {
- d = Dialect.SYBASE;
- if( dbMajorVer == 12 ) {
- d = Dialect.SYBASE12;
- }
- }
- }
- } catch (SQLException e) {
- __log.warn("Unable to determine database dialect", e);
- } finally {
- close(con);
- }
- __log.info("Using database dialect: " + d);
- return d;
- }
-
- enum Dialect {
- DB2, DERBY, FIREBIRD, HSQL, MYSQL, ORACLE, SQLSERVER, SYBASE, SYBASE12, UNKNOWN
- }
-
-}
Deleted: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ode.scheduler.simple;
-
-import java.util.Map;
-
-import java.text.SimpleDateFormat;
-import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
-import org.apache.ode.utils.GUID;
-
-/**
- * Like a task, but a little bit better.
- *
- * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- */
-class Job extends Task {
- private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
-
- String jobId;
- boolean transacted;
- JobDetails detail;
- boolean persisted = true;
-
- public Job(long when, boolean transacted, JobDetails jobDetail) {
- this(when, new GUID().toString(),transacted,jobDetail);
- }
-
- public Job(long when, String jobId, boolean transacted, JobDetails jobDetail) {
- super(when);
- this.jobId = jobId;
- this.detail = jobDetail;
- this.transacted = transacted;
- }
-
- @Override
- public int hashCode() {
- return jobId.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof Job && jobId.equals(((Job) obj).jobId);
- }
-
- @Override
- public String toString() {
- SimpleDateFormat f = (SimpleDateFormat) DATE_FORMAT.clone();
- return "Job "+jobId+" time: "+f.format(schedDate)+" transacted: "+transacted+" persisted: "+persisted+" details: "+detail;
- }
-}
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobComparatorByDate.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobComparatorByDate.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobComparatorByDate.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -21,6 +21,9 @@
import java.util.Comparator;
+import org.apache.ode.dao.scheduler.Task;
+
+
/**
* Compare jobs, using scheduled date as sort criteria.
*
@@ -29,7 +32,7 @@
class JobComparatorByDate implements Comparator<Task> {
public int compare(Task o1, Task o2) {
- long diff = o1.schedDate - o2.schedDate;
+ long diff = o1.getScheduledDate() - o2.getScheduledDate();
if (diff < 0) return -1;
if (diff > 0) return 1;
return 0;
Added: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,44 @@
+/**
+ *
+ */
+package org.apache.ode.scheduler.simple;
+
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.dao.scheduler.Task;
+
+/**
+ * @author jeffyu
+ *
+ */
+public class JobDAOTask extends Task {
+
+ public JobDAO dao;
+
+ public String jobId;
+
+ public JobDAOTask(JobDAO job) {
+ super(job.getScheduledDate());
+ this.dao = job;
+ this.jobId=job.getJobId();
+ }
+
+ public JobDAOTask(String jobId) {
+ super(0L);
+ this.jobId=jobId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof JobDAOTask && jobId.equals(((JobDAOTask) obj).jobId);
+ }
+
+ @Override
+ public int hashCode() {
+ return jobId.hashCode();
+ }
+
+ public JobDAO getJobDAO() {
+ return this.dao;
+ }
+
+}
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.scheduler.Task;
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.MemberOfFunction;
@@ -173,7 +174,7 @@
if (job == null)
return Long.MAX_VALUE;
- return Math.max(0, job.schedDate - System.currentTimeMillis());
+ return Math.max(0, job.getScheduledDate() - System.currentTimeMillis());
}
/**
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -19,7 +19,12 @@
package org.apache.ode.scheduler.simple;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -33,15 +38,17 @@
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.log4j.helpers.AbsoluteTimeDateFormat;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.log4j.helpers.AbsoluteTimeDateFormat;
-import org.apache.ode.bpel.iapi.Scheduler.JobType;
+import org.apache.ode.dao.scheduler.DatabaseException;
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
+import org.apache.ode.dao.scheduler.Task;
/**
* A reliable and relatively simple scheduler that uses a database to persist information about
@@ -66,8 +73,6 @@
public class SimpleScheduler implements Scheduler, TaskRunner {
private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
- private static final int DEFAULT_TRANSACTION_TIMEOUT = 60 * 1000;
-
/**
* Jobs scheduled with a time that is between [now, now+immediateInterval] will be assigned to the current node, and placed
* directly on the todo queue.
@@ -109,7 +114,7 @@
private SchedulerThread _todo;
- private DatabaseDelegate _db;
+ private SchedulerDAOConnectionFactory _dbcf;
/** All the nodes we know about */
private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
@@ -142,9 +147,11 @@
/** Interval between immediate retries when the transaction fails **/
private long _immediateTransactionRetryInterval = 1000;
- public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
+ public SimpleScheduler(String nodeId, SchedulerDAOConnectionFactory dbcf, TransactionManager txm, Properties conf) {
_nodeId = nodeId;
- _db = del;
+ _dbcf = dbcf;
+ _txm = txm;
+
_todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", _todoLimit);
_immediateInterval = getLongProperty(conf, "ode.scheduler.immediateInterval", _immediateInterval);
_nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
@@ -198,11 +205,11 @@
_txm = txm;
}
- public void setDatabaseDelegate(DatabaseDelegate dbd) {
- _db = dbd;
- }
+ public void setSchedulerDAOConnectionFactory(SchedulerDAOConnectionFactory dbcf) {
+ _dbcf = dbcf;
+ }
- public void setExecutorService(ExecutorService executorService) {
+ public void setExecutorService(ExecutorService executorService) {
_exec = executorService;
}
@@ -211,10 +218,11 @@
}
public void cancelJob(String jobId) throws ContextException {
- _todo.dequeue(new Job(0, jobId, false, null));
+ _todo.dequeue(new JobDAOTask(jobId));
_outstandingJobs.remove(jobId);
+ SchedulerDAOConnection conn = _dbcf.getConnection();
try {
- _db.deleteJob(jobId, _nodeId);
+ conn.deleteJob(jobId, _nodeId);
} catch (DatabaseException e) {
__log.debug("Job removal failed.", e);
throw new ContextException("Job removal failed.", e);
@@ -240,8 +248,7 @@
}
public <T> T execTransaction(Callable<T> transaction, int timeout) throws Exception, ContextException {
- TransactionManager txm = _txm;
- if( txm == null ) {
+ if( _txm == null ) {
throw new ContextException("Cannot locate the transaction manager; the server might be shutting down.");
}
@@ -252,7 +259,7 @@
boolean existingTransaction = false;
try {
- existingTransaction = txm.getTransaction() != null;
+ existingTransaction = ( _txm.getTransaction() != null );
} catch (Exception ex) {
String errmsg = "Internal Error, could not get current transaction.";
throw new ContextException(errmsg, ex);
@@ -266,14 +273,13 @@
// run in new transaction
Exception ex = null;
int immediateRetryCount = _immediateTransactionRetryLimit;
-
_txm.setTransactionTimeout(timeout);
if(__log.isDebugEnabled() && timeout!=0) __log.debug("Custom transaction timeout: "+timeout);
try {
do {
try {
if (__log.isDebugEnabled()) __log.debug("Beginning a new transaction");
- txm.begin();
+ _txm.begin();
} catch (Exception e) {
String errmsg = "Internal Error, could not begin transaction.";
throw new ContextException(errmsg, e);
@@ -287,12 +293,12 @@
} finally {
if (ex == null) {
if (__log.isDebugEnabled()) {
- __log.debug("Commiting on " + txm + "...");
+ __log.debug("Commiting on " + _txm + "...");
}
try {
- txm.commit();
+ _txm.commit();
if (__log.isDebugEnabled()) {
- __log.debug("committed on " + txm + " successfully.");
+ __log.debug("committed on " + _txm + " successfully.");
}
} catch( Exception e2 ) {
ex = e2;
@@ -300,9 +306,9 @@
}
} else {
if (__log.isDebugEnabled()) {
- __log.debug("Rollbacking on " + txm + "...");
+ __log.debug("Rollbacking on " + _txm + "...");
}
- txm.rollback();
+ _txm.rollback();
}
if( ex != null && immediateRetryCount > 0 ) {
@@ -361,7 +367,7 @@
if (__log.isDebugEnabled())
__log.debug("scheduling " + jobDetail + " for " + when);
- return schedulePersistedJob(new Job(when.getTime(), true, jobDetail), when, ctime);
+ return schedulePersistedJob(jobDetail, true, when, ctime);
}
public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException {
@@ -375,39 +381,57 @@
if (__log.isDebugEnabled())
__log.debug("scheduling " + jobDetails + " for " + when);
-
- return schedulePersistedJob(new Job(when.getTime(), true, jobDetails), when, ctime);
+
+ return schedulePersistedJob(jobDetails, true, when, ctime);
}
- private String schedulePersistedJob(Job job, Date when, long ctime) throws ContextException {
+ private String schedulePersistedJob(JobDetails jobDetails, boolean transacted, Date when, long ctime) throws ContextException {
boolean immediate = when.getTime() <= ctime + _immediateInterval;
- boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval;
+ boolean nearfuture = !immediate && ( when.getTime() <= ctime + _nearFutureInterval );
+ JobDAO job;
try {
if (immediate) {
- // Immediate scheduling means we put it in the DB for safe keeping
- _db.insertJob(job, _nodeId, true);
-
- // And add it to our todo list .
- if (_outstandingJobs.size() < _todoLimit) {
- addTodoOnCommit(job);
+
+ // If we have too many jobs in the queue, we don't allow any new ones
+ if (_outstandingJobs.size() > _todoLimit) {
+ __log.error("The execution queue is backed up, the engine can't keep up with the load. Either "
+ + "increase the queue size or regulate the flow.");
+ return null;
}
- __log.debug("scheduled immediate job: " + job.jobId);
+
+ job = insertJob(transacted, jobDetails, when.getTime(), _nodeId, true, true);
+ __log.debug("scheduled immediate job: " + job.getJobId());
} else if (nearfuture) {
// Near future, assign the job to ourselves (why? -- this makes it very unlikely that we
// would get two nodes trying to process the same instance, which causes unsightly rollbacks).
- _db.insertJob(job, _nodeId, false);
- __log.debug("scheduled near-future job: " + job.jobId);
+ job = insertJob(transacted, jobDetails, when.getTime(), _nodeId, false, false);
+ __log.debug("scheduled near-future job: " + job.getJobId());
} else /* far future */ {
// Not the near future, we don't assign a node-id, we'll assign it later.
- _db.insertJob(job, null, false);
- __log.debug("scheduled far-future job: " + job.jobId);
+ job = insertJob(transacted, jobDetails, when.getTime(), null, false, false);
+ __log.debug("scheduled far-future job: " + job.getJobId());
}
} catch (DatabaseException dbe) {
__log.error("Database error.", dbe);
throw new ContextException("Database error.", dbe);
}
- return job.jobId;
+ return job.getJobId();
}
+
+ private JobDAO insertJob(final boolean transacted, final JobDetails jobDetails, final long scheduledDate, final String nodeID,
+ final boolean loaded, final boolean enqueue) throws ContextException, DatabaseException {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ final JobDAO job = conn.createJob(transacted, jobDetails, true, scheduledDate);
+ if (!conn.insertJob(job, nodeID, loaded)) {
+ String msg = String.format("Database insert failed. jobId %s nodeId %s", job.getJobId(), nodeID);
+ __log.error(msg);
+ throw new ContextException(msg);
+ }
+ if (enqueue) {
+ addTodoOnCommit(job);
+ }
+ return job;
+ }
public String scheduleVolatileJob(boolean transacted, JobDetails jobDetail) throws ContextException {
return scheduleVolatileJob(transacted, jobDetail, null);
@@ -415,10 +439,11 @@
public String scheduleVolatileJob(boolean transacted, JobDetails jobDetail, Date when) throws ContextException {
long ctime = System.currentTimeMillis();
- if (when == null)
+ if (when == null) {
when = new Date(ctime);
- Job job = new Job(when.getTime(), transacted, jobDetail);
- job.persisted = false;
+ }
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ JobDAO job = conn.createJob(transacted, jobDetail, false, when.getTime());
addTodoOnCommit(job);
return job.toString();
}
@@ -438,9 +463,10 @@
if (_running)
return;
- if (_exec == null)
+ if (_exec == null) {
_exec = Executors.newCachedThreadPool();
-
+ }
+
_todo.clearTasks(UpgradeJobsTask.class);
_todo.clearTasks(LoadImmediateTask.class);
_todo.clearTasks(CheckStaleNodes.class);
@@ -451,9 +477,13 @@
try {
execTransaction(new Callable<Void>() {
-
- public Void call() throws Exception {
- _knownNodes.addAll(_db.getNodeIds());
+ public Void call() throws ContextException, DatabaseException {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ List<String> ids = conn.getNodeIds();
+ if (ids == null) {
+ throw new ContextException("Error retrieving node list.");
+ }
+ _knownNodes.addAll(ids);
return null;
}
@@ -466,7 +496,9 @@
long now = System.currentTimeMillis();
// Pretend we got a heartbeat...
- for (String s : _knownNodes) _lastHeartBeat.put(s, now);
+ for (String s : _knownNodes) {
+ _lastHeartBeat.put(s, now);
+ }
// schedule immediate job loading for now!
_todo.enqueue(new LoadImmediateTask(now));
@@ -502,93 +534,121 @@
// _exec.shutdown();
_running = false;
}
-
- class RunJob implements Callable<Void> {
- final Job job;
+
+
+ /**
+ * This is the class for delegating job to jobProcessor, also introduced retry mechanism here.
+ * @author jeffyu
+ *
+ */
+ private class RunJobCallable implements Callable<Void> {
final JobProcessor processor;
+ final JobDAO job;
- RunJob(Job job, JobProcessor processor) {
- this.job = job;
+ RunJobCallable(JobDAO jobDao, JobProcessor processor) {
+ this.job = jobDao;
this.processor = processor;
}
public Void call() throws Exception {
try {
- final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail,
- job.detail.getRetryCount());
- if (job.transacted) {
- final boolean[] needRetry = new boolean[]{true};
- try {
- execTransaction(new Callable<Void>() {
- public Void call() throws Exception {
- if (job.persisted)
- if (!_db.deleteJob(job.jobId, _nodeId))
- throw new JobNoLongerInDbException(job.jobId, _nodeId);
- try {
- processor.onScheduledJob(jobInfo);
- // If the job is a "runnable" job, schedule the next job occurence
- if (job.detail.getDetailsExt().get("runnable") != null && !"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.getDetailsExt().get("runnable_status")))) {
- // the runnable is still in progress, schedule checker to 10 mins later
- if (_pollIntervalForPolledRunnable < 0) {
- if (__log.isWarnEnabled())
- __log.warn("The poll interval for polled runnables is negative; setting it to 1000ms");
- _pollIntervalForPolledRunnable = 1000;
- }
- job.schedDate = System.currentTimeMillis() + _pollIntervalForPolledRunnable;
- _db.insertJob(job, _nodeId, false);
- }
- } catch (JobProcessorException jpe) {
- if (!jpe.retry) {
- needRetry[0] = false;
- }
- // Let execTransaction know that shit happened.
- throw jpe;
- }
- return null;
- }
- });
- } catch (JobNoLongerInDbException jde) {
- // This may happen if two node try to do the same job... we try to avoid
- // it the synchronization is a best-effort but not perfect.
- __log.debug("job no longer in db forced rollback: "+job);
- } catch (final Exception ex) {
- __log.error("Error while processing a "+(job.persisted?"":"non-")+"persisted job"+(needRetry[0] && job.persisted?": ":", no retry: ")+job, ex);
-
- // We only get here if the above execTransaction fails, so that transaction got
- // rollbacked already
- if (job.persisted) {
- execTransaction(new Callable<Void>() {
- public Void call() throws Exception {
- if (needRetry[0]) {
- int retry = job.detail.getRetryCount() + 1;
- if (retry <= 10) {
- job.detail.setRetryCount(retry);
- long delay = (long)(Math.pow(5, retry));
- job.schedDate = System.currentTimeMillis() + delay*1000;
- _db.updateJob(job);
- __log.error("Error while processing job, retrying in " + delay + "s");
- } else {
- _db.deleteJob(job.jobId, _nodeId);
- __log.error("Error while processing job after 10 retries, no more retries:" + job);
- }
- } else {
- _db.deleteJob(job.jobId, _nodeId);
- }
- return null;
- }
- });
- }
- }
+ final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.getJobId(), job.getDetails(), job.getDetails().getRetryCount());
+ if (job.isTransacted()) {
+ processInTransactionContext(jobInfo);
} else {
processor.onScheduledJob(jobInfo);
}
return null;
} finally {
// the order of these 2 actions is crucial to avoid a race condition.
- _processedSinceLastLoadTask.put(job.jobId, job.schedDate);
- _outstandingJobs.remove(job.jobId);
+ _processedSinceLastLoadTask.put(job.getJobId(), job.getScheduledDate());
+ _outstandingJobs.remove(job.getJobId());
}
}
+
+ private void processInTransactionContext(final Scheduler.JobInfo jobInfo) throws Exception {
+ final boolean[] needRetry = new boolean[]{true};
+ try {
+ execTransaction(new Callable<Void>() {
+ public Void call() throws ContextException, Exception {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ if (job.isPersisted()) {
+ if (!conn.deleteJob(job.getJobId(), _nodeId)) {
+ throw new JobNoLongerInDbException(job.getJobId(), _nodeId);
+ }
+ }
+
+ try {
+ processor.onScheduledJob(jobInfo);
+ // If the job is a "runnable" job, schedule the next job occurence
+ if (job.getDetails().getDetailsExt().get("runnable") != null &&
+ !"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.getDetailsExt().get("runnable_status")))) {
+ // the runnable is still in progress, schedule checker to 10 mins later
+ if (_pollIntervalForPolledRunnable < 0) {
+ if (__log.isWarnEnabled())
+ __log.warn("The poll interval for polled runnables is negative; setting it to 1000ms");
+ _pollIntervalForPolledRunnable = 1000;
+ }
+ long schedDate = System.currentTimeMillis() + _pollIntervalForPolledRunnable;
+ job.setScheduledDate(schedDate);
+ conn.insertJob(job, _nodeId, false);
+ }
+ } catch (JobProcessorException jpe) {
+ if (!jpe.retry) {
+ needRetry[0] = false;
+ }
+ // Let execTransaction know that shit happened.
+ throw jpe;
+ }
+ return null;
+ }
+ });
+ } catch (JobNoLongerInDbException jde) {
+ // This may happen if two node try to do the same job... we try to avoid
+ // it the synchronization is a best-effort but not perfect.
+ __log.debug("job no longer in db forced rollback: "+job);
+ } catch (final Exception ex) {
+ __log.error("Error while processing a "+(job.isPersisted()?"":"non-")+"persisted job"+(needRetry[0] && job.isPersisted()?": ":", no retry: ")+job, ex);
+
+ // We only get here if the above execTransaction fails, so that transaction got
+ // rollbacked already
+ if (job.isPersisted()) {
+ try {
+ execTransaction(new Callable<Void>() {
+ public Void call() throws Exception {
+ retryJob(needRetry);
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private void retryJob(final boolean[] needRetry) throws DatabaseException {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ int retry = job.getDetails().getRetryCount() + 1;
+ if (!needRetry[0] || retry > 10) {
+ conn.deleteJob(job.getJobId(), _nodeId);
+ if (retry > 10) {
+ __log.error("Error while processing job after 10 retries, no more retries:" + job);
+ }
+ } else {
+ job.getDetails().setRetryCount(retry);
+ long delay = (long)(Math.pow(5, retry));
+ long scheddate = System.currentTimeMillis() + delay*1000;
+ job.setScheduled(false);
+ job.setScheduledDate(scheddate);
+ conn.updateJob(job);
+
+ __log.error("Error while processing job, retrying in " + delay + "s");
+ }
+ }
+
+
+
}
/**
@@ -596,8 +656,8 @@
*
* @param job job to run.
*/
- protected void runJob(final Job job) {
- _exec.submit(new RunJob(job, _jobProcessor));
+ protected void runJob(final JobDAO jobDao) {
+ _exec.submit(new RunJobCallable(jobDao, _jobProcessor));
}
/**
@@ -622,11 +682,11 @@
*
* @param job job to run.
*/
- protected void runPolledRunnable(final Job job) {
- _exec.submit(new RunJob(job, _polledRunnableProcessor));
+ protected void runPolledRunnable(final JobDAO jobDao) {
+ _exec.submit(new RunJobCallable(jobDao, _polledRunnableProcessor));
}
- private void addTodoOnCommit(final Job job) {
+ private void addTodoOnCommit(final JobDAO job) {
registerSynchronizer(new Synchronizer() {
public void afterCompletion(boolean success) {
if (success) {
@@ -654,12 +714,12 @@
}
public void runTask(final Task task) {
- if (task instanceof Job) {
- Job job = (Job)task;
- if( job.detail.getDetailsExt().get("runnable") != null ) {
- runPolledRunnable(job);
+ if (task instanceof JobDAOTask) {
+ JobDAOTask job = (JobDAOTask)task;
+ if( job.getJobDAO().getDetails().getDetailsExt().get("runnable") != null ) {
+ runPolledRunnable(job.getJobDAO());
} else {
- runJob(job);
+ runJob(job.getJobDAO());
}
} else if (task instanceof SchedulerTask) {
_exec.submit(new Callable<Void>() {
@@ -690,9 +750,11 @@
__log.debug("LOAD IMMEDIATE started");
// don't load anything if we're already half-full; we've got plenty to do already
- if (_outstandingJobs.size() > _todoLimit/2) return true;
-
- List<Job> jobs;
+ if (_outstandingJobs.size() > _todoLimit/2) {
+ return true;
+ }
+
+ List<JobDAO> jobs;
try {
// don't load more than we can chew
final int batch = Math.min((int) (_immediateInterval * _tps / 1000), _todoLimit-_outstandingJobs.size());
@@ -704,29 +766,32 @@
}
if (__log.isDebugEnabled()) __log.debug("loading "+batch+" jobs from db");
- jobs = execTransaction(new Callable<List<Job>>() {
- public List<Job> call() throws Exception {
- return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, batch);
+
+ jobs = execTransaction(new Callable<List<JobDAO>>() {
+ public List<JobDAO> call() throws ContextException, DatabaseException {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ return conn.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, batch);
}
});
+
if (__log.isDebugEnabled()) __log.debug("loaded "+jobs.size()+" jobs from db");
long delayedTime = System.currentTimeMillis() - _warningDelay;
int delayedCount = 0;
boolean runningLate;
AbsoluteTimeDateFormat f = new AbsoluteTimeDateFormat();
- for (Job j : jobs) {
+ for (JobDAO j : jobs) {
// jobs might have been enqueued by #addTodoOnCommit meanwhile
if (_outstandingJobs.size() >= _todoLimit){
if (__log.isDebugEnabled()) __log.debug("Max capacity reached: "+_outstandingJobs.size()+" jobs dispacthed i.e. queued or being executed");
break;
}
- runningLate = j.schedDate <= delayedTime;
+ runningLate = (j.getScheduledDate() <= delayedTime);
if (runningLate) {
delayedCount++;
}
if (__log.isDebugEnabled())
- __log.debug("todo.enqueue job from db: " + j.jobId.trim() + " for " + j.schedDate + "(" + f.format(j.schedDate)+") "+(runningLate?" delayed=true":""));
+ __log.debug("todo.enqueue job from db: " + j.getJobId().trim() + " for " + j.getScheduledDate() + "(" + f.format(j.getScheduledDate())+") "+(runningLate?" delayed=true":""));
enqueue(j);
}
if (delayedCount > 0) {
@@ -744,19 +809,24 @@
}
}
- void enqueue(Job job) {
- if (_processedSinceLastLoadTask.get(job.jobId) == null) {
- if (_outstandingJobs.putIfAbsent(job.jobId, job.schedDate) == null) {
- if (job.schedDate <= System.currentTimeMillis()) {
+ /**
+ * Put job into _outstandingJobs for immediate execution.
+ *
+ * @param job
+ */
+ private void enqueue(JobDAO job) {
+ if (_processedSinceLastLoadTask.get(job.getJobId()) == null) {
+ if (_outstandingJobs.putIfAbsent(job.getJobId(), job.getScheduledDate()) == null) {
+ if (job.getScheduledDate() <= System.currentTimeMillis()) {
runJob(job);
} else {
- _todo.enqueue(job);
+ _todo.enqueue(new JobDAOTask(job));
}
} else {
- if (__log.isDebugEnabled()) __log.debug("Job "+job.jobId+" is being processed (outstanding job)");
+ if (__log.isDebugEnabled()) __log.debug("Job "+job.getJobId()+" is being processed (outstanding job)");
}
} else {
- if (__log.isDebugEnabled()) __log.debug("Job "+job.jobId+" is being processed (processed since last load)");
+ if (__log.isDebugEnabled()) __log.debug("Job "+job.getJobId()+" is being processed (processed since last load)");
}
}
@@ -776,11 +846,12 @@
try {
return execTransaction(new Callable<Boolean>() {
- public Boolean call() throws Exception {
+ public Boolean call() throws ContextException, DatabaseException {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
int numNodes = knownNodes.size();
for (int i = 0; i < numNodes; ++i) {
String node = knownNodes.get(i);
- _db.updateAssignToNode(node, i, numNodes, maxtime);
+ conn.updateAssignToNode(node, i, numNodes, maxtime);
}
return true;
}
@@ -804,8 +875,9 @@
__log.debug("recovering stale node " + nodeId);
try {
int numrows = execTransaction(new Callable<Integer>() {
- public Integer call() throws Exception {
- return _db.updateReassign(nodeId, _nodeId);
+ public Integer call() throws ContextException, DatabaseException {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ return conn.updateReassign(nodeId, _nodeId);
}
});
@@ -826,14 +898,6 @@
}
}
-// private long doRetry(Job job) throws DatabaseException {
-// int retry = job.detail.getRetryCount() + 1;
-// job.detail.setRetryCount(retry);
-// long delay = (long)(Math.pow(5, retry - 1));
-// Job jobRetry = new Job(System.currentTimeMillis() + delay*1000, true, job.detail);
-// _db.insertJob(jobRetry, _nodeId, false);
-// return delay;
-// }
private abstract class SchedulerTask extends Task implements Runnable {
SchedulerTask(long schedDate) {
@@ -851,13 +915,14 @@
try {
success = doLoadImmediate();
} finally {
- if (success)
+ if (success) {
_todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .90)));
- else
+ } else {
_todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 1000));
+ }
}
+
}
-
}
/**
@@ -873,7 +938,7 @@
public void run() {
long ctime = System.currentTimeMillis();
long ntime = _nextUpgrade.get();
- __log.debug("UPGRADE task for " + schedDate + " fired at " + ctime);
+ __log.debug("UPGRADE task for " + getScheduledDate() + " fired at " + ctime);
// We could be too early, this can happen if upgrade gets delayed due to another
// node
@@ -916,6 +981,7 @@
}
}
}
+
}
\ No newline at end of file
Deleted: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Task.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Task.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Task.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ode.scheduler.simple;
-
-/**
- * The thing that we schedule.
- *
- * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- *
- */
-class Task {
- /** Scheduled date/time. */
- public long schedDate;
-
- Task(long schedDate) {
- this.schedDate = schedDate;
- }
-}
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -19,14 +19,16 @@
package org.apache.ode.scheduler.simple;
+import org.apache.ode.dao.scheduler.Task;
+
+
/**
* The thing that runs the scheduled tasks.
*
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*
*/
-interface TaskRunner {
-
+public interface TaskRunner {
public void runTask(Task task);
}
Added: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple.jdbc;
+
+import java.text.SimpleDateFormat;
+
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.utils.GUID;
+
+/**
+ * In-memory {@link JobDAO} implementation used by the simple JDBC scheduler.
+ * Holds the job id, scheduled time, transacted flag and the
+ * {@link JobDetails} payload; actual persistence is performed by
+ * {@code SchedulerDAOConnectionImpl}.
+ */
+public class JobDAOImpl implements JobDAO {
+
+    // (Removed an unused private static SimpleDateFormat DATE_FORMAT field:
+    // it was never referenced, and a shared SimpleDateFormat is not
+    // thread-safe in any case.)
+
+    private String jobId;
+    private boolean transacted;
+    private JobDetails detail;
+    // Defaults to true; callers that create purely in-memory jobs should
+    // invoke setPersisted(false).
+    private boolean persisted = true;
+    private long scheduledDate;
+    private boolean scheduled = false;
+
+    /** Creates a job with a freshly generated GUID as its id. */
+    public JobDAOImpl(long when, boolean transacted, JobDetails jobDetail) {
+        this(when, new GUID().toString(), transacted, jobDetail);
+    }
+
+    /**
+     * @param when       scheduled date/time (epoch millis)
+     * @param jobId      unique job identifier
+     * @param transacted whether the job should run inside a transaction
+     * @param jobDetail  opaque job payload
+     */
+    public JobDAOImpl(long when, String jobId, boolean transacted, JobDetails jobDetail) {
+        this.scheduledDate = when;
+        this.jobId = jobId;
+        this.detail = jobDetail;
+        this.transacted = transacted;
+    }
+
+    public JobDetails getDetails() {
+        return detail;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public long getScheduledDate() {
+        return this.scheduledDate;
+    }
+
+    public void setScheduledDate(long scheduledDate) {
+        this.scheduledDate = scheduledDate;
+    }
+
+    public boolean isPersisted() {
+        return this.persisted;
+    }
+
+    public boolean isTransacted() {
+        return this.transacted;
+    }
+
+    public void setPersisted(boolean persisted) {
+        this.persisted = persisted;
+    }
+
+    public boolean isScheduled() {
+        return scheduled;
+    }
+
+    public void setScheduled(boolean scheduled) {
+        this.scheduled = scheduled;
+    }
+
+}
Added: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,45 @@
+/**
+ *
+ */
+package org.apache.ode.scheduler.simple.jdbc;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
+
+/**
+ * {@link SchedulerDAOConnectionFactory} that hands out one
+ * {@link SchedulerDAOConnection} per thread, backed by a JDBC
+ * {@link DataSource}.
+ *
+ * @author jeffyu
+ */
+public class SchedulerDAOConnectionFactoryImpl implements SchedulerDAOConnectionFactory {
+
+    // Per-thread connection cache.
+    // NOTE(review): this field is static, so the cache is shared across ALL
+    // factory instances, and shutdown() never clears it -- confirm this is
+    // intended for the single-factory deployment case.
+    static ThreadLocal<SchedulerDAOConnection> _connections = new ThreadLocal<SchedulerDAOConnection>();
+
+    DataSource _ds;
+    TransactionManager _txm;
+    // Flipped to false by shutdown(); shared with every connection created here.
+    AtomicBoolean _active = new AtomicBoolean(true);
+
+    /**
+     * Returns the calling thread's cached connection, creating a new one if
+     * the cache is empty or the cached connection reports itself closed.
+     * NOTE(review): SchedulerDAOConnectionImpl.isClosed() as committed returns
+     * the raw active flag (true while active), which would make this condition
+     * recreate the connection on every call -- verify the intended
+     * isClosed() semantics.
+     */
+    public SchedulerDAOConnection getConnection() {
+        if (_connections.get()==null || _connections.get().isClosed() ){
+            _connections.set(new SchedulerDAOConnectionImpl(_active,_ds,_txm));
+        }
+        return _connections.get();
+    }
+
+    /**
+     * @param p   configuration properties (not used here)
+     * @param txm transaction manager passed through to new connections
+     * @param env expected to be the JDBC {@link DataSource}
+     */
+    public void init(Properties p, TransactionManager txm, Object env) {
+        _ds = (DataSource) env;
+        _txm = txm;
+    }
+
+    /** Marks the factory inactive; existing connections observe the shared flag. */
+    public void shutdown() {
+        _active.set(false);
+    }
+
+}
Added: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple.jdbc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.dao.scheduler.DatabaseException;
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.utils.DbIsolation;
+import org.apache.ode.utils.GUID;
+import org.apache.ode.utils.StreamUtils;
+
+/**
+ * JDBC-based implementation of the {@link SchedulerDAOConnection} interface. Should work with most
+ * reasonably behaved databases.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+public class SchedulerDAOConnectionImpl implements SchedulerDAOConnection {
+
+    private static final Log __log = LogFactory.getLog(SchedulerDAOConnectionImpl.class);
+
+    // ---- SQL statements -------------------------------------------------
+
+    private static final String DELETE_JOB = "delete from ODE_JOB where jobid = ? and nodeid = ?";
+
+    private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?";
+
+    private static final String UPDATE_JOB = "update ODE_JOB set ts = ?, retryCount = ?, scheduled = ? where jobid = ?";
+
+    // Dialect-specific "claim unassigned jobs" statements.  Each matches rows
+    // whose ts falls in this node's slot: mod(ts, numNodes) == nodeIndex.
+    // BUGFIX: a leading space was added before "and" in every variant -- the
+    // original concatenation produced "...scheduled = 0and mod(...)", which
+    // is not valid SQL.
+    private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0"
+            + " and mod(ts,?) = ? and ts < ?";
+
+    private static final String UPGRADE_JOB_DB2 = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0"
+            + " and mod(ts,CAST(? AS BIGINT)) = ? and ts < ?";
+
+    private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0"
+            + " and (ts % ?) = ? and ts < ?";
+
+    private static final String UPGRADE_JOB_SYBASE = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0"
+            + " and convert(int, ts) % ? = ? and ts < ?";
+
+    // Sybase 12 variant consumes the two mod parameters with dummy
+    // comparisons so the statement keeps the common 4-parameter shape.
+    private static final String UPGRADE_JOB_SYBASE12 = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0"
+            + " and -1 <> ? and -1 <> ? and ts < ?";
+
+    // Column-per-field insert; detailsExt is the serialized overflow map.
+    private static final String SAVE_JOB = "insert into ODE_JOB "
+            + " (jobid, nodeid, ts, scheduled, transacted, "
+            + "instanceId,"
+            + "mexId,"
+            + "processId,"
+            + "type,"
+            + "channel,"
+            + "correlatorId,"
+            + "correlationKeySet,"
+            + "retryCount,"
+            + "inMem,"
+            + "detailsExt"
+            + ") values(?, ?, ?, ?, ?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?"
+            + ")";
+
+    private static final String GET_NODEIDS = "select distinct nodeid from ODE_JOB";
+
+    // Jobs assigned to a node, not yet scheduled, due before a cutoff.
+    private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, transacted, scheduled, "
+            + "instanceId,"
+            + "mexId,"
+            + "processId,"
+            + "type,"
+            + "channel,"
+            + "correlatorId,"
+            + "correlationKeySet,"
+            + "retryCount,"
+            + "inMem,"
+            + "detailsExt"
+            + " from ODE_JOB "
+            + "where nodeid = ? and scheduled = 0 and ts < ? order by ts";
+
+    // Exactly UPDATE_SCHEDULED_SLOTS placeholders; unused slots are filled
+    // with "" by the caller.
+    private static final String UPDATE_SCHEDULED = "update ODE_JOB set scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+    private static final int UPDATE_SCHEDULED_SLOTS = 10;
+
+    /** JDBC source of connections; one physical connection taken per operation. */
+    private DataSource _ds;
+
+    /** Guessed once at construction from database metadata. */
+    private Dialect _dialect;
+
+    /** Shared with the factory; cleared there on shutdown. */
+    private AtomicBoolean _active;
+
+    private TransactionManager _txm;
+
+    /**
+     * @param active shared "factory is active" flag, cleared by the factory's
+     *               shutdown()
+     * @param ds     JDBC data source used for every operation
+     * @param txm    transaction manager (stored but not referenced elsewhere
+     *               in this class -- TODO confirm it is still needed)
+     */
+    public SchedulerDAOConnectionImpl(AtomicBoolean active, DataSource ds, TransactionManager txm) {
+        _active = active;
+        _ds = ds;
+        _txm = txm;
+        // Dialect detection opens (and closes) one connection up front.
+        _dialect = guessDialect();
+    }
+
+    /**
+     * Deletes the given job, but only if it is currently assigned to
+     * {@code nodeId}.
+     *
+     * @return true iff exactly one row was removed
+     */
+    public boolean deleteJob(String jobid, String nodeId) throws DatabaseException {
+        if (__log.isDebugEnabled())
+            __log.debug("deleteJob " + jobid + " on node " + nodeId);
+
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            con = getConnection();
+            ps = con.prepareStatement(DELETE_JOB);
+            ps.setString(1, jobid);
+            ps.setString(2, nodeId);
+            return ps.executeUpdate() == 1;
+        } catch (SQLException se) {
+            throw new DatabaseException(se);
+        } finally {
+            close(ps);
+            close(con);
+        }
+    }
+
+    /**
+     * Returns the distinct, non-null node ids present in ODE_JOB.  Never
+     * returns null; an empty list means no assigned jobs.
+     */
+    public List<String> getNodeIds() throws DatabaseException {
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            con = getConnection();
+            ps = con.prepareStatement(GET_NODEIDS, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            ResultSet rs = ps.executeQuery();
+            ArrayList<String> nodes = new ArrayList<String>();
+            while (rs.next()) {
+                String nodeId = rs.getString(1);
+                if (nodeId != null) {
+                    // Reuse the value already read (was fetching column 1 twice).
+                    nodes.add(nodeId);
+                }
+            }
+            if (__log.isDebugEnabled())
+                __log.debug("getNodeIds: " + nodes);
+            return nodes;
+        } catch (SQLException se) {
+            throw new DatabaseException(se);
+        } finally {
+            close(ps);   // closing the statement also closes its ResultSet
+            close(con);
+        }
+    }
+
+    /**
+     * Inserts one job row, flattening {@link JobDetails} into dedicated
+     * columns plus a serialized {@code detailsExt} blob.
+     *
+     * @param job    job to persist
+     * @param nodeId node the job is assigned to (may be null = unassigned)
+     * @param loaded stored in the "scheduled" column: whether the job has
+     *               already been picked up in memory
+     * @return true iff exactly one row was inserted
+     */
+    public boolean insertJob(JobDAO job, String nodeId, boolean loaded) throws DatabaseException {
+        if (__log.isDebugEnabled())
+            __log.debug("insertJob " + job.getJobId() + " on node " + nodeId + " loaded=" + loaded);
+
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            // Parameter order must match the column list in SAVE_JOB.
+            int i = 1;
+            con = getConnection();
+            ps = con.prepareStatement(SAVE_JOB);
+            ps.setString(i++, job.getJobId());
+            ps.setString(i++, nodeId);
+            ps.setLong(i++, job.getScheduledDate());
+            ps.setInt(i++, asInteger(loaded));
+            ps.setInt(i++, asInteger(job.isTransacted()));
+
+            JobDetails details = job.getDetails();
+            // setObject with an explicit SQL type keeps null fields as SQL NULL.
+            ps.setObject(i++, details.instanceId, Types.BIGINT);
+            ps.setObject(i++, details.mexId, Types.VARCHAR);
+            ps.setObject(i++, details.processId, Types.VARCHAR);
+            ps.setObject(i++, details.type, Types.VARCHAR);
+            ps.setObject(i++, details.channel, Types.VARCHAR);
+            ps.setObject(i++, details.correlatorId, Types.VARCHAR);
+            ps.setObject(i++, details.correlationKeySet, Types.VARCHAR);
+            ps.setObject(i++, details.retryCount, Types.INTEGER);
+            ps.setObject(i++, details.inMem, Types.INTEGER);
+
+            // Null/empty overflow map is stored as SQL NULL rather than an
+            // empty serialized object.
+            if (details.detailsExt == null || details.detailsExt.size() == 0) {
+                ps.setBytes(i++, null);
+            } else {
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                try {
+                    StreamUtils.write(bos, (Serializable) details.detailsExt);
+                } catch (Exception ex) {
+                    __log.error("Error serializing job detail: " + job.getDetails());
+                    throw new DatabaseException(ex);
+                }
+                ps.setBytes(i++, bos.toByteArray());
+            }
+
+            return ps.executeUpdate() == 1;
+        } catch (SQLException se) {
+            throw new DatabaseException(se);
+        } finally {
+            close(ps);
+            close(con);
+        }
+    }
+
+    /**
+     * Updates the mutable columns of a job row: scheduled time, retry count
+     * and the scheduled flag.
+     *
+     * @return true iff exactly one row was updated
+     */
+    public boolean updateJob(JobDAO job) throws DatabaseException {
+        if (__log.isDebugEnabled())
+            __log.debug("updateJob " + job.getJobId() + " retryCount=" + job.getDetails().getRetryCount());
+
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            con = getConnection();
+            ps = con.prepareStatement(UPDATE_JOB);
+            ps.setLong(1, job.getScheduledDate());
+            ps.setInt(2, job.getDetails().getRetryCount());
+            ps.setInt(3, asInt(job.isScheduled()));
+            ps.setString(4, job.getJobId());
+            return ps.executeUpdate() == 1;
+        } catch (SQLException se) {
+            throw new DatabaseException(se);
+        } finally {
+            close(ps);
+            close(con);
+        }
+    }
+
+    /** Maps a boolean onto the 0/1 encoding used by the scheduled column. */
+    private int asInt(boolean flag) {
+        return flag ? 1 : 0;
+    }
+
+    /**
+     * Coerces a driver-supplied numeric (BigDecimal/Long/Integer) to Long;
+     * null passes through, any other type is a programming error.
+     */
+    private Long asLong(Object o) {
+        if (o == null) {
+            return null;
+        }
+        if (o instanceof Long) {
+            return (Long) o;
+        }
+        if (o instanceof BigDecimal) {
+            return Long.valueOf(((BigDecimal) o).longValue());
+        }
+        if (o instanceof Integer) {
+            return Long.valueOf(((Integer) o).longValue());
+        }
+        throw new IllegalStateException("Can't convert to long " + o.getClass());
+    }
+
+    /**
+     * Coerces a driver-supplied numeric (BigDecimal/Integer) to Integer;
+     * null passes through, any other type is a programming error.
+     */
+    private Integer asInteger(Object o) {
+        if (o == null) {
+            return null;
+        }
+        if (o instanceof Integer) {
+            return (Integer) o;
+        }
+        if (o instanceof BigDecimal) {
+            return Integer.valueOf(((BigDecimal) o).intValue());
+        }
+        throw new IllegalStateException("Can't convert to integer " + o.getClass());
+    }
+
+    /**
+     * Fetches up to {@code maxjobs} jobs assigned to {@code nodeId} with
+     * ts &lt; {@code maxtime} (ordered by ts) and marks them as scheduled.
+     * Rows are converted back into {@link Scheduler.JobDetails}, with a
+     * backward-compatibility pass that lets values stored inside the legacy
+     * detailsExt map override the dedicated columns.
+     *
+     * NOTE(review): returns null (rather than throwing) when the "mark
+     * scheduled" update count does not match the number of fetched jobs --
+     * callers must be prepared for a null result.
+     */
+    @SuppressWarnings("unchecked")
+    public List<JobDAO> dequeueImmediate(String nodeId, long maxtime, int maxjobs) throws DatabaseException {
+        ArrayList<JobDAO> ret = new ArrayList<JobDAO>(maxjobs);
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            con = getConnection();
+            ps = con.prepareStatement(SCHEDULE_IMMEDIATE);
+            ps.setString(1, nodeId);
+            ps.setLong(2, maxtime);
+            ps.setMaxRows(maxjobs);
+
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                Scheduler.JobDetails details = new Scheduler.JobDetails();
+                details.instanceId = asLong(rs.getObject("instanceId"));
+                details.mexId = (String) rs.getObject("mexId");
+                details.processId = (String) rs.getObject("processId");
+                details.type = (String) rs.getObject("type");
+                details.channel = (String) rs.getObject("channel");
+                details.correlatorId = (String) rs.getObject("correlatorId");
+                details.correlationKeySet = (String) rs.getObject("correlationKeySet");
+                details.retryCount = asInteger(rs.getObject("retryCount"));
+                details.inMem = asBoolean(rs.getInt("inMem"));
+                if (rs.getObject("detailsExt") != null) {
+                    try {
+                        ObjectInputStream is = new ObjectInputStream(rs.getBinaryStream("detailsExt"));
+                        details.detailsExt = (Map<String, Object>) is.readObject();
+                        is.close();
+                    } catch (Exception e) {
+                        throw new DatabaseException("Error deserializing job detailsExt", e);
+                    }
+                }
+
+                {
+                    // For compatibility reasons, we check whether there are entries inside
+                    // jobDetailsExt blob, which correspond to extracted entries. If so, we
+                    // use them (the blob wins over the dedicated columns).
+
+                    Map<String, Object> detailsExt = details.getDetailsExt();
+                    if (detailsExt.get("type") != null) {
+                        details.type = (String) detailsExt.get("type");
+                    }
+                    if (detailsExt.get("iid") != null) {
+                        details.instanceId = (Long) detailsExt.get("iid");
+                    }
+                    if (detailsExt.get("pid") != null) {
+                        details.processId = (String) detailsExt.get("pid");
+                    }
+                    if (detailsExt.get("inmem") != null) {
+                        details.inMem = (Boolean) detailsExt.get("inmem");
+                    }
+                    if (detailsExt.get("ckey") != null) {
+                        details.correlationKeySet = (String) detailsExt.get("ckey");
+                    }
+                    if (detailsExt.get("channel") != null) {
+                        details.channel = (String) detailsExt.get("channel");
+                    }
+                    if (detailsExt.get("mexid") != null) {
+                        details.mexId = (String) detailsExt.get("mexid");
+                    }
+                    if (detailsExt.get("correlatorId") != null) {
+                        details.correlatorId = (String) detailsExt.get("correlatorId");
+                    }
+                    if (detailsExt.get("retryCount") != null) {
+                        details.retryCount = Integer.parseInt((String) detailsExt.get("retryCount"));
+                    }
+                }
+
+                JobDAO job = new JobDAOImpl(rs.getLong("ts"), rs.getString("jobid"), asBoolean(rs.getInt("transacted")), details);
+                ret.add(job);
+            }
+            rs.close();
+            ps.close();
+
+            // mark jobs as scheduled, UPDATE_SCHEDULED_SLOTS at a time;
+            // unused slots are padded with "" which matches no jobid.
+            int j = 0;
+            int updateCount = 0;
+            ps = con.prepareStatement(UPDATE_SCHEDULED);
+            for (int updates = 1; updates <= (ret.size() / UPDATE_SCHEDULED_SLOTS) + 1; updates++) {
+                for (int i = 1; i <= UPDATE_SCHEDULED_SLOTS; i++) {
+                    ps.setString(i, j < ret.size() ? ret.get(j).getJobId() : "");
+                    j++;
+                }
+                ps.execute();
+                updateCount += ps.getUpdateCount();
+            }
+            if (updateCount != ret.size()) {
+                __log.error("Updating scheduled jobs failed to update all jobs; expected=" + ret.size()
+                        + " actual=" + updateCount);
+                return null;
+
+            }
+        } catch (SQLException se) {
+            throw new DatabaseException(se);
+        } finally {
+            close(ps);
+            close(con);
+        }
+        return ret;
+    }
+
+    /**
+     * Reassigns every job on {@code oldnode} to {@code newnode}, clearing the
+     * scheduled flag so the new owner will pick them up.
+     *
+     * @return number of rows reassigned
+     */
+    public int updateReassign(String oldnode, String newnode) throws DatabaseException {
+        if (__log.isDebugEnabled())
+            __log.debug("updateReassign from " + oldnode + " ---> " + newnode);
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            con = getConnection();
+            ps = con.prepareStatement(UPDATE_REASSIGN);
+            ps.setString(1, newnode);
+            ps.setString(2, oldnode);
+            return ps.executeUpdate();
+        } catch (SQLException se) {
+            throw new DatabaseException(se);
+        } finally {
+            close(ps);
+            close(con);
+        }
+    }
+
+    /**
+     * Claims for {@code node} all unassigned, unscheduled jobs due before
+     * {@code maxtime} that hash onto slot {@code i} of {@code numNodes}
+     * (mod on ts), using dialect-specific SQL where the default mod()
+     * syntax is unsupported.  All variants take the same 4 parameters.
+     *
+     * @return number of rows claimed
+     */
+    public int updateAssignToNode(String node, int i, int numNodes, long maxtime) throws DatabaseException {
+        if (__log.isDebugEnabled())
+            __log.debug("updateAsssignToNode node=" + node + " " + i + "/" + numNodes + " maxtime=" + maxtime);
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            con = getConnection();
+            if (_dialect == Dialect.SQLSERVER) {
+                ps = con.prepareStatement(UPGRADE_JOB_SQLSERVER);
+            } else if (_dialect == Dialect.DB2) {
+                ps = con.prepareStatement(UPGRADE_JOB_DB2);
+            } else if (_dialect == Dialect.SYBASE) {
+                ps = con.prepareStatement(UPGRADE_JOB_SYBASE);
+            } else if (_dialect == Dialect.SYBASE12) {
+                ps = con.prepareStatement(UPGRADE_JOB_SYBASE12);
+            } else {
+                ps = con.prepareStatement(UPGRADE_JOB_DEFAULT);
+            }
+            ps.setString(1, node);
+            ps.setInt(2, numNodes);
+            ps.setInt(3, i);
+            ps.setLong(4, maxtime);
+            return ps.executeUpdate();
+        } catch (SQLException se) {
+            throw new DatabaseException(se);
+        } finally {
+            close(ps);
+            close(con);
+        }
+    }
+
+    /**
+     * Obtains a fresh connection from the data source and applies the
+     * configured transaction isolation level.  Callers must close it.
+     */
+    private Connection getConnection() throws SQLException {
+        Connection c = _ds.getConnection();
+        DbIsolation.setIsolationLevel(c);
+        return c;
+    }
+
+    /** Booleans are persisted as 1 (true) / 0 (false). */
+    private int asInteger(boolean value) {
+        if (value) {
+            return 1;
+        }
+        return 0;
+    }
+
+    /** Inverse of asInteger(boolean): any non-zero value is true. */
+    private boolean asBoolean(int value) {
+        return value != 0;
+    }
+
+    /** Best-effort close; a failure here must not mask the primary outcome. */
+    private void close(PreparedStatement ps) {
+        if (ps == null) {
+            return;
+        }
+        try {
+            ps.close();
+        } catch (Exception e) {
+            __log.warn("Exception while closing prepared statement", e);
+        }
+    }
+
+    /** Best-effort close; a failure here must not mask the primary outcome. */
+    private void close(Connection con) {
+        if (con == null) {
+            return;
+        }
+        try {
+            con.close();
+        } catch (Exception e) {
+            __log.warn("Exception while closing connection", e);
+        }
+    }
+
+    /**
+     * Guesses the SQL dialect from DatabaseMetaData product name (and major
+     * version, to distinguish Sybase 12).  Falls back to UNKNOWN -- and thus
+     * the default upgrade SQL -- if metadata is unavailable.
+     */
+    private Dialect guessDialect() {
+        Dialect d = Dialect.UNKNOWN;
+        Connection con = null;
+        try {
+            con = getConnection();
+            DatabaseMetaData metaData = con.getMetaData();
+            if (metaData != null) {
+                String dbProductName = metaData.getDatabaseProductName();
+                int dbMajorVer = metaData.getDatabaseMajorVersion();
+                __log.debug("Using database " + dbProductName + " major version " + dbMajorVer);
+                if (dbProductName.indexOf("DB2") >= 0) {
+                    d = Dialect.DB2;
+                } else if (dbProductName.indexOf("Derby") >= 0) {
+                    d = Dialect.DERBY;
+                } else if (dbProductName.indexOf("Firebird") >= 0) {
+                    d = Dialect.FIREBIRD;
+                } else if (dbProductName.indexOf("HSQL") >= 0) {
+                    d = Dialect.HSQL;
+                } else if (dbProductName.indexOf("Microsoft SQL") >= 0) {
+                    d = Dialect.SQLSERVER;
+                } else if (dbProductName.indexOf("MySQL") >= 0) {
+                    d = Dialect.MYSQL;
+                } else if (dbProductName.indexOf("Sybase") >= 0 || dbProductName.indexOf("Adaptive") >= 0) {
+                    d = Dialect.SYBASE;
+                    if( dbMajorVer == 12 ) {
+                        d = Dialect.SYBASE12;
+                    }
+                }
+            }
+        } catch (SQLException e) {
+            __log.warn("Unable to determine database dialect", e);
+        } finally {
+            close(con);
+        }
+        __log.debug("Using database dialect: " + d);
+        return d;
+    }
+
+    /** Database families that need dialect-specific upgrade SQL. */
+    enum Dialect {
+        DB2, DERBY, FIREBIRD, HSQL, MYSQL, ORACLE, SQLSERVER, SYBASE, SYBASE12, UNKNOWN
+    }
+
+    /**
+     * No-op: JDBC connections are obtained and released per operation, so
+     * this DAO connection holds no resources of its own to release.
+     */
+    public void close() {
+
+    }
+
+    /**
+     * A connection is usable only while the factory's shared active flag is
+     * set; report "closed" once that flag has been cleared by shutdown().
+     * BUGFIX: previously returned the raw flag, i.e. reported "closed" while
+     * still active, so callers recreated the connection on every call.
+     */
+    public boolean isClosed() {
+        return !_active.get();
+    }
+
+    /**
+     * Creates (but does not persist) a job DAO.  The {@code persisted} hint
+     * is now carried onto the DAO instead of being silently dropped (the DAO
+     * previously always defaulted to persisted=true).
+     */
+    public JobDAO createJob(String jobid, boolean transacted, JobDetails jobDetails,
+            boolean persisted, long scheduledDate) {
+        JobDAOImpl job = new JobDAOImpl(scheduledDate, jobid, transacted, jobDetails);
+        job.setPersisted(persisted);
+        return job;
+    }
+
+    /** Creates a job with a generated GUID id; delegates to the id-taking overload. */
+    public JobDAO createJob(boolean transacted, JobDetails jobDetails,
+            boolean persisted, long scheduledDate){
+        return createJob(new GUID().toString(), transacted, jobDetails, persisted, scheduledDate);
+    }
+
+}
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/resources/sched_schema.sql
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/resources/sched_schema.sql 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/main/resources/sched_schema.sql 2010-09-01 06:42:40 UTC (rev 929)
@@ -1,15 +1,22 @@
+#H2 DDL
-# MySQL DDL
+CREATE TABLE ode_job (
+ jobid CHAR(64) NOT NULL,
+ ts BIGINT NOT NULL,
+ nodeid char(64),
+ scheduled int NOT NULL,
+ transacted int NOT NULL,
-CREATE TABLE `ODE_JOB` (
- `jobid` CHAR(64) NOT NULL DEFAULT '',
- `ts` BIGINT NOT NULL DEFAULT 0,
- `nodeid` char(64) NULL,
- `scheduled` int NOT NULL DEFAULT 0,
- `transacted` int NOT NULL DEFAULT 0,
- `details` blob(4096) NULL,
- PRIMARY KEY(`jobid`),
- INDEX `IDX_ODE_JOB_TS`('ts'),
- INDEX `IDX_ODE_JOB_NODEID`('nodeid')
-)
+ instanceId BIGINT,
+ mexId varchar(255),
+ processId varchar(255),
+ type varchar(255),
+ channel varchar(255),
+ correlatorId varchar(255),
+ correlationKeySet varchar(255),
+ retryCount int,
+ inMem int,
+ detailsExt binary(4096),
+ PRIMARY KEY(jobid));
+
Added: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobType;
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+
+/**
+ * Tests of the JDBC scheduler DAO: node-id bookkeeping, reassignment,
+ * immediate-dequeue filters, deletion, slot-based upgrade, and migration of
+ * legacy detailsExt-encoded job details.  (Also removes three stray double
+ * semicolons after _txm.begin().)
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+public class DAOConnectionTest extends SchedulerTestBase {
+
+    /** getNodeIds returns an empty (never null) list and de-duplicates ids. */
+    public void testGetNodeIds() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try {
+            // should have no node ids in the db, empty list (not null)
+            _txm.begin();
+            List<String> nids = conn.getNodeIds();
+            _txm.commit();
+            assertNotNull(nids);
+            assertEquals(0, nids.size());
+
+            // try for one nodeid
+            _txm.begin();
+            conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L), "abc", true);
+            _txm.commit();
+            _txm.begin();
+            nids = conn.getNodeIds();
+            _txm.commit();
+            assertEquals(1, nids.size());
+            assertTrue(nids.contains("abc"));
+
+            // check that dups are ignored.
+            _txm.begin();
+            conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L), "abc", true);
+            _txm.commit();
+            _txm.begin();
+            nids = conn.getNodeIds();
+            _txm.commit();
+            assertEquals(1, nids.size());
+            assertTrue(nids.contains("abc"));
+
+            // add another nodeid,
+            _txm.begin();
+            conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L), "123", true);
+            _txm.commit();
+            _txm.begin();
+            nids = conn.getNodeIds();
+            _txm.commit();
+            assertEquals(2, nids.size());
+            assertTrue(nids.contains("abc"));
+            assertTrue(nids.contains("123"));
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** updateReassign moves n1's job to n2; a dequeue on n2 then sees both. */
+    public void testReassign() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try {
+            _txm.begin();
+            conn.insertJob(conn.createJob("j1", true, new Scheduler.JobDetails(), true, 100L), "n1", false);
+            conn.insertJob(conn.createJob("j2", true, new Scheduler.JobDetails(), true, 200L), "n2", false);
+            _txm.commit();
+
+            _txm.begin();
+            int num = conn.updateReassign("n1","n2");
+            _txm.commit();
+
+            assertEquals(1,num);
+
+            _txm.begin();
+            List<JobDAO> jobs = conn.dequeueImmediate("n2", 400L, 1000);
+            _txm.commit();
+
+            assertEquals(2,jobs.size());
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** dequeueImmediate only returns jobs strictly before the time cutoff. */
+    public void testScheduleImmediateTimeFilter() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+            _txm.begin();
+            conn.insertJob(conn.createJob("j1", true, new Scheduler.JobDetails(), true, 100L), "n1", false);
+            conn.insertJob(conn.createJob("j2",true,new Scheduler.JobDetails(), true, 200L), "n1", false);
+            _txm.commit();
+
+            _txm.begin();
+            List<JobDAO> jobs = conn.dequeueImmediate("n1", 150L, 1000);
+            _txm.commit();
+            assertNotNull(jobs);
+            assertEquals(1, jobs.size());
+            assertEquals("j1",jobs.get(0).getJobId());
+            // j1 is now marked scheduled, so only j2 remains below 250.
+            _txm.begin();
+            jobs = conn.dequeueImmediate("n1", 250L, 1000);
+            _txm.commit();
+            assertNotNull(jobs);
+            assertEquals(1, jobs.size());
+            assertEquals("j2",jobs.get(0).getJobId());
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** dequeueImmediate honors the max-rows limit (earliest ts first). */
+    public void testScheduleImmediateMaxRows() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+            _txm.begin();
+            conn.insertJob(conn.createJob("j1",true,new Scheduler.JobDetails(), true, 100L), "n1", false);
+            conn.insertJob(conn.createJob("j2",true,new Scheduler.JobDetails(), true, 200L), "n1", false);
+            _txm.commit();
+
+            _txm.begin();
+            List<JobDAO> jobs = conn.dequeueImmediate("n1", 201L, 1);
+            _txm.commit();
+            assertNotNull(jobs);
+            assertEquals(1, jobs.size());
+            assertEquals("j1",jobs.get(0).getJobId());
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** dequeueImmediate only returns jobs assigned to the requested node. */
+    public void testScheduleImmediateNodeFilter() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+            _txm.begin();
+            conn.insertJob(conn.createJob("j1",true,new Scheduler.JobDetails(), true, 100L), "n1", false);
+            conn.insertJob(conn.createJob("j2",true,new Scheduler.JobDetails(), true, 200L), "n2", false);
+            _txm.commit();
+
+            _txm.begin();
+            List<JobDAO> jobs = conn.dequeueImmediate("n2", 300L, 1000);
+            _txm.commit();
+            assertNotNull(jobs);
+            assertEquals(1, jobs.size());
+            assertEquals("j2",jobs.get(0).getJobId());
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** deleteJob requires both the jobid and the owning nodeid to match. */
+    public void testDeleteJob() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+            _txm.begin();
+            conn.insertJob(conn.createJob("j1",true,new Scheduler.JobDetails(),true, 100L), "n1", false);
+            conn.insertJob(conn.createJob("j2",true,new Scheduler.JobDetails(),true, 200L), "n2", false);
+            _txm.commit();
+
+            // try deleting, wrong jobid -- del should fail
+            _txm.begin();
+            assertFalse(conn.deleteJob("j1x", "n1"));
+            assertEquals(2,conn.getNodeIds().size());
+            _txm.commit();
+
+            // wrong nodeid
+            _txm.begin();
+            assertFalse(conn.deleteJob("j1", "n1x"));
+            _txm.commit();
+            _txm.begin();
+            assertEquals(2,conn.getNodeIds().size());
+            _txm.commit();
+
+            // now do the correct job
+            _txm.begin();
+            assertTrue(conn.deleteJob("j1", "n1"));
+            _txm.commit();
+            _txm.begin();
+            assertEquals(1,conn.getNodeIds().size());
+            _txm.commit();
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** Slot-based claiming: 3 nodes each claim their mod-3 share of due jobs. */
+    public void testUpgrade() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+            _txm.begin();
+            for (int i = 0; i < 200; ++i) {
+                conn.insertJob(conn.createJob("j" +i,true,new Scheduler.JobDetails(), true, i), null, false);
+            }
+            int n1 = conn.updateAssignToNode("n1", 0, 3, 100);
+            int n2 = conn.updateAssignToNode("n2", 1, 3, 100);
+            int n3 = conn.updateAssignToNode("n3", 2, 3, 100);
+            _txm.commit();
+            // Make sure we got 100 upgraded nodes
+            assertEquals(100,n1+n2+n3);
+
+            // now do scheduling.
+            _txm.begin();
+            assertEquals(n1,conn.dequeueImmediate("n1", 10000L, 1000).size());
+            assertEquals(n2,conn.dequeueImmediate("n2", 10000L, 1000).size());
+            assertEquals(n3,conn.dequeueImmediate("n3", 10000L, 1000).size());
+            _txm.commit();
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** Legacy detailsExt map entries override the dedicated columns on dequeue. */
+    public void testMigration() throws Exception {
+        Scheduler.JobDetails j1 = new Scheduler.JobDetails();
+        j1.getDetailsExt().put("type", "MATCHER");
+        j1.getDetailsExt().put("iid", 1234L);
+        j1.getDetailsExt().put("pid", new QName("http://test1", "test2").toString());
+        j1.getDetailsExt().put("inmem", true);
+        j1.getDetailsExt().put("ckey", "@2[some~001~002]");
+        j1.getDetailsExt().put("channel", "123");
+        j1.getDetailsExt().put("mexid", "mexid123");
+        j1.getDetailsExt().put("correlatorId", "cid123");
+        j1.getDetailsExt().put("retryCount", "15");
+
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+            _txm.begin();
+            conn.insertJob(conn.createJob("migration",true,j1, true, 0), null, false);
+            conn.updateAssignToNode("m", 0, 3, 100);
+            Scheduler.JobDetails j2 = conn.dequeueImmediate("m", 10000L, 1000).get(0).getDetails();
+            _txm.commit();
+            assertEquals(j2.getType(), JobType.MATCHER);
+            assertEquals(j2.getInstanceId(), (Object) 1234L);
+            assertEquals(j2.getProcessId(), new QName("http://test1", "test2"));
+            assertEquals(j2.getInMem(), (Object) true);
+            assertEquals(j2.getCorrelationKeySet().toCanonicalString(), (Object) "@2[some~001~002]");
+            assertEquals(j2.getChannel(), (Object) "123");
+            assertEquals(j2.getMexId(), (Object) "mexid123");
+            assertEquals(j2.getCorrelatorId(), (Object) "cid123");
+            assertEquals(j2.getRetryCount(), (Object) 15);
+        } finally {
+            conn.close();
+        }
+    }
+
+}
Deleted: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ode.scheduler.simple;
-
-import java.io.InputStream;
-import java.sql.Connection;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-
-import org.apache.ode.utils.GUID;
-import org.h2.jdbcx.JdbcDataSource;
-
-
-/**
- * Support class for creating a JDBC delegate (using in-mem HSQL db).
- *
- * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- */
-public class DelegateSupport {
-
- protected DataSource _ds;
- protected JdbcDelegate _del;
-
- public DelegateSupport() throws Exception {
- this(null);
- }
-
- public DelegateSupport(TransactionManager txm) throws Exception {
- initialize(txm);
- }
-
- protected void initialize(TransactionManager txm) throws Exception {
- JdbcDataSource ds = new JdbcDataSource();
- ds.setURL("jdbc:h2:mem:" + new GUID().toString() + ";DB_CLOSE_DELAY=-1");
- ds.setUser("sa");
- ds.setPassword("");
- _ds = ds;
-
- setup();
- _del = new JdbcDelegate(_ds);
- }
-
- public DatabaseDelegate delegate() {
- return _del;
- }
-
- public void setup() throws Exception {
- Connection c = _ds.getConnection();
- try {
- StringBuffer sql = new StringBuffer();
-
- {
- InputStream in = getClass().getResourceAsStream("/simplesched-hsql.sql");
- int v;
- while ((v = in.read()) != -1) {
- sql.append((char) v);
- }
- }
-
- //c.createStatement().executeUpdate("CREATE ALIAS MOD FOR \"org.apache.ode.scheduler.simple.DelegateSupport.mod\";");
- c.createStatement().executeUpdate(sql.toString());
- } finally {
- c.close();
- }
-
- }
-
- public static long mod(long a, long b) {
- return a % b;
- }
-}
-
Deleted: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/GeronimoDelegateSupport.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/GeronimoDelegateSupport.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/GeronimoDelegateSupport.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ode.scheduler.simple;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-
-import org.apache.geronimo.connector.outbound.GenericConnectionManager;
-import org.apache.geronimo.connector.outbound.connectionmanagerconfig.LocalTransactions;
-import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PoolingSupport;
-import org.apache.geronimo.connector.outbound.connectionmanagerconfig.SinglePool;
-import org.apache.geronimo.connector.outbound.connectionmanagerconfig.TransactionSupport;
-import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
-import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTrackingCoordinator;
-import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
-import org.apache.ode.utils.GUID;
-import org.tranql.connector.jdbc.JDBCDriverMCF;
-
-public class GeronimoDelegateSupport extends DelegateSupport {
- private static final int CONNECTION_MAX_WAIT_MILLIS = 30000;
-
- private static final int CONNECTION_MAX_IDLE_MINUTES = 5;
-
- private GenericConnectionManager _connectionManager;
-
- public GeronimoDelegateSupport(TransactionManager txm) throws Exception {
- super(txm);
- }
-
- @Override
- protected void initialize(TransactionManager txm) throws Exception {
- _ds = createGeronimoDataSource(txm, "jdbc:h2:mem:" + new GUID().toString(), org.h2.Driver.class.getName(), "sa", "");
- setup();
- _del = new JdbcDelegate(_ds);
- }
-
- private DataSource createGeronimoDataSource(TransactionManager txm, String url, String driverClass, String username,String password) {
- TransactionSupport transactionSupport = LocalTransactions.INSTANCE;
- ConnectionTracker connectionTracker = new ConnectionTrackingCoordinator();
-
- PoolingSupport poolingSupport = new SinglePool(1, 1,
- CONNECTION_MAX_WAIT_MILLIS,
- CONNECTION_MAX_IDLE_MINUTES,
- true, // match one
- false, // match all
- false); // select one assume match
-
- _connectionManager = new GenericConnectionManager(
- transactionSupport,
- poolingSupport,
- null,
- connectionTracker,
- (RecoverableTransactionManager) txm,
- getClass().getName(),
- getClass().getClassLoader());
-
- JDBCDriverMCF mcf = new JDBCDriverMCF();
- try {
- mcf.setDriver(driverClass);
- mcf.setConnectionURL(url);
- if (username != null) {
- mcf.setUserName(username);
- }
- if (password != null) {
- mcf.setPassword(password);
- }
- _connectionManager.doStart();
- return (DataSource) mcf.createConnectionFactory(_connectionManager);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-}
Deleted: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ode.scheduler.simple;
-
-import java.util.HashMap;
-import java.util.List;
-
-import javax.xml.namespace.QName;
-
-import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.Scheduler.JobType;
-import org.apache.ode.scheduler.simple.DatabaseDelegate;
-import org.apache.ode.scheduler.simple.Job;
-
-
-import junit.framework.TestCase;
-
-/**
- *
- * Test of the JDBC delegate.
- *
- * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- */
-public class JdbcDelegateTest extends TestCase {
-
- private DelegateSupport _ds;
- private DatabaseDelegate _del;
-
- public void setUp() throws Exception {
- _ds = new DelegateSupport();
- _del = _ds.delegate();
- }
-
-
- public void testGetNodeIds() throws Exception {
- // should have no node ids in the db, empty list (not null)
- List<String> nids = _del.getNodeIds();
- assertNotNull(nids);
- assertEquals(0, nids.size());
-
- // try for one nodeid
- _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "abc", true);
- nids = _del.getNodeIds();
- assertEquals(1, nids.size());
- assertTrue(nids.contains("abc"));
-
- // check that dups are ignored.
- _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "abc", true);
- nids = _del.getNodeIds();
- assertEquals(1, nids.size());
- assertTrue(nids.contains("abc"));
-
- // add another nodeid,
- _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "123", true);
- nids = _del.getNodeIds();
- assertEquals(2, nids.size());
- assertTrue(nids.contains("abc"));
- assertTrue(nids.contains("123"));
- }
-
- public void testReassign() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n2", false);
-
- assertEquals(1,_del.updateReassign("n1","n2"));
- List<Job> jobs = _del.dequeueImmediate("n2", 400L, 1000);
- assertEquals(2,jobs.size());
- }
-
- public void testScheduleImmediateTimeFilter() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n1", false);
-
-
- List<Job> jobs = _del.dequeueImmediate("n1", 150L, 1000);
- assertNotNull(jobs);
- assertEquals(1, jobs.size());
- assertEquals("j1",jobs.get(0).jobId);
- jobs = _del.dequeueImmediate("n1", 250L, 1000);
- assertNotNull(jobs);
- assertEquals(2, jobs.size());
- assertEquals("j1",jobs.get(0).jobId);
- assertEquals("j2",jobs.get(1).jobId);
- }
-
- public void testScheduleImmediateMaxRows() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n1", false);
-
- List<Job> jobs = _del.dequeueImmediate("n1", 201L, 1);
- assertNotNull(jobs);
- assertEquals(1, jobs.size());
- assertEquals("j1",jobs.get(0).jobId);
- }
-
- public void testScheduleImmediateNodeFilter() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n2", false);
-
- List<Job> jobs = _del.dequeueImmediate("n2", 300L, 1000);
- assertNotNull(jobs);
- assertEquals(1, jobs.size());
- assertEquals("j2",jobs.get(0).jobId);
- }
-
- public void testDeleteJob() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n2", false);
-
- // try deleting, wrong jobid -- del should fail
- assertFalse(_del.deleteJob("j1x", "n1"));
- assertEquals(2,_del.getNodeIds().size());
-
- // wrong nodeid
- assertFalse(_del.deleteJob("j1", "n1x"));
- assertEquals(2,_del.getNodeIds().size());
-
- // now do the correct job
- assertTrue(_del.deleteJob("j1", "n1"));
- assertEquals(1,_del.getNodeIds().size());
- }
-
- public void testUpgrade() throws Exception {
- for (int i = 0; i < 200; ++i)
- _del.insertJob(new Job(i ,"j" +i,true,new Scheduler.JobDetails()), null, false);
-
- int n1 = _del.updateAssignToNode("n1", 0, 3, 100);
- int n2 = _del.updateAssignToNode("n2", 1, 3, 100);
- int n3 = _del.updateAssignToNode("n3", 2, 3, 100);
- // Make sure we got 100 upgraded nodes
- assertEquals(100,n1+n2+n3);
-
- // now do scheduling.
- assertEquals(n1,_del.dequeueImmediate("n1", 10000L, 1000).size());
- assertEquals(n2,_del.dequeueImmediate("n2", 10000L, 1000).size());
- assertEquals(n3,_del.dequeueImmediate("n3", 10000L, 1000).size());
- }
-
- public void testMigration() throws Exception {
- Scheduler.JobDetails j1 = new Scheduler.JobDetails();
- j1.getDetailsExt().put("type", "MATCHER");
- j1.getDetailsExt().put("iid", 1234L);
- j1.getDetailsExt().put("pid", new QName("http://test1", "test2").toString());
- j1.getDetailsExt().put("inmem", true);
- j1.getDetailsExt().put("ckey", "@2[some~001~002]");
- j1.getDetailsExt().put("channel", "123");
- j1.getDetailsExt().put("mexid", "mexid123");
- j1.getDetailsExt().put("correlatorId", "cid123");
- j1.getDetailsExt().put("retryCount", "15");
-
- _del.insertJob(new Job(0 ,"migration",true,j1), null, false);
- _del.updateAssignToNode("m", 0, 3, 100);
- Scheduler.JobDetails j2 = _del.dequeueImmediate("m", 10000L, 1000).get(0).detail;
-
- assertEquals(j2.getType(), JobType.MATCHER);
- assertEquals(j2.getInstanceId(), (Object) 1234L);
- assertEquals(j2.getProcessId(), new QName("http://test1", "test2"));
- assertEquals(j2.getInMem(), (Object) true);
- assertEquals(j2.getCorrelationKeySet().toCanonicalString(), (Object) "@2[some~001~002]");
- assertEquals(j2.getChannel(), (Object) "123");
- assertEquals(j2.getMexId(), (Object) "mexid123");
- assertEquals(j2.getCorrelatorId(), (Object) "cid123");
- assertEquals(j2.getRetryCount(), (Object) 15);
- }
-}
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -19,35 +19,40 @@
package org.apache.ode.scheduler.simple;
-import org.apache.ode.bpel.iapi.Scheduler;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.junit.Ignore;
+import org.junit.Test;
-import javax.transaction.TransactionManager;
-import java.util.*;
-import java.util.concurrent.Callable;
-
-import junit.framework.TestCase;
-
/**
* @author Matthieu Riou <mriou(a)apache.org>
*/
-public class RetriesTest extends TestCase implements Scheduler.JobProcessor {
+public class RetriesTest extends SchedulerTestBase implements Scheduler.JobProcessor {
private static final Log __log = LogFactory.getLog(RetriesTest.class);
-
- DelegateSupport _ds;
+
SimpleScheduler _scheduler;
ArrayList<Scheduler.JobInfo> _jobs;
ArrayList<Scheduler.JobInfo> _commit;
- TransactionManager _txm;
+
int _tried = 0;
- Scheduler.JobInfo _jobInfo = null;
+ @Override
+ protected Properties getProperties() {
+ Properties p = new Properties();
+ p.put("needed.Rollback", "true");
+ return p;
+ }
+
public void setUp() throws Exception {
- _txm = new GeronimoTransactionManager();
- _ds = new GeronimoDelegateSupport(_txm);
-
+ super.setUp();
+
_scheduler = newScheduler("n1");
_jobs = new ArrayList<Scheduler.JobInfo>(100);
_commit = new ArrayList<Scheduler.JobInfo>(100);
@@ -55,6 +60,7 @@
public void tearDown() throws Exception {
_scheduler.shutdown();
+ super.tearDown();
}
public void testRetries() throws Exception {
@@ -62,17 +68,20 @@
_scheduler.setNearFutureInterval(5000);
_scheduler.setImmediateInterval(1000);
_scheduler.start();
+
+ SchedulerDAOConnection conn = _factory.getConnection();
_txm.begin();
try {
_scheduler.schedulePersistedJob(newDetail("123"), new Date());
} finally {
_txm.commit();
+ conn.close();
}
- Thread.sleep(10000);
- assertEquals(6, _tried);
+ Thread.sleep(20000);
+ assertEquals(8, _tried);
}
-
+
public void testExecTransaction() throws Exception {
final int[] tryCount = new int[1];
tryCount[0] = 0;
@@ -91,11 +100,11 @@
_scheduler.execTransaction(transaction);
assertEquals(3, tryCount[0]);
}
-
- public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
- _jobInfo = jobInfo;
-
- _tried++;
+
+
+ public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
+ __log.info("onScheduledJob " + jobInfo.jobName + ": " + jobInfo.retryCount);
+ _tried ++;
throw new Scheduler.JobProcessorException(jobInfo.retryCount < 1);
}
@@ -106,9 +115,9 @@
}
private SimpleScheduler newScheduler(String nodeId) {
- SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate(), new Properties());
+ SimpleScheduler scheduler = new SimpleScheduler(nodeId, _factory, _txm, new Properties());
+ scheduler.setTransactionManager(_txm);
scheduler.setJobProcessor(this);
- scheduler.setTransactionManager(_txm);
return scheduler;
}
}
Added: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java (rev 0)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple;
+
+import java.io.InputStream;
+import java.sql.Connection;
+import java.util.Properties;
+
+import javax.transaction.TransactionManager;
+
+import junit.framework.TestCase;
+
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
+import org.apache.ode.il.config.OdeConfigProperties;
+import org.apache.ode.il.dbutil.Database;
+import org.apache.ode.il.txutil.TxManager;
+import org.apache.ode.scheduler.simple.jdbc.SchedulerDAOConnectionFactoryImpl;
+
+
+/**
+ * Support class for creating a JDBC delegate (using in-mem HSQL db).
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+public class SchedulerTestBase extends TestCase{
+
+ protected Database _db;
+ protected SchedulerDAOConnectionFactory _factory;
+ protected TransactionManager _txm;
+
+ @Override
+ public void setUp() throws Exception {
+ Properties props = getProperties();
+ props.put(OdeConfigProperties.PROP_DAOCF_SCHEDULER, System.getProperty(OdeConfigProperties.PROP_DAOCF_SCHEDULER,OdeConfigProperties.DEFAULT_DAOCF_SCHEDULER_CLASS));
+ OdeConfigProperties odeProps = new OdeConfigProperties(props, "");
+ TxManager tx = new TxManager(odeProps);
+ _txm = tx.createTransactionManager();
+ _db = new Database(odeProps);
+ _db.setTransactionManager(_txm);
+ _db.start();
+ _factory = _db.createDaoSchedulerCF();
+
+ if (_factory instanceof SchedulerDAOConnectionFactoryImpl) {
+ Connection c = _db.getDataSource().getConnection();
+ try {
+ StringBuffer sql = new StringBuffer();
+
+ {
+ InputStream in = getClass().getResourceAsStream("/simplesched-hsql.sql");
+ int v;
+ while ((v = in.read()) != -1) {
+ sql.append((char) v);
+ }
+ }
+
+ String[] cmds = sql.toString().split(";");
+ for (String cmd : cmds) {
+ c.createStatement().executeUpdate(cmd);
+ }
+ } finally {
+ c.close();
+ }
+ }
+
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ _factory.shutdown();
+ _db.shutdown();
+
+ }
+
+ protected Properties getProperties() {
+ return new Properties();
+ }
+
+ public static long mod(long a, long b) {
+ return a % b;
+ }
+}
+
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -25,8 +25,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ode.dao.scheduler.Task;
import org.apache.ode.scheduler.simple.SchedulerThread;
-import org.apache.ode.scheduler.simple.Task;
import org.apache.ode.scheduler.simple.TaskRunner;
@@ -91,12 +91,12 @@
// Make sure they got scheduled in the right order
for (int i = 0; i < 299; ++i)
- assertTrue(_tasks.get(i).task.schedDate < _tasks.get(i+1).task.schedDate);
+ assertTrue(_tasks.get(i).task.getScheduledDate() < _tasks.get(i+1).task.getScheduledDate());
// Check scheduling tolerance
for (TR tr : _tasks) {
- assertTrue(tr.time < tr.task.schedDate + SCHED_TOLERANCE / 2);
- assertTrue(tr.time > tr.task.schedDate - SCHED_TOLERANCE / 2);
+ assertTrue(tr.time < tr.task.getScheduledDate() + SCHED_TOLERANCE / 2);
+ assertTrue(tr.time > tr.task.getScheduledDate() - SCHED_TOLERANCE / 2);
}
}
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java 2010-09-01 06:42:40 UTC (rev 929)
@@ -19,66 +19,50 @@
package org.apache.ode.scheduler.simple;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Properties;
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
-
-import junit.framework.TestCase;
-
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
-public class SimpleSchedulerTest extends TestCase implements JobProcessor {
+public class SimpleSchedulerTest extends SchedulerTestBase implements JobProcessor {
- DelegateSupport _ds;
SimpleScheduler _scheduler;
ArrayList<JobInfo> _jobs;
- ArrayList<JobInfo> _commit;
- TransactionManager _txm;
-
+
+ @Override
public void setUp() throws Exception {
- _txm = new GeronimoTransactionManager();
- _ds = new DelegateSupport();
-
+ super.setUp();
+
_scheduler = newScheduler("n1");
_jobs = new ArrayList<JobInfo>(100);
- _commit = new ArrayList<JobInfo>(100);
}
-
+
+ @Override
public void tearDown() throws Exception {
_scheduler.shutdown();
+ super.tearDown();
}
public void testConcurrentExec() throws Exception {
_scheduler.start();
- for (int i=0; i<10; i++) {
- _txm.begin();
- String jobId;
- try {
- int jobs = _jobs.size();
- jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 200));
- Thread.sleep(100);
- // we're using transacted jobs which means it will commit at the end
- // if the job is scheduled, the following assert is not valid @seanahn
- // assertEquals(jobs, _jobs.size());
- } finally {
- _txm.commit();
- }
- // Delete from DB
- assertEquals(true,_ds.delegate().deleteJob(jobId, "n1"));
- // Wait for the job to be execed.
- Thread.sleep(250);
- // We should always have same number of jobs/commits
- assertEquals(_jobs.size(), _commit.size());
+ _txm.begin();
+ String jobId;
+ try {
+ jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100));
+ Thread.sleep(200);
+ // Make sure we don't schedule until commit.
+ assertEquals(0, _jobs.size());
+ } finally {
+ _txm.commit();
}
+ // Wait for the job to be execed.
+ Thread.sleep(100);
+ // Should execute job,
+ assertEquals(1, _jobs.size());
}
public void testImmediateScheduling() throws Exception {
@@ -212,32 +196,6 @@
synchronized (_jobs) {
_jobs.add(jobInfo);
}
-
- try {
- _txm.getTransaction().registerSynchronization(new Synchronization() {
-
- public void afterCompletion(int arg0) {
- if (arg0 == Status.STATUS_COMMITTED)
- _commit.add(jobInfo);
- }
-
- public void beforeCompletion() {
- // TODO Auto-generated method stub
-
- }
-
- });
- } catch (IllegalStateException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (RollbackException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (SystemException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
}
Scheduler.JobDetails newDetail(String x) {
@@ -247,10 +205,9 @@
}
private SimpleScheduler newScheduler(String nodeId) {
- SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate(), new Properties());
+ SimpleScheduler scheduler = new SimpleScheduler(nodeId, _factory, _txm, new Properties());
scheduler.setJobProcessor(this);
- scheduler.setTransactionManager(_txm);
return scheduler;
- }
+ }
}
Modified: branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/resources/log4j.properties
===================================================================
--- branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/resources/log4j.properties 2010-08-25 14:32:17 UTC (rev 928)
+++ branches/ODE/RiftSaw-ODE-trunk/scheduler-simple/src/test/resources/log4j.properties 2010-09-01 06:42:40 UTC (rev 929)
@@ -19,10 +19,13 @@
log4j.rootLogger=WARN, CONSOLE
# log4j properties to work with commandline tools.
-log4j.category.org.apache.ode.scheduler.simple.RetriesTest=DEBUG
+log4j.category.org.apache.ode.scheduler.simple.RetriesTest=INFO
+log4j.category.org.apache.ode.scheduler.simple.SimpleScheduler=INFO
+log4j.category.org.apache.ode.il.dbutil.H2Database=INFO
+log4j.category.org.apache.ode.dao.jpa.hibernate=DEBUG
log4j.category.org.apache.ode.bpel.engine=INFO
# Console appender
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%p - %C{1}.%M(%L) | %m%n
+log4j.appender.CONSOLE.layout.ConversionPattern=%d %-5r %-5p [%c] (%t:%x) %m%n
14 years, 6 months