Author: jeff.yuchang
Date: 2012-08-13 00:37:52 -0400 (Mon, 13 Aug 2012)
New Revision: 1588
Modified:
branches/RiftSaw-2.3.x/distribution/src/main/release/conf/bpel.properties
branches/RiftSaw-2.3.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/engine/ode/BPELEngineImpl.java
Log:
RIFTSAW-504, added retry mechanism for invoking ODE.
Modified: branches/RiftSaw-2.3.x/distribution/src/main/release/conf/bpel.properties
===================================================================
--- branches/RiftSaw-2.3.x/distribution/src/main/release/conf/bpel.properties 2012-07-30 10:04:52 UTC (rev 1587)
+++ branches/RiftSaw-2.3.x/distribution/src/main/release/conf/bpel.properties 2012-08-13 04:37:52 UTC (rev 1588)
@@ -103,3 +103,7 @@
# Property used to configure whether ODE should store the BPEL events in its default location
# (Set to false, as RiftSaw uses the BPEL events stored by the BPAFLogAdapter)
persist.bpel.events = false
+
+# Retry limit and time interval (milli-second) for the client invoking ODE engine.
+#bpel.ode.engine.immediateTransactionRetryInterval = 1000
+#bpel.ode.engine.immediateTransactionRetryLimit = 5
Modified: branches/RiftSaw-2.3.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/engine/ode/BPELEngineImpl.java
===================================================================
--- branches/RiftSaw-2.3.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/engine/ode/BPELEngineImpl.java 2012-07-30 10:04:52 UTC (rev 1587)
+++ branches/RiftSaw-2.3.x/runtime/engine/src/main/java/org/jboss/soa/bpel/runtime/engine/ode/BPELEngineImpl.java 2012-08-13 04:37:52 UTC (rev 1588)
@@ -67,6 +67,12 @@
public final static String BPEL_WEBSERVICE_SECURE = "webservice.secure";
public final static String BPEL_WEBSERVICE_BASEURL = "webservice.baseurl";
+ /** Number of immediate retries when the transaction fails **/
+ private static int _immediateTransactionRetryLimit = 3;
+
+ /** Interval between immediate retries when the transaction fails **/
+ private static long _immediateTransactionRetryInterval = 1000;
+
protected final Log __log = LogFactory.getLog(getClass());
protected final Log __logTx = LogFactory.getLog("org.apache.ode.tx");
@@ -94,6 +100,16 @@
{
return new BpelManagementFacadeImpl(_bpelServer, _store);
}
+
+ private void loadImmediateTransactionRetryInterval() {
+ String value =
_odeConfig.getProperty("ode.engine.immediateTransactionRetryInterval",
"1000");
+ _immediateTransactionRetryInterval = Long.valueOf(value).longValue();
+ }
+
+ private void loadImmediateTransactionRetryLimit() {
+ String value =
_odeConfig.getProperty("ode.engine.immediateTransactionRetryLimit",
"3");
+ _immediateTransactionRetryLimit = Integer.valueOf(value).intValue();
+ }
/**
* This method invokes a BPEL process, associated with the supplied
@@ -104,69 +120,90 @@
public void invoke(InvocationAdapter invocationAdapter)
throws Exception
{
- boolean success = true;
MyRoleMessageExchange odeMex = null;
Future responseFuture = null;
-
- try
- {
- // start TX
- _txMgr.begin();
- if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
-
- odeMex = createMessageExchange(invocationAdapter);
- odeMex.setProperty("isTwoWay",
Boolean.toString(odeMex.getOperation().getOutput() != null));
- if (__log.isDebugEnabled()) __log.debug("Is two way operation?
"+odeMex.getProperty("isTwoWay"));
-
- if (odeMex.getOperation() != null)
- {
- // Preparing message to send to ODE
- Message odeRequest =
odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
-
- // distinguish WS and ESB invocation
- invocationAdapter.initRequest(odeMex.getOperation(), new
ODEMessageAdapter(odeRequest));
-
- // TODO: Might need to store session id/epr of caller?
- //readHeader(msgContext, odeMex);
-
- if (__log.isDebugEnabled()) {
- __log.debug("Invoking ODE using MEX " + odeMex);
- __log.debug("Message content: " +
DOMUtils.domToString(odeRequest.getMessage()));
- }
-
- // Invoke ODE
- responseFuture = odeMex.invoke(odeRequest);
-
- __log.debug("Commiting ODE MEX " + odeMex);
- try {
- if (__log.isDebugEnabled()) __log.debug("Commiting transaction.");
- _txMgr.commit();
- } catch (Exception e) {
- __log.error("Commit failed", e);
- success = false;
- }
- } else {
- success = false;
- }
- } catch (Exception e) {
- __log.error("Exception occured while invoking ODE", e);
- success = false;
- String mesg = e.getMessage();
- if (mesg == null) {
- mesg = "An exception occured while invoking ODE.";
- }
- throw new Exception(mesg, e);
+ boolean success = true;
+ int immediateRetryCount = _immediateTransactionRetryLimit;
+ Exception theException = null;
+
+ try {
+ do {
+ success = true;
+ theException = null;
+
+ try
+ {
+ // start TX
+ _txMgr.begin();
+ if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
+
+ odeMex = createMessageExchange(invocationAdapter);
+ odeMex.setProperty("isTwoWay",
Boolean.toString(odeMex.getOperation().getOutput() != null));
+ if (__log.isDebugEnabled()) __log.debug("Is two way operation?
"+odeMex.getProperty("isTwoWay"));
+
+ if (odeMex.getOperation() != null)
+ {
+ // Preparing message to send to ODE
+ Message odeRequest =
odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
+
+ // distinguish WS and ESB invocation
+ invocationAdapter.initRequest(odeMex.getOperation(), new
ODEMessageAdapter(odeRequest));
+
+ // TODO: Might need to store session id/epr of caller?
+ //readHeader(msgContext, odeMex);
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("Invoking ODE using MEX " + odeMex);
+ __log.debug("Message content: " +
DOMUtils.domToString(odeRequest.getMessage()));
+ }
+
+ // Invoke ODE
+ responseFuture = odeMex.invoke(odeRequest);
+
+ } else {
+ success = false;
+ }
+ } catch (Exception e) {
+ __log.error("Exception occured while invoking ODE", e);
+ success = false;
+ String mesg = e.getMessage();
+ if (mesg == null) {
+ mesg = "An exception occured while invoking ODE.";
+ }
+ theException = e;
+ } finally {
+ if (success) {
+ __log.debug("Commiting ODE MEX " + odeMex);
+ try {
+ if (__log.isDebugEnabled()) __log.debug("Commiting
transaction.");
+ _txMgr.commit();
+ } catch (Exception e) {
+ __log.debug("Invoking ODE commit failed", e);
+ success = false;
+ theException = e;
+ }
+ } else {
+ if (odeMex != null) odeMex.release(success);
+ try {
+ _txMgr.rollback();
+ } catch (Exception e) {
+ __log.debug("Invoking ODE transaction rollback failed", e);
+ theException = e;
+ }
+ }
+ if (theException != null && immediateRetryCount > 0) {
+ if (__log.isDebugEnabled()) __log.debug("Will retry the transaction in
" + _immediateTransactionRetryInterval + " msecs on for following error: ",
theException);
+ Thread.sleep(_immediateTransactionRetryInterval);
+ }
+ }
+ } while (!success && immediateRetryCount-- > 0) ;
} finally {
- if (!success) {
- if (odeMex != null) odeMex.release(success);
- try {
- _txMgr.rollback();
- } catch (Exception e) {
- throw new Exception("Rollback failed", e);
- }
- }
+ if (theException != null) {
+ throw theException;
+ }
}
-
+
+
if (odeMex.getOperation().getOutput() != null) {
// Waits for the response to arrive
try {
@@ -329,6 +366,10 @@
__log.debug("Starting scheduler");
_scheduler.start();
+
+ loadImmediateTransactionRetryInterval();
+ loadImmediateTransactionRetryLimit();
+
}
@@ -865,4 +906,5 @@
return _uddiRegistration;
}
+
}
Show replies by date