[exo-jcr-commits] exo-jcr SVN: r3293 - kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 13 07:53:35 EDT 2010


Author: nfilotto
Date: 2010-10-13 07:53:35 -0400 (Wed, 13 Oct 2010)
New Revision: 3293

Modified:
   jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/kernel/rpc-service.xml
   kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
   kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
Log:
EXOJCR-967: Parameters allow-failover and retry-timeout have been added

Modified: jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/kernel/rpc-service.xml
===================================================================
--- jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/kernel/rpc-service.xml	2010-10-12 09:47:34 UTC (rev 3292)
+++ jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/kernel/rpc-service.xml	2010-10-13 11:53:35 UTC (rev 3293)
@@ -149,6 +149,14 @@
         <name>jgroups-default-timeout</name>
         <value>0</value>
       </value-param>
+      <value-param>
+        <name>allow-failover</name>
+        <value>true</value>
+      </value-param>
+      <value-param>
+        <name>retry-timeout</name>
+        <value>20000</value>
+      </value-param>
     </init-params>
   </component>   
 ...
@@ -182,8 +190,31 @@
             <entry>This is the default timeout to use if the timeout is not
             given, if no response could be get after this timeout an exception
             will be thrown. This parameter is optional and its default value
-            is 0 which means that we don't use any timeout by default.</entry>
+            is 0 which means that we don't use any timeout by default. This
+            parameter is expressed in milliseconds.</entry>
           </row>
+
+          <row>
+            <entry><emphasis>allow-failover</emphasis></entry>
+
+            <entry>This is parameter indicates whether a command on the
+            coordinator needs to be relaunched or not if the coordintator
+            seems to have left the cluster. This parameter only affects the
+            behavior of the methods
+            <emphasis>executeCommandOnCoordinator</emphasis>. This parameter
+            is optional and its default value is true.</entry>
+          </row>
+
+          <row>
+            <entry><emphasis>retry-timeout</emphasis></entry>
+
+            <entry>This parameter is the maximum amount of time to wait until
+            the new coordinator is elected. This parameter is linked to the
+            parameter <emphasis>allow-failover</emphasis>, and thus used in
+            the exact same conditions. This parameter is optional and its
+            default value is 20000. This parameter is expressed in
+            milliseconds.</entry>
+          </row>
         </tbody>
       </tgroup>
     </table>
@@ -213,8 +244,8 @@
       <listitem>
         <para>Register a <emphasis>SingleMethodCallCommand</emphasis> that
         will call <emphasis>getName()</emphasis> on the Object
-        <emphasis>myService</emphasis> anytime the command will be executed.
-        </para>
+        <emphasis>myService</emphasis> anytime the command will be
+        executed.</para>
       </listitem>
 
       <listitem>

Modified: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java	2010-10-12 09:47:34 UTC (rev 3292)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java	2010-10-13 11:53:35 UTC (rev 3293)
@@ -85,27 +85,42 @@
    /**
     * The name of the parameter for the location of the JGroups configuration.
     */
-   public static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
+   protected static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
 
    /**
     * The name of the parameter for the name of the cluster.
     */
-   public static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
+   protected static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
 
    /**
     * The name of the parameter for the default timeout
     */
-   public static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
+   protected static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
 
    /**
+    * The name of the parameter to allow the failover
+    */
+   protected static final String PARAM_ALLOW_FAILOVER = "allow-failover";
+
+   /**
+    * The name of the parameter for the retry timeout
+    */
+   protected static final String PARAM_RETRY_TIMEOUT = "retry-timeout";
+   
+   /**
     * The value of the default timeout
     */
-   public static final int DEFAULT_TIMEOUT = 0;
+   protected static final int DEFAULT_TIMEOUT = 0;
+   
+   /**
+    * The value of the default retry timeout
+    */
+   protected static final int DEFAULT_RETRY_TIMEOUT = 20000;
 
    /**
     * The default value of the cluster name
     */
-   public static final String CLUSTER_NAME = "RPCService-Cluster";
+   protected static final String CLUSTER_NAME = "RPCService-Cluster";
    
    /**
     * The configurator used to create the JGroups Channel
@@ -113,6 +128,11 @@
    private final ProtocolStackConfigurator configurator;
 
    /**
+    * The lock used to synchronize all the threads waiting for a topology change.
+    */
+   private final Object topologyChangeLock = new Object();
+   
+   /**
     * The name of the cluster
     */
    private final String clusterName;
@@ -143,6 +163,16 @@
    private long defaultTimeout = DEFAULT_TIMEOUT;
 
    /**
+    * The value of the retry timeout
+    */
+   private long retryTimeout = DEFAULT_RETRY_TIMEOUT;
+   
+   /**
+    * Indicates whether the failover capabilities are enabled
+    */
+   private boolean allowFailover = true;
+   
+   /**
     * The dispatcher used to launch the command of the cluster nodes
     */
    private MessageDispatcher dispatcher;
@@ -212,6 +242,24 @@
             LOG.debug("The default timeout of the RPCServiceImpl has been set to " + defaultTimeout);
          }
       }
+      String sAllowFailover = getValueParam(params, PARAM_ALLOW_FAILOVER);
+      if (sAllowFailover != null)
+      {
+         allowFailover = Boolean.valueOf(sAllowFailover);
+         if (LOG.isDebugEnabled())
+         {
+            LOG.debug("The parameter '" + PARAM_ALLOW_FAILOVER + "' of the RPCServiceImpl has been set to " + allowFailover);
+         }
+      }
+      sTimeout = getValueParam(params, PARAM_RETRY_TIMEOUT);
+      if (sTimeout != null)
+      {
+         retryTimeout = Integer.parseInt(sTimeout);
+         if (LOG.isDebugEnabled())
+         {
+            LOG.debug("The retry timeout of the RPCServiceImpl has been set to " + retryTimeout);
+         }
+      }
       this.state = State.INITIALIZED;
    }
 
@@ -294,18 +342,31 @@
       v.add(coordinator);
       List<Object> lResults = excecuteCommand(v, command, synchronous, timeout, args);
       Object result = lResults == null || lResults.size() == 0 ? null : lResults.get(0);
-      if (result instanceof MemberHasLeftException)
+      if (allowFailover && result instanceof MemberHasLeftException)
       {
+         // The failover capabilities have been enabled and the coordinator seems to have left
          if (coordinator.equals(this.coordinator))
          {
-            throw new RPCException("The coordinator did not change, we faced an unexpected situation",
-               (MemberHasLeftException)result);
+            synchronized(topologyChangeLock)
+            {
+               if (coordinator.equals(this.coordinator))
+               {
+                  if (LOG.isTraceEnabled())
+                     LOG.trace("The coordinator did not change yet, we will relaunch the command after " + retryTimeout + " ms or once a topology change has been detected");                  
+                  try
+                  {
+                     topologyChangeLock.wait(retryTimeout);
+                  }
+                  catch (InterruptedException e)
+                  {
+                     Thread.currentThread().interrupt();
+                  }                  
+               }
+            }
          }
-         else
-         {
-            // The coordinator has changed, we will automatically retry with the new coordinator
-            return executeCommandOnCoordinator(command, synchronous, timeout, args);
-         }
+         if (LOG.isTraceEnabled())
+            LOG.trace("The coordinator has changed, we will automatically retry with the new coordinator");                  
+         return executeCommandOnCoordinator(command, synchronous, timeout, args);
       }
       else if (result instanceof RPCException)
       {
@@ -441,11 +502,18 @@
     */
    public void viewAccepted(View view)
    {
-      this.members = view.getMembers();
-      Address currentCoordinator = coordinator;
-      this.coordinator = members != null && members.size() > 0 ? members.get(0) : null;
-      this.isCoordinator = coordinator != null && coordinator.equals(channel.getLocalAddress());
-      onTopologyChange(currentCoordinator != null && !currentCoordinator.equals(coordinator));
+      boolean coordinatorHasChanged;
+      synchronized (topologyChangeLock)
+      {
+         this.members = view.getMembers();
+         Address currentCoordinator = coordinator;
+         this.coordinator = members != null && members.size() > 0 ? members.get(0) : null;
+         this.isCoordinator = coordinator != null && coordinator.equals(channel.getLocalAddress());
+         coordinatorHasChanged = currentCoordinator != null && !currentCoordinator.equals(coordinator);
+         // Release all the nodes
+         topologyChangeLock.notifyAll();
+      }
+      onTopologyChange(coordinatorHasChanged);
    }
 
    /**
@@ -665,7 +733,7 @@
     * Gives the value of the default timeout
     * @return the default timeout
     */
-   public long getDefaultTimeout()
+   protected long getDefaultTimeout()
    {
       return defaultTimeout;
    }
@@ -674,12 +742,31 @@
     * Gives the name of the cluster
     * @return the name of the cluster
     */
-   public String getClusterName()
+   protected String getClusterName()
    {
       return clusterName;
    }
+   
+   /**
+    * Gives the value of the retry timeout
+    * @return the value of the retry timeout
+    */
+   protected long getRetryTimeout()
+   {
+      return retryTimeout;
+   }
 
    /**
+    * Indicates whether the failover capabilities are enabled or not
+    * @return <code>true</code> if the failover capabilities are allowed, <code>false</code>
+    * otherwise
+    */
+   protected boolean isAllowFailover()
+   {
+      return allowFailover;
+   }
+
+   /**
     * Gives the value of the {@link ValueParam} corresponding to the given key
     * @param params the list of initial parameters from which we want to extract the {@link ValueParam}
     * @param parameterKey the name of the {@link ValueParam} that we are looking for

Modified: kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java	2010-10-12 09:47:34 UTC (rev 3292)
+++ kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java	2010-10-13 11:53:35 UTC (rev 3293)
@@ -106,6 +106,8 @@
       {
          service = new RPCServiceImpl(container.getContext(), params, configManager);
          assertEquals(RPCServiceImpl.DEFAULT_TIMEOUT, service.getDefaultTimeout());
+         assertEquals(RPCServiceImpl.DEFAULT_RETRY_TIMEOUT, service.getRetryTimeout());
+         assertEquals(true, service.isAllowFailover());
          assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
       }
       finally
@@ -133,6 +135,8 @@
       {
          service = new RPCServiceImpl(container.getContext(), params, configManager);
          assertEquals(60, service.getDefaultTimeout());
+         assertEquals(RPCServiceImpl.DEFAULT_RETRY_TIMEOUT, service.getRetryTimeout());
+         assertEquals(true, service.isAllowFailover());
          assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
       }
       finally
@@ -142,7 +146,72 @@
             service.stop();            
          }
       }
-      ValueParam paramClusterName = new ValueParam();
+      ValueParam paramRetryTimeout = new ValueParam();
+      paramRetryTimeout.setName(RPCServiceImpl.PARAM_RETRY_TIMEOUT);
+      paramRetryTimeout.setValue("fakeValue");
+      params.addParameter(paramRetryTimeout);
+      try
+      {
+         new RPCServiceImpl(container.getContext(), params, configManager);
+         fail("We expect a NumberFormatException since the retry timeout is not properly set");
+      }
+      catch (NumberFormatException e)
+      {
+         // OK
+      }      
+      paramRetryTimeout.setValue("60");
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         assertEquals(60, service.getDefaultTimeout());
+         assertEquals(60, service.getRetryTimeout());
+         assertEquals(true, service.isAllowFailover());
+         assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+      ValueParam paramAllowFailover = new ValueParam();
+      paramAllowFailover.setName(RPCServiceImpl.PARAM_ALLOW_FAILOVER);
+      paramAllowFailover.setValue("fakeValue");
+      params.addParameter(paramAllowFailover);
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         assertEquals(60, service.getDefaultTimeout());
+         assertEquals(60, service.getRetryTimeout());
+         assertEquals(false, service.isAllowFailover());
+         assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+      paramAllowFailover.setValue("TRUE");
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         assertEquals(60, service.getDefaultTimeout());
+         assertEquals(60, service.getRetryTimeout());
+         assertEquals(true, service.isAllowFailover());
+         assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+      
+      ValueParam paramClusterName = new ValueParam();      
       paramClusterName.setName(RPCServiceImpl.PARAM_CLUSTER_NAME);
       paramClusterName.setValue("MyName");
       params.addParameter(paramClusterName);



More information about the exo-jcr-commits mailing list