[jboss-cvs] JBossAS SVN: r112081 - in projects/cluster/ha-client/trunk: src/main/java/org/jboss/aspects/remoting and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 17 03:57:41 EDT 2011


Author: pferraro
Date: 2011-08-17 03:57:41 -0400 (Wed, 17 Aug 2011)
New Revision: 112081

Modified:
   projects/cluster/ha-client/trunk/pom.xml
   projects/cluster/ha-client/trunk/src/main/java/org/jboss/aspects/remoting/ClusterChooserInterceptor.java
Log:
EJBTHREE-1137 ClusterChooserInterceptor does not support transaction failover authorisations


Modified: projects/cluster/ha-client/trunk/pom.xml
===================================================================
--- projects/cluster/ha-client/trunk/pom.xml	2011-08-16 22:42:20 UTC (rev 112080)
+++ projects/cluster/ha-client/trunk/pom.xml	2011-08-17 07:57:41 UTC (rev 112081)
@@ -34,6 +34,7 @@
     <version.jboss.remoting.aspects>1.0.2</version.jboss.remoting.aspects>
     <version.jboss.remoting>2.5.3</version.jboss.remoting>
     <version.jboss.aop>2.2.0.Final</version.jboss.aop>
+    <version.jboss.integration>5.1.0.GA</version.jboss.integration>
     <version.junit>4.8.2</version.junit>
   </properties>
   
@@ -125,6 +126,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+       <groupId>org.jboss.integration</groupId>
+       <artifactId>jboss-transaction-spi</artifactId>
+       <version>${version.jboss.integration}</version>
+    </dependency>
     
     <!-- Test dependencies -->
     <dependency>

Modified: projects/cluster/ha-client/trunk/src/main/java/org/jboss/aspects/remoting/ClusterChooserInterceptor.java
===================================================================
--- projects/cluster/ha-client/trunk/src/main/java/org/jboss/aspects/remoting/ClusterChooserInterceptor.java	2011-08-16 22:42:20 UTC (rev 112080)
+++ projects/cluster/ha-client/trunk/src/main/java/org/jboss/aspects/remoting/ClusterChooserInterceptor.java	2011-08-17 07:57:41 UTC (rev 112081)
@@ -21,16 +21,22 @@
  */ 
 package org.jboss.aspects.remoting;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+
 import org.jboss.aop.DispatcherConnectException;
 import org.jboss.aop.util.PayloadKey;
 import org.jboss.ha.client.loadbalance.AopLoadBalancePolicy;
 import org.jboss.ha.client.loadbalance.LoadBalancePolicy;
 import org.jboss.ha.framework.interfaces.GenericClusteringException;
+import org.jboss.logging.Logger;
 import org.jboss.remoting.CannotConnectException;
 import org.jboss.remoting.InvokerLocator;
+import org.jboss.tm.TransactionPropagationContextFactory;
+import org.jboss.tm.TransactionPropagationContextUtil;
 
-import java.util.ArrayList;
-
 /**
  * Pick an invocation target
  *
@@ -41,15 +47,17 @@
 {
    private static final long serialVersionUID = -8666382019058421135L;
 
+   private static Logger log = Logger.getLogger(ClusterChooserInterceptor.class);
+   private static boolean trace = log.isTraceEnabled();
    public static final ClusterChooserInterceptor singleton = new ClusterChooserInterceptor();
-
+   private static final Map<Object, InvokerLocator> txStickyTargets = Collections.synchronizedMap(new WeakHashMap<Object, InvokerLocator>());
+   
    public String getName()
    {
       return "ClusterChooserInterceptor";
    }
 
-   public Object invoke(org.jboss.aop.joinpoint.Invocation invocation)
-   throws Throwable
+   public Object invoke(org.jboss.aop.joinpoint.Invocation invocation) throws Throwable
    {
       LoadBalancePolicy lb = (LoadBalancePolicy) invocation.getMetaData(CLUSTERED_REMOTING, LOADBALANCE_POLICY);
       FamilyWrapper family = (FamilyWrapper) invocation.getMetaData(CLUSTERED_REMOTING, CLUSTER_FAMILY_WRAPPER);
@@ -60,37 +68,77 @@
       int failoverCounter = 0;
       String familyName = family.get().getFamilyName();
       invocation.getMetaData().addMetaData(CLUSTERED_REMOTING, CLUSTER_FAMILY, familyName, PayloadKey.AS_IS);
+      
       InvokerLocator target = null;
-      if (lb instanceof AopLoadBalancePolicy)
+      InvokerLocator txStickyTarget = null;
+      Object tpc = this.getTransactionPropagationContext();
+      if (tpc != null)
       {
-         target = (InvokerLocator) ((AopLoadBalancePolicy) lb).chooseTarget(family.get(), invocation);
+         txStickyTarget = txStickyTargets.get(tpc);
+         if (txStickyTarget != null)
+         {
+            if (family.get().getTargets().contains(txStickyTarget))
+            {
+               if (trace)
+               {
+                  log.trace("Using transaction sticky target: " + txStickyTarget);
+               }
+               target = txStickyTarget;
+            }
+            else
+            {
+               throw new RuntimeException("Current transaction is stuck to " + txStickyTarget + " which is no longer available.  Halting invocation.");
+            }
+         }
       }
-      else
+      
+      if (target == null)
       {
-         target = (InvokerLocator) lb.chooseTarget(family.get());
+         if (lb instanceof AopLoadBalancePolicy)
+         {
+            target = (InvokerLocator) ((AopLoadBalancePolicy) lb).chooseTarget(family.get(), invocation);
+         }
+         else
+         {
+            target = (InvokerLocator) lb.chooseTarget(family.get());
+         }
       }
       
       Throwable lastException = null;
-      while (target != null)
+      boolean failoverAuthorized = true;
+      while ((target != null) && failoverAuthorized)
       {
          invocation.getMetaData().addMetaData(CLUSTERED_REMOTING, FAILOVER_COUNTER, new Integer(failoverCounter), PayloadKey.AS_IS);
          invocation.getMetaData().addMetaData(InvokeRemoteInterceptor.REMOTING, InvokeRemoteInterceptor.INVOKER_LOCATOR, target, PayloadKey.AS_IS);
          invocation.getMetaData().addMetaData(CLUSTERED_REMOTING, CLUSTER_VIEW_ID, new Long(family.get().getCurrentViewId()), PayloadKey.AS_IS);
-
+         
          boolean definitivlyRemoveNodeOnFailure = true;
          lastException = null;
          try
          {
             Object rsp = invocation.invokeNext();
-            ArrayList newReplicants = (ArrayList) invocation.getResponseAttachment("replicants");
+            List<?> newReplicants = (List<?>) invocation.getResponseAttachment("replicants");
             if (newReplicants != null)
             {
                long newViewId = ((Long) invocation.getResponseAttachment("viewId")).longValue();
                family.get().updateClusterInfo(newReplicants, newViewId);
             }
+            
+            if (txStickyTarget == null)
+            {
+               if (invocationHasReachedAServer(target))
+               {
+                  if (trace)
+                  {
+                     log.trace("Setting " + target + " as the new sticky target");
+                  }
+                  txStickyTarget = target;
+               }
+            }
+            
             return rsp;
          }
-         catch(DispatcherConnectException dce)
+         catch (DispatcherConnectException dce)
          {
             //In case of graceful shutdown, the target object will no longer exist in the Dispatcher,
             //fail over to another server if we can...
@@ -121,6 +169,7 @@
             }
             else
             {
+               this.invocationHasReachedAServer(target);
                throw new RuntimeException("Clustering exception thrown", gce);
             }
          }
@@ -129,7 +178,7 @@
             // Just in case this get wrapped in a Throwable. This can happen when
             // the exception is generated inside the container and it is wrapped in
             // a ForwardId exception.
-            if(t.getCause() instanceof GenericClusteringException)
+            if (t.getCause() instanceof GenericClusteringException)
             {
                GenericClusteringException gce = (GenericClusteringException)t.getCause();
                lastException = gce;
@@ -150,14 +199,17 @@
                }
                else
                {
+                  this.invocationHasReachedAServer(target);
                   throw new RuntimeException("Clustering exception thrown", gce);
                }
-            } else
+            }
+            else
             {
                throw t;
             }
          }
 
+         
          // If we reach here, this means that we must fail-over
          family.get().removeDeadTarget(target);
          if (!definitivlyRemoveNodeOnFailure)
@@ -172,14 +224,44 @@
             {
                throw new RuntimeException("cluster invocation failed, last exception was: ", lastException);
             }
-            else
-            {
-               throw new RuntimeException("cluster invocation failed");
-            }
+
+            throw new RuntimeException("cluster invocation failed");
          }
+         failoverAuthorized = txContextAllowsFailover();
          failoverCounter++;
       }
+      if (!failoverAuthorized)
+      {
+         throw new RuntimeException("Current transaction is stuck to " + txStickyTarget + " which is no longer available.  Halting invocation.");
+      }
       // if we get here this means list was exhausted
       throw new RuntimeException("Unreachable?: Service unavailable.");
    }
+
+   private Object getTransactionPropagationContext()
+   {
+      TransactionPropagationContextFactory factory = TransactionPropagationContextUtil.getTPCFactoryClientSide();
+      return (factory != null) ? factory.getTransactionPropagationContext() : null;
+   }
+   
+   private boolean txContextAllowsFailover()
+   {
+      Object tpc = this.getTransactionPropagationContext();
+      if (tpc != null)
+      {
+         /* If the map contains the tpc, then we can't allow a failover */
+         return !txStickyTargets.containsKey(tpc);
+      }
+      return true;
+   }
+
+   private boolean invocationHasReachedAServer(InvokerLocator target)
+   {
+      Object tpc = getTransactionPropagationContext();
+      if(tpc != null)
+      {
+         txStickyTargets.put(tpc, target);
+      }
+      return (tpc != null);
+   }
 }



More information about the jboss-cvs-commits mailing list