[jboss-cvs] JBossAS SVN: r112081 - in projects/cluster/ha-client/trunk: src/main/java/org/jboss/aspects/remoting and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 17 03:57:41 EDT 2011
Author: pferraro
Date: 2011-08-17 03:57:41 -0400 (Wed, 17 Aug 2011)
New Revision: 112081
Modified:
projects/cluster/ha-client/trunk/pom.xml
projects/cluster/ha-client/trunk/src/main/java/org/jboss/aspects/remoting/ClusterChooserInterceptor.java
Log:
EJBTHREE-1137 ClusterChooserInterceptor does not support transaction failover authorisations
Modified: projects/cluster/ha-client/trunk/pom.xml
===================================================================
--- projects/cluster/ha-client/trunk/pom.xml 2011-08-16 22:42:20 UTC (rev 112080)
+++ projects/cluster/ha-client/trunk/pom.xml 2011-08-17 07:57:41 UTC (rev 112081)
@@ -34,6 +34,7 @@
<version.jboss.remoting.aspects>1.0.2</version.jboss.remoting.aspects>
<version.jboss.remoting>2.5.3</version.jboss.remoting>
<version.jboss.aop>2.2.0.Final</version.jboss.aop>
+ <version.jboss.integration>5.1.0.GA</version.jboss.integration>
<version.junit>4.8.2</version.junit>
</properties>
@@ -125,6 +126,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.jboss.integration</groupId>
+ <artifactId>jboss-transaction-spi</artifactId>
+ <version>${version.jboss.integration}</version>
+ </dependency>
<!-- Test dependencies -->
<dependency>
Modified: projects/cluster/ha-client/trunk/src/main/java/org/jboss/aspects/remoting/ClusterChooserInterceptor.java
===================================================================
--- projects/cluster/ha-client/trunk/src/main/java/org/jboss/aspects/remoting/ClusterChooserInterceptor.java 2011-08-16 22:42:20 UTC (rev 112080)
+++ projects/cluster/ha-client/trunk/src/main/java/org/jboss/aspects/remoting/ClusterChooserInterceptor.java 2011-08-17 07:57:41 UTC (rev 112081)
@@ -21,16 +21,22 @@
*/
package org.jboss.aspects.remoting;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+
import org.jboss.aop.DispatcherConnectException;
import org.jboss.aop.util.PayloadKey;
import org.jboss.ha.client.loadbalance.AopLoadBalancePolicy;
import org.jboss.ha.client.loadbalance.LoadBalancePolicy;
import org.jboss.ha.framework.interfaces.GenericClusteringException;
+import org.jboss.logging.Logger;
import org.jboss.remoting.CannotConnectException;
import org.jboss.remoting.InvokerLocator;
+import org.jboss.tm.TransactionPropagationContextFactory;
+import org.jboss.tm.TransactionPropagationContextUtil;
-import java.util.ArrayList;
-
/**
* Pick an invocation target
*
@@ -41,15 +47,17 @@
{
private static final long serialVersionUID = -8666382019058421135L;
+ private static Logger log = Logger.getLogger(ClusterChooserInterceptor.class);
+ private static boolean trace = log.isTraceEnabled();
public static final ClusterChooserInterceptor singleton = new ClusterChooserInterceptor();
-
+ private static final Map<Object, InvokerLocator> txStickyTargets = Collections.synchronizedMap(new WeakHashMap<Object, InvokerLocator>());
+
public String getName()
{
return "ClusterChooserInterceptor";
}
- public Object invoke(org.jboss.aop.joinpoint.Invocation invocation)
- throws Throwable
+ public Object invoke(org.jboss.aop.joinpoint.Invocation invocation) throws Throwable
{
LoadBalancePolicy lb = (LoadBalancePolicy) invocation.getMetaData(CLUSTERED_REMOTING, LOADBALANCE_POLICY);
FamilyWrapper family = (FamilyWrapper) invocation.getMetaData(CLUSTERED_REMOTING, CLUSTER_FAMILY_WRAPPER);
@@ -60,37 +68,77 @@
int failoverCounter = 0;
String familyName = family.get().getFamilyName();
invocation.getMetaData().addMetaData(CLUSTERED_REMOTING, CLUSTER_FAMILY, familyName, PayloadKey.AS_IS);
+
InvokerLocator target = null;
- if (lb instanceof AopLoadBalancePolicy)
+ InvokerLocator txStickyTarget = null;
+ Object tpc = this.getTransactionPropagationContext();
+ if (tpc != null)
{
- target = (InvokerLocator) ((AopLoadBalancePolicy) lb).chooseTarget(family.get(), invocation);
+ txStickyTarget = txStickyTargets.get(tpc);
+ if (txStickyTarget != null)
+ {
+ if (family.get().getTargets().contains(txStickyTarget))
+ {
+ if (trace)
+ {
+ log.trace("Using transaction sticky target: " + txStickyTarget);
+ }
+ target = txStickyTarget;
+ }
+ else
+ {
+ throw new RuntimeException("Current transaction is stuck to " + txStickyTarget + " which is no longer available. Halting invocation.");
+ }
+ }
}
- else
+
+ if (target == null)
{
- target = (InvokerLocator) lb.chooseTarget(family.get());
+ if (lb instanceof AopLoadBalancePolicy)
+ {
+ target = (InvokerLocator) ((AopLoadBalancePolicy) lb).chooseTarget(family.get(), invocation);
+ }
+ else
+ {
+ target = (InvokerLocator) lb.chooseTarget(family.get());
+ }
}
Throwable lastException = null;
- while (target != null)
+ boolean failoverAuthorized = true;
+ while ((target != null) && failoverAuthorized)
{
invocation.getMetaData().addMetaData(CLUSTERED_REMOTING, FAILOVER_COUNTER, new Integer(failoverCounter), PayloadKey.AS_IS);
invocation.getMetaData().addMetaData(InvokeRemoteInterceptor.REMOTING, InvokeRemoteInterceptor.INVOKER_LOCATOR, target, PayloadKey.AS_IS);
invocation.getMetaData().addMetaData(CLUSTERED_REMOTING, CLUSTER_VIEW_ID, new Long(family.get().getCurrentViewId()), PayloadKey.AS_IS);
-
+
boolean definitivlyRemoveNodeOnFailure = true;
lastException = null;
try
{
Object rsp = invocation.invokeNext();
- ArrayList newReplicants = (ArrayList) invocation.getResponseAttachment("replicants");
+ List<?> newReplicants = (List<?>) invocation.getResponseAttachment("replicants");
if (newReplicants != null)
{
long newViewId = ((Long) invocation.getResponseAttachment("viewId")).longValue();
family.get().updateClusterInfo(newReplicants, newViewId);
}
+
+ if (txStickyTarget == null)
+ {
+ if (invocationHasReachedAServer(target))
+ {
+ if (trace)
+ {
+ log.trace("Setting " + target + " as the new sticky target");
+ }
+ txStickyTarget = target;
+ }
+ }
+
return rsp;
}
- catch(DispatcherConnectException dce)
+ catch (DispatcherConnectException dce)
{
//In case of graceful shutdown, the target object will no longer exist in the Dispatcher,
//fail over to another server if we can...
@@ -121,6 +169,7 @@
}
else
{
+ this.invocationHasReachedAServer(target);
throw new RuntimeException("Clustering exception thrown", gce);
}
}
@@ -129,7 +178,7 @@
// Just in case this get wrapped in a Throwable. This can happen when
// the exception is generated inside the container and it is wrapped in
// a ForwardId exception.
- if(t.getCause() instanceof GenericClusteringException)
+ if (t.getCause() instanceof GenericClusteringException)
{
GenericClusteringException gce = (GenericClusteringException)t.getCause();
lastException = gce;
@@ -150,14 +199,17 @@
}
else
{
+ this.invocationHasReachedAServer(target);
throw new RuntimeException("Clustering exception thrown", gce);
}
- } else
+ }
+ else
{
throw t;
}
}
+
// If we reach here, this means that we must fail-over
family.get().removeDeadTarget(target);
if (!definitivlyRemoveNodeOnFailure)
@@ -172,14 +224,44 @@
{
throw new RuntimeException("cluster invocation failed, last exception was: ", lastException);
}
- else
- {
- throw new RuntimeException("cluster invocation failed");
- }
+
+ throw new RuntimeException("cluster invocation failed");
}
+ failoverAuthorized = txContextAllowsFailover();
failoverCounter++;
}
+ if (!failoverAuthorized)
+ {
+ throw new RuntimeException("Current transaction is stuck to " + txStickyTarget + " which is no longer available. Halting invocation.");
+ }
// if we get here this means list was exhausted
throw new RuntimeException("Unreachable?: Service unavailable.");
}
+
+ private Object getTransactionPropagationContext()
+ {
+ TransactionPropagationContextFactory factory = TransactionPropagationContextUtil.getTPCFactoryClientSide();
+ return (factory != null) ? factory.getTransactionPropagationContext() : null;
+ }
+
+ private boolean txContextAllowsFailover()
+ {
+ Object tpc = this.getTransactionPropagationContext();
+ if (tpc != null)
+ {
+ /* If the map contains the tpc, then we can't allow a failover */
+ return !txStickyTargets.containsKey(tpc);
+ }
+ return true;
+ }
+
+ private boolean invocationHasReachedAServer(InvokerLocator target)
+ {
+ Object tpc = getTransactionPropagationContext();
+ if(tpc != null)
+ {
+ txStickyTargets.put(tpc, target);
+ }
+ return (tpc != null);
+ }
}
More information about the jboss-cvs-commits
mailing list