[jboss-cvs] JBossAS SVN: r98942 - trunk/cluster/src/main/java/org/jboss/invocation/http/interfaces.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 29 01:30:41 EST 2009


Author: bstansberry at jboss.com
Date: 2009-12-29 01:30:41 -0500 (Tue, 29 Dec 2009)
New Revision: 98942

Modified:
   trunk/cluster/src/main/java/org/jboss/invocation/http/interfaces/HttpInvokerProxyHA.java
Log:
[JBAS-6196] Add logic to prevent failover inside tx

Modified: trunk/cluster/src/main/java/org/jboss/invocation/http/interfaces/HttpInvokerProxyHA.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/invocation/http/interfaces/HttpInvokerProxyHA.java	2009-12-29 04:57:19 UTC (rev 98941)
+++ trunk/cluster/src/main/java/org/jboss/invocation/http/interfaces/HttpInvokerProxyHA.java	2009-12-29 06:30:41 UTC (rev 98942)
@@ -29,7 +29,10 @@
 import java.net.URL;
 import java.rmi.ServerException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
 
 import org.jboss.ha.framework.interfaces.ClusteringTargetsRepository;
 import org.jboss.ha.framework.interfaces.GenericClusteringException;
@@ -42,7 +45,10 @@
 import org.jboss.invocation.InvokerProxyHA;
 import org.jboss.invocation.MarshalledInvocation;
 import org.jboss.invocation.PayloadKey;
+import org.jboss.invocation.ServiceUnavailableException;
 import org.jboss.logging.Logger;
+import org.jboss.tm.TransactionPropagationContextFactory;
+import org.jboss.tm.TransactionPropagationContextUtil;
 
 /** The client side Http invoker proxy that posts an invocation to the
  InvokerServlet using the HttpURLConnection created from a target url.
@@ -64,6 +70,8 @@
     * @since 1.1.4.5
     */
    private static final long serialVersionUID = -7081220026780794383L;
+   
+   public static final Map<Object, Object> txFailoverAuthorizations = Collections.synchronizedMap(new WeakHashMap<Object, Object>());
 
    // Attributes ----------------------------------------------------
 
@@ -118,7 +126,7 @@
 
    public void forbidTransactionFailover(Object tpc)
    {
-      log.debug("Transaction failover authorization not supported for HttpInvokerProxyHA - see JBAS-6196");
+      txFailoverAuthorizations.put(tpc, null);
    }
    
    public String getProxyFamilyName()
@@ -182,11 +190,12 @@
       MarshalledInvocation mi = new MarshalledInvocation(invocation);         
       mi.setValue("CLUSTER_VIEW_ID", new Long(familyClusterInfo.getCurrentViewId()));
       String target = (String) getRemoteTarget(invocation);
+      boolean failoverAuthorized = true;
       URL externalURL = Util.resolveURL(target);
       Exception lastException = null;
-      while( externalURL != null )
+      while( externalURL != null && failoverAuthorized)
       {
-         boolean definitivlyRemoveNodeOnFailure = true;
+         boolean definitivelyRemoveNodeOnFailure = true;
          invocation.setValue("FAILOVER_COUNTER", new Integer(failoverCounter), PayloadKey.AS_IS);
          try
          {
@@ -197,28 +206,15 @@
 
             if (rsp.newReplicants != null)
                updateClusterInfo(rsp.newReplicants, rsp.currentViewId);
+            
+            invocationHasReachedAServer (invocation);
+            
             return rsp.response;
          }
          catch(GenericClusteringException e)
          {
-            // this is a generic clustering exception that contain the
-            // completion status: usefull to determine if we are authorized
-            // to re-issue a query to another node
-            //               
-            if( e.getCompletionStatus() == GenericClusteringException.COMPLETED_NO )
-            {
-                  // we don't want to remove the node from the list of targets 
-                  // UNLESS there is a risk to loop
-                  if (totalNumberOfTargets() >= failoverCounter)
-                  {
-                     if( e.isDefinitive() == false )
-                        definitivlyRemoveNodeOnFailure = false;                     
-                  }
-            }
-            else
-            {
-               throw new ServerException("Cannot proceed beyond target="+externalURL, e);
-            }
+            definitivelyRemoveNodeOnFailure = handleGenericClusteringException(invocation, failoverCounter,
+                  definitivelyRemoveNodeOnFailure, e);
          }
          catch(InvocationException e)
          {
@@ -227,27 +223,12 @@
             if (cause instanceof GenericClusteringException)
             {
                GenericClusteringException gce = (GenericClusteringException) cause;
-               // this is a generic clustering exception that contain the
-               // completion status: usefull to determine if we are authorized
-               // to re-issue a query to another node
-               //               
-               if( gce.getCompletionStatus() == GenericClusteringException.COMPLETED_NO )
-               {
-                     // we don't want to remove the node from the list of targets 
-                     // UNLESS there is a risk to loop
-                     if (totalNumberOfTargets() >= failoverCounter)
-                     {
-                        if( gce.isDefinitive() == false )
-                           definitivlyRemoveNodeOnFailure = false;                     
-                     }
-               }
-               else
-               {
-                  throw new ServerException("Cannot proceed beyond target="+externalURL, gce);
-               }
+               definitivelyRemoveNodeOnFailure = handleGenericClusteringException(invocation, failoverCounter,
+                     definitivelyRemoveNodeOnFailure, gce);
             }
             else
             {
+               invocationHasReachedAServer(invocation);
                if( cause instanceof Exception )
                   throw (Exception) cause;
                else if (cause instanceof Error)
@@ -255,30 +236,101 @@
                throw new InvocationTargetException(cause);
             }
          }
-         catch(IOException e)
+         catch (java.net.ConnectException e)
          {
-            if( trace )
-               log.trace("Invoke failed, target="+externalURL, e);
             lastException = e;
          }
+         catch (java.net.UnknownHostException e)
+         {
+            lastException = e;
+         }
          catch(Exception e)
          {
+            if( trace )
+               log.trace("Invoke failed, target="+externalURL, e);
+            invocationHasReachedAServer(invocation);
             // Rethrow for the application to handle
             throw e;
          }
 
          // If we reach here, this means that we must fail-over
          remoteTargetHasFailed(target);
-         if( definitivlyRemoveNodeOnFailure )
+         if( definitivelyRemoveNodeOnFailure )
             resetView();
+         failoverAuthorized = txContextAllowsFailover (invocation);            
          target = (String) getRemoteTarget(invocation);
          externalURL = Util.resolveURL(target);
          failoverCounter ++;
       }
+      
       // if we get here this means list was exhausted
-      throw new ServerException("Service unavailable last exception:", lastException);
+      String msg = failoverAuthorized ? "Service unavailable." : "Service unavailable (failover not possible inside a user transaction).";
+      throw new ServiceUnavailableException(msg, lastException);
    }
 
+   private boolean handleGenericClusteringException(Invocation invocation, int failoverCounter,
+         boolean definitivlyRemoveNodeOnFailure, GenericClusteringException gce) throws ServerException
+   {
+      // A GenericClusteringException carries a completion status, which tells
+      // us whether we are authorized to re-issue the call to another node:
+      // COMPLETED_NO lets the caller fail over; any other status records that a
+      // server was reached and is rethrown wrapped in a ServerException.
+      if( gce.getCompletionStatus() == GenericClusteringException.COMPLETED_NO )
+      {
+            // Keep the node in the list of targets when the failure is not definitive,
+            // UNLESS there is a risk of looping (failoverCounter exceeds the number of targets)
+            if (totalNumberOfTargets() >= failoverCounter)
+            {
+               if( gce.isDefinitive() == false )
+                  definitivlyRemoveNodeOnFailure = false;                     
+            }
+      }
+      else
+      {
+         invocationHasReachedAServer(invocation);
+         throw new ServerException("Clustering error", gce);
+      }
+      return definitivlyRemoveNodeOnFailure;
+   }
+   
+   public void invocationHasReachedAServer (Invocation invocation) // NOTE(review): the invocation argument is not read here
+   {
+      Object tpc = getTransactionPropagationContext();
+      if (tpc != null)
+      {
+         forbidTransactionFailover(tpc); // from now on failover is forbidden for this propagation context
+      }
+   }
+
+
+   /**
+    * Overridden method: returns the transaction propagation context supplied by the factory from
+    * TransactionPropagationContextUtil.getTPCFactoryClientSide(), or null when no factory is available.
+    * Per the original note, none of the methods called here actually throw SystemException.
+    */
+   public Object getTransactionPropagationContext()
+   {
+      TransactionPropagationContextFactory tpcFactory = TransactionPropagationContextUtil.getTPCFactoryClientSide();
+      return (tpcFactory == null) ? null : tpcFactory.getTransactionPropagationContext();
+   }  
+   
+   public boolean txContextAllowsFailover (Invocation invocation) // NOTE(review): the invocation argument is not read here
+   {
+      Object tpc = getTransactionPropagationContext();
+      if (tpc != null)
+      {
+         if (trace)
+         {
+            log.trace("Checking tx failover authorisation map with tpc " + tpc);
+         }
+         
+         /* The map holds the contexts for which failover has been forbidden; if it contains the tpc, we can't allow a failover */
+         return ! txFailoverAuthorizations.containsKey(tpc);            
+      }
+
+      return true; // no transaction propagation context available: failover is allowed
+   } 
+
    /** Externalize this instance.
    */
    @SuppressWarnings("unchecked")




More information about the jboss-cvs-commits mailing list