[jboss-cvs] JBossAS SVN: r69834 - in branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss: invocation/jrmp/interfaces and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 13 14:53:44 EST 2008


Author: galder.zamarreno at jboss.com
Date: 2008-02-13 14:53:44 -0500 (Wed, 13 Feb 2008)
New Revision: 69834

Added:
   branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionSticky.java
   branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyFirstAvailable.java
   branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyFirstAvailableIdenticalAllProxies.java
   branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyRandomRobin.java
   branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyRoundRobin.java
Modified:
   branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/jrmp/interfaces/JRMPInvokerProxyHA.java
   branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/jrmp/server/JRMPInvokerHA.java
   branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/unified/interfaces/UnifiedInvokerHAProxy.java
Log:
[JBPAPP-546] Transaction sticky load balance policies have been created for each of the default existing ones. JRMP and Unified invoker proxy ha classes now contain the logic to put/get sticky target from the transaction failover authorisation map.

Added: branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionSticky.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionSticky.java	                        (rev 0)
+++ branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionSticky.java	2008-02-13 19:53:44 UTC (rev 69834)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.framework.interfaces;
+
+import org.jboss.invocation.Invocation;
+import org.jboss.logging.Logger;
+
+/**
+ * Root transaction sticky load balance policy class that checks whether there's
+ * a sticky target associated with the current invocation and based on that 
+ * returns the associated target or delegates to the given load balance policy 
+ * to choose a new target if there's no target associated with the invocation.
+ * 
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
+ */
+public class TransactionSticky implements LoadBalancePolicy
+{
+   /** The serialVersionUID */
+   private static final long serialVersionUID = -8750524198817324850L;
+
+   private static final Logger log = Logger.getLogger(TransactionSticky.class);
+   
+   /** Transient payload key holding the target bound to the invocation. */
+   private static final String TX_STICKY_TARGET = "TX_STICKY_TARGET";
+   
+   /** Policy consulted whenever no usable sticky target is found. */
+   private final LoadBalancePolicy delegateLoadBalancePolicy;
+   
+   /** @param delegate policy used to choose a target when none is bound yet */
+   public TransactionSticky(LoadBalancePolicy delegate)
+   {
+      delegateLoadBalancePolicy = delegate;
+      // Query the logger directly so the message is emitted whenever trace is on.
+      if (log.isTraceEnabled())
+      {
+         log.trace("transaction sticky load balance policy delegates to: " + delegateLoadBalancePolicy);
+      }
+   }
+
+   /**
+    * Returns the target already bound to the invocation if it is still one of
+    * the targets of the cluster family; otherwise a new target is chosen 
+    * through the delegate policy and bound to the invocation.
+    *
+    * @param clusterFamily cluster family information
+    * @param routingDecision current invocation
+    * @return a new target or the target associated with the transaction
+    */
+   public Object chooseTarget(FamilyClusterInfo clusterFamily, Invocation routingDecision)
+   {
+      Object txStickyTarget = routingDecision.getTransientValue(TX_STICKY_TARGET);
+      if (txStickyTarget != null && clusterFamily.getTargets().contains(txStickyTarget))
+      {
+         if (log.isTraceEnabled())
+         {
+            log.trace("transaction bound target exists: " + txStickyTarget);
+         }
+         return txStickyTarget;
+      }
+      return chooseNewTarget(clusterFamily, routingDecision);
+   }
+
+   /** Forwards initialisation to the delegate policy. */
+   public void init(HARMIClient father)
+   {
+      delegateLoadBalancePolicy.init(father);
+   }
+
+   /** Invocation-less variant: no stickiness, the delegate decides alone. */
+   public Object chooseTarget(FamilyClusterInfo clusterFamily)
+   {
+      return delegateLoadBalancePolicy.chooseTarget(clusterFamily);
+   }
+
+   /**
+    * Chooses a new target based on the delegate load balance policy and stores
+    * it in the invocation transient payload.
+    *
+    * @param familyClusterInfo cluster family information
+    * @param invocation current invocation
+    * @return a new target
+    */
+   protected Object chooseNewTarget(FamilyClusterInfo familyClusterInfo, Invocation invocation)
+   {
+      Object newTarget = delegateLoadBalancePolicy.chooseTarget(familyClusterInfo, invocation);
+      if (log.isTraceEnabled())
+      {
+         log.trace("new target chosen: " + newTarget);
+      }
+
+      invocation.getTransientPayload().put(TX_STICKY_TARGET, newTarget);
+      return newTarget;
+   }
+}

Added: branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyFirstAvailable.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyFirstAvailable.java	                        (rev 0)
+++ branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyFirstAvailable.java	2008-02-13 19:53:44 UTC (rev 69834)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.framework.interfaces;
+
+/**
+ * Transaction sticky load balance policy that uses FirstAvailable as its delegate.
+ * 
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
+ */
+public class TransactionStickyFirstAvailable extends TransactionSticky
+{
+
+   /** Serialization version identifier of this class. */
+   private static final long serialVersionUID = -8688525543058853326L;
+
+   /**
+    * Creates the policy, passing a new FirstAvailable to TransactionSticky as delegate.
+    */
+   public TransactionStickyFirstAvailable()
+   {
+      super(new FirstAvailable());
+   }
+
+}

Added: branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyFirstAvailableIdenticalAllProxies.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyFirstAvailableIdenticalAllProxies.java	                        (rev 0)
+++ branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyFirstAvailableIdenticalAllProxies.java	2008-02-13 19:53:44 UTC (rev 69834)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.framework.interfaces;
+
+/**
+ * Transaction sticky load balance policy that uses FirstAvailableIdenticalAllProxies as its delegate.
+ * 
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
+ */
+public class TransactionStickyFirstAvailableIdenticalAllProxies extends TransactionSticky
+{
+
+   /** Serialization version identifier of this class. */
+   private static final long serialVersionUID = -108026886517429364L;
+
+   /**
+    * Creates the policy, passing a new FirstAvailableIdenticalAllProxies to 
+    * TransactionSticky as delegate.
+    */
+   public TransactionStickyFirstAvailableIdenticalAllProxies()
+   {
+      super(new FirstAvailableIdenticalAllProxies());
+   }
+
+}

Added: branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyRandomRobin.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyRandomRobin.java	                        (rev 0)
+++ branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyRandomRobin.java	2008-02-13 19:53:44 UTC (rev 69834)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.framework.interfaces;
+
+/**
+ * Transaction sticky load balance policy that uses RandomRobin as its delegate.
+ * 
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
+ */
+public class TransactionStickyRandomRobin extends TransactionSticky
+{
+
+   /** Serialization version identifier of this class. */
+   private static final long serialVersionUID = -2684882134710754122L;
+
+   /**
+    * Creates the policy, passing a new RandomRobin to TransactionSticky as delegate.
+    */
+   public TransactionStickyRandomRobin()
+   {
+      super(new RandomRobin());
+   }
+
+}

Added: branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyRoundRobin.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyRoundRobin.java	                        (rev 0)
+++ branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/ha/framework/interfaces/TransactionStickyRoundRobin.java	2008-02-13 19:53:44 UTC (rev 69834)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.framework.interfaces;
+
+/**
+ * Transaction sticky load balance policy that uses RoundRobin as its delegate.
+ * 
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
+ */
+public class TransactionStickyRoundRobin extends TransactionSticky
+{
+   
+   /** Serialization version identifier of this class. */
+   private static final long serialVersionUID = 8195610955344716827L;
+   
+   /**
+    * Creates the policy, passing a new RoundRobin to TransactionSticky as delegate.
+    */
+   public TransactionStickyRoundRobin()
+   {
+      super(new RoundRobin());
+   }   
+   
+}

Modified: branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/jrmp/interfaces/JRMPInvokerProxyHA.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/jrmp/interfaces/JRMPInvokerProxyHA.java	2008-02-13 19:52:08 UTC (rev 69833)
+++ branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/jrmp/interfaces/JRMPInvokerProxyHA.java	2008-02-13 19:53:44 UTC (rev 69834)
@@ -30,6 +30,8 @@
 import java.rmi.ServerException;
 import java.util.ArrayList;
 import java.util.WeakHashMap;
+
+import javax.transaction.SystemException;
 import javax.transaction.TransactionRolledbackException;
 
 import org.jboss.ha.framework.interfaces.ClusteringTargetsRepository;
@@ -51,6 +53,7 @@
  * 
  * @author <a href="mailto:marc.fleury at jboss.org">Marc Fleury</a>
  * @author Scott.Stark at jboss.org
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
  * @version $Revision$
  */
 public class JRMPInvokerProxyHA
@@ -128,12 +131,18 @@
    
    public boolean txContextAllowsFailover (Invocation invocation)
    {
-      javax.transaction.Transaction tx = invocation.getTransaction();
-      if (tx != null)
+      Object tpc = getTransactionPropagationContext();
+      if (tpc != null)
       {
-         synchronized (tx)
+         synchronized (tpc)
          {
-            return ! txFailoverAuthorizations.containsKey (tx);               
+            if (trace)
+            {
+               log.trace("checking tx failover authorisation map with tpc " + tpc);
+            }
+            
+            /* if the map contains the tpc, then we can't allow a failover */
+            return ! txFailoverAuthorizations.containsKey (tpc);               
          }
       }
       else
@@ -144,12 +153,24 @@
    
    public void invocationHasReachedAServer (Invocation invocation)
    {
-      javax.transaction.Transaction tx = invocation.getTransaction();
-      if (tx != null)
+      Object tpc = getTransactionPropagationContext();
+      if (tpc != null)
       {
-         synchronized (tx)
+         synchronized (tpc)
          {
-            txFailoverAuthorizations.put (tx, null);               
+            if (trace)
+            {
+               log.trace("after reaching the server, transaction propagation context (tpc) is " + tpc);
+            }
+            
+            Object stickyTarget = invocation.getTransientValue("TX_STICKY_TARGET");
+            
+            if (trace && stickyTarget != null)
+            {
+               log.trace("remember transaction bound target[" + stickyTarget + "] for tpc[" + tpc + "]");
+            }
+            
+            txFailoverAuthorizations.put(tpc, stickyTarget);
          }
       }
    }
@@ -166,6 +187,9 @@
       //
       int failoverCounter = 0;
       invocation.setValue ("FAILOVER_COUNTER", new Integer(failoverCounter), PayloadKey.AS_IS);
+      
+      // If transaction sticky, put chosen target
+      putIfExistsTransactionTarget(invocation, getTransactionPropagationContext());
 
       // We are going to go through a Remote invocation, switch to a Marshalled Invocation
       MarshalledInvocation mi = new MarshalledInvocation(invocation);
@@ -348,6 +372,58 @@
          log.trace("Init, clusterInfo: "+familyClusterInfo+", policy="+loadBalancePolicy);
    }
 
+   /**
+    * Overridden to rethrow, wrapped in a RuntimeException, any SystemException 
+    * declared by the parent method. Looking at the parent implementation, none 
+    * of the methods called actually throws SystemException.
+    */
+   public Object getTransactionPropagationContext()
+   {
+      Object tpc;
+      try
+      {
+         tpc = super.getTransactionPropagationContext();
+      }
+      catch (SystemException e)
+      {
+         throw new RuntimeException("Unable to retrieve transaction propagation context", e);
+      }
+      
+      return tpc;
+   }
+
+   /**
+    * Called at the beginning of the invocation. If the given tpc already has a 
+    * non-null target recorded in the tx failover map, that target is put in the 
+    * invocation transient payload under TX_STICKY_TARGET so that the load 
+    * balance policy can choose the right target; a null tpc is ignored.
+    */
+   protected void putIfExistsTransactionTarget(Invocation invocation, Object tpc)
+   {
+      if (tpc != null)
+      {
+         synchronized (tpc)
+         {
+            if (trace)
+            {
+               log.trace("in the proxy, transaction propagation context (tpc) is " + tpc);
+            }
+
+            Object stickyTarget = txFailoverAuthorizations.get(tpc);
+               
+            if (stickyTarget != null)
+            {
+               if (trace)
+               {
+                  log.trace("put transaction bound target [" + stickyTarget + "] into transient payload");
+               }
+               
+               invocation.getTransientPayload().put("TX_STICKY_TARGET", stickyTarget);
+            }
+         }
+      }
+   }     
+   
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/jrmp/server/JRMPInvokerHA.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/jrmp/server/JRMPInvokerHA.java	2008-02-13 19:52:08 UTC (rev 69833)
+++ branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/jrmp/server/JRMPInvokerHA.java	2008-02-13 19:53:44 UTC (rev 69834)
@@ -50,6 +50,7 @@
  * @author <a href="mailto:bill at burkecentral.com>Bill Burke</a>
  * @author  <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
  * @author Scott.Stark at jboss.org
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
  * @version $Revision$
  */
 public class JRMPInvokerHA
@@ -124,11 +125,7 @@
       if (familyName == null)
          familyName= target.getAssociatedPartition().getPartitionName() + "/" + beanName;
 
-      JRMPInvokerProxyHA proxy = new JRMPInvokerProxyHA(target.getReplicants(), 
-                                                        policy, 
-                                                        familyName, 
-                                                        target.getCurrentViewId ());
-      return proxy;
+      return createProxy(target.getReplicants(), policy, familyName, target.getCurrentViewId ());
    }
 
    public void unregisterBean(ObjectName beanName) throws Exception
@@ -210,5 +207,11 @@
          Thread.currentThread().setContextClassLoader(oldCl);
       }      
    }
+   /** Builds the JRMPInvokerProxyHA for the given targets, policy, family name and view id. */
+   protected Invoker createProxy(ArrayList targets, LoadBalancePolicy policy, 
+         String proxyFamilyName, long viewId)
+   {
+      return new JRMPInvokerProxyHA(targets, policy, proxyFamilyName, viewId);
+   }
 }
 

Modified: branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/unified/interfaces/UnifiedInvokerHAProxy.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/unified/interfaces/UnifiedInvokerHAProxy.java	2008-02-13 19:52:08 UTC (rev 69833)
+++ branches/JBPAPP_4_2_0_GA_CP/cluster/src/main/org/jboss/invocation/unified/interfaces/UnifiedInvokerHAProxy.java	2008-02-13 19:53:44 UTC (rev 69834)
@@ -44,6 +44,8 @@
 import org.jboss.remoting.Client;
 import org.jboss.remoting.InvokerLocator;
 import org.jboss.remoting.serialization.IMarshalledValue;
+import org.jboss.tm.TransactionPropagationContextFactory;
+import org.jboss.tm.TransactionPropagationContextUtil;
 
 
 /**
@@ -64,11 +66,19 @@
 
    public static final WeakHashMap txFailoverAuthorizations = new WeakHashMap();
 
-
+   /** Trace level logging flag only set when the proxy is created or read from JNDI */
+   private static boolean trace = false;
+   
    public UnifiedInvokerHAProxy()
    {
       super();
-      log.debug("UnifiedInvokerHAProxy constructor called with no arguments.");
+      trace = log.isTraceEnabled();
+      
+      if (trace)
+      {
+         log.trace("UnifiedInvokerHAProxy constructor called with no arguments.");
+      }
+      
       setSubSystem("invokerha");
    }
 
@@ -81,34 +91,51 @@
       this.familyClusterInfo = ClusteringTargetsRepository.initTarget(proxyFamilyName, targets, viewId);
       this.loadBalancePolicy = policy;
       this.proxyFamilyName = proxyFamilyName;
-
+      trace = log.isTraceEnabled();
+      
       setSubSystem("invokerha");
    }
 
    public boolean txContextAllowsFailover(Invocation invocation)
    {
-      javax.transaction.Transaction tx = invocation.getTransaction();
-      if(tx != null)
+      Object tpc = getTransactionPropagationContext();
+      if(tpc != null)
       {
-         synchronized(tx)
+         synchronized(tpc)
          {
-            return ! txFailoverAuthorizations.containsKey(tx);
+            if (trace)
+            {
+               log.trace("checking tx failover authorisation map with tpc " + tpc);
+            }
+
+            /* if the map contains the tpc, then we can't allow a failover */
+            return ! txFailoverAuthorizations.containsKey(tpc);
          }
       }
-      else
-      {
-         return true;
-      }
+      
+      return true;
    }
 
    public void invocationHasReachedAServer(Invocation invocation)
    {
-      javax.transaction.Transaction tx = invocation.getTransaction();
-      if(tx != null)
+      Object tpc = getTransactionPropagationContext();
+      if(tpc != null)
       {
-         synchronized(tx)
+         synchronized(tpc)
          {
-            txFailoverAuthorizations.put(tx, null);
+            if (trace)
+            {
+               log.trace("after reaching the server, transaction propagation context (tpc) is " + tpc);
+            }
+            
+            Object stickyTarget = invocation.getTransientValue("TX_STICKY_TARGET");
+            
+            if (trace && stickyTarget != null)
+            {
+               log.trace("remember transaction bound target[" + stickyTarget + "] for tpc[" + tpc + "]");
+            }
+            
+            txFailoverAuthorizations.put(tpc, stickyTarget);
          }
       }
    }
@@ -166,6 +193,9 @@
       int failoverCounter = 0;
       invocation.setValue("FAILOVER_COUNTER", new Integer(failoverCounter), PayloadKey.AS_IS);
 
+      // If transaction sticky, put chosen target
+      putIfExistsTransactionTarget(invocation, getTransactionPropagationContext());      
+      
       Object response = null;
       Exception lastException = null;
 
@@ -178,12 +208,18 @@
          {
             invocation.setValue("CLUSTER_VIEW_ID", new Long(this.familyClusterInfo.getCurrentViewId()));
 
-            log.debug("Client cluster view id: " + familyClusterInfo.getCurrentViewId());
-            log.debug(printPossibleTargets());
+            if (trace)
+            {
+               log.trace("Client cluster view id: " + familyClusterInfo.getCurrentViewId());
+               log.trace(printPossibleTargets());
+            }
 
             Client clientInstance = getClient(invocation);
 
-            log.debug("Making invocation on " + clientInstance.getInvoker().getLocator());
+            if (trace)
+            {
+               log.trace("Making invocation on " + clientInstance.getInvoker().getLocator());
+            }
 
             response = clientInstance.invoke(invocation, null);
 
@@ -191,7 +227,10 @@
 
             if(response instanceof Exception)
             {
-               log.debug("Invocation returened exception: " + response);
+               if (trace)
+               {
+                  log.trace("Invocation returned exception: " + response);
+               }
                if(response instanceof GenericClusteringException)
                {
                   GenericClusteringException gcex = (GenericClusteringException) response;
@@ -222,7 +261,10 @@
                      failoverCounter++;
                      invocation.setValue("FAILOVER_COUNTER", new Integer(failoverCounter), PayloadKey.AS_IS);
 
-                     log.debug("Received GenericClusteringException where request was not completed.  Will retry.");
+                     if (trace)
+                     {
+                        log.trace("Received GenericClusteringException where request was not completed.  Will retry.");
+                     }
 
                      continue;
                   }
@@ -265,7 +307,11 @@
          }
          catch(CannotConnectException cncEx)
          {
-            log.debug("Invocation failed: CannotConnectException - " + cncEx, cncEx);
+            if (trace)
+            {
+               log.trace("Invocation failed: CannotConnectException - " + cncEx, cncEx);
+            }
+
             removeDeadTarget(getLocator());
             resetView();
             failoverAuthorized = txContextAllowsFailover(invocation);
@@ -302,7 +348,10 @@
                failoverCounter++;
                invocation.setValue("FAILOVER_COUNTER", new Integer(failoverCounter), PayloadKey.AS_IS);
 
-               log.debug("Received GenericClusteringException where request was not completed.  Will retry.");
+               if (trace)
+               {
+                  log.trace("Received GenericClusteringException where request was not completed.  Will retry.");
+               }
             }
             else
             {
@@ -312,7 +361,10 @@
          }
          catch(RemoteException aex)
          {
-            log.debug("Invocation failed: RemoteException - " + aex, aex);
+            if (trace)
+            {
+               log.trace("Invocation failed: RemoteException - " + aex, aex);
+            }
 
             // per Jira issue JBREM-61
             if(isStrictRMIException())
@@ -326,7 +378,10 @@
          }
          catch(Throwable throwable)
          {
-            log.debug("Invocation failed: " + throwable, throwable);
+            if (trace)
+            {
+               log.trace("Invocation failed: " + throwable, throwable);
+            }
 
             // this is somewhat of a hack as remoting throws throwable,
             // so will let Exception types bubble up, but if Throwable type,
@@ -378,7 +433,10 @@
          if(this.familyClusterInfo != null)
          {
             familyClusterInfo.removeDeadTarget(locator);
-            log.debug("Removed " + locator + " from target list.");
+            if (trace)
+            {
+               log.trace("Removed " + locator + " from target list.");
+            }
          }
       }
    }
@@ -389,11 +447,14 @@
       if(familyClusterInfo != null)
       {
          familyClusterInfo.updateClusterInfo(newReplicants, currentViewId);
-         log.debug("Updating cluster info.  New view id: " + currentViewId);
-         log.debug("New cluster target list is:");
-         for(int x = 0; x < newReplicants.size(); x++)
+         if (trace)
          {
-            log.debug(newReplicants.get(x));
+            log.trace("Updating cluster info.  New view id: " + currentViewId);
+            log.trace("New cluster target list is:");
+            for(int x = 0; x < newReplicants.size(); x++)
+            {
+               log.trace(newReplicants.get(x));
+            }
          }
       }
    }
@@ -450,7 +511,54 @@
          default:
             throw new StreamCorruptedException("Unknown version seen: " + version);
       }
+      
+      trace = log.isTraceEnabled();
+      if(trace)
+      {
+         log.trace("Init, clusterInfo: "+familyClusterInfo+", policy="+loadBalancePolicy);
+      }
    }
 
+   /**
+    * Looks up the current transaction propagation context, needed before an 
+    * invocation to find out whether it is part of an ongoing transaction and 
+    * its target should be sticky to that tx; null if no client side factory.
+    */
+   protected Object getTransactionPropagationContext()
+   {
+      TransactionPropagationContextFactory tpcFactory = TransactionPropagationContextUtil.getTPCFactoryClientSide();
+      return (tpcFactory == null) ? null : tpcFactory.getTransactionPropagationContext();
+   }
+   
+   /**
+    * Called at the beginning of the invocation. If the given tpc already has a 
+    * non-null target recorded in txFailoverAuthorizations, that target is put in 
+    * the invocation transient payload under TX_STICKY_TARGET so that the load 
+    * balance policy can choose the right target; a null tpc is ignored.
+    */
+   protected void putIfExistsTransactionTarget(Invocation invocation, Object tpc)
+   {
+      if (tpc != null)
+      {
+         synchronized (tpc) // NOTE(review): lock is per tpc but the map is static - confirm thread-safety
+         {
+            if (trace)
+            {
+               log.trace("in the proxy, transaction propagation context (tpc) is " + tpc);
+            }
 
+            Object stickyTarget = txFailoverAuthorizations.get(tpc);
+               
+            if (stickyTarget != null)
+            {
+               if (trace)
+               {
+                  log.trace("put transaction bound target [" + stickyTarget + "] into transient payload");
+               }
+               
+               invocation.getTransientPayload().put("TX_STICKY_TARGET", stickyTarget);
+            }
+         }
+      }
+   }
 }
\ No newline at end of file




More information about the jboss-cvs-commits mailing list