[jboss-cvs] JBossAS SVN: r105507 - projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 1 17:34:31 EDT 2010


Author: bstansberry at jboss.com
Date: 2010-06-01 17:34:30 -0400 (Tue, 01 Jun 2010)
New Revision: 105507

Modified:
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
Log:
[JBCLUSTER-276] More sophisticated state transfer handling
Miscellaneous small fixes that arose from running the AS clustering testsuite

Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java	2010-06-01 21:09:57 UTC (rev 105506)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java	2010-06-01 21:34:30 UTC (rev 105507)
@@ -60,6 +60,7 @@
 import org.jboss.ha.framework.interfaces.SerializableStateTransferResult;
 import org.jboss.ha.framework.interfaces.StateTransferProvider;
 import org.jboss.ha.framework.interfaces.StateTransferResult;
+import org.jboss.ha.framework.interfaces.StateTransferStreamProvider;
 import org.jboss.ha.framework.interfaces.StreamStateTransferResult;
 import org.jboss.logging.Logger;
 import org.jboss.util.loading.ContextClassLoaderSwitcher;
@@ -344,11 +345,12 @@
       }
       RspList rsp = this.dispatcher.callRemoteMethods(null, m, ro);
       ArrayList<T> result = this.processResponseList(rsp, returnType, trace);
-      if (!excludeSelf && this.directlyInvokeLocal)
+      
+      if (!excludeSelf && this.directlyInvokeLocal && (filter == null || filter.needMoreResponses()))
       {
          try
          {
-            invokeDirectly(serviceName, methodName, args, types, returnType, result);
+            invokeDirectly(serviceName, methodName, args, types, returnType, result, filter);
          }
          catch (RuntimeException e)
          {
@@ -366,7 +368,7 @@
       return result;
    }
 
-   private <T> T invokeDirectly(String serviceName, String methodName, Object[] args, Class<?>[] types, Class<T> returnType, List<T> remoteResponses) throws Exception
+   private <T> T invokeDirectly(String serviceName, String methodName, Object[] args, Class<?>[] types, Class<T> returnType, List<T> remoteResponses, ResponseFilter filter) throws Exception
    {
       T retVal = null;
       Object handler = this.rpcHandlers.get(serviceName);
@@ -379,7 +381,7 @@
             if (returnType != null && void.class != returnType)
             {
                retVal = returnType.cast(result);
-               if (remoteResponses != null)
+               if (remoteResponses != null && (filter == null || filter.isAcceptable(retVal, me)))
                {
                   remoteResponses.add(retVal);
                }
@@ -397,7 +399,6 @@
          {
             throw new RuntimeException(e);
          }
-         return null;
       }
       
       return retVal;
@@ -406,25 +407,25 @@
    /**
     * {@inheritDoc}
     */
-   public Object callMethodOnCoordinatorNode(String objName, String methodName,
+   public Object callMethodOnCoordinatorNode(String serviceName, String methodName,
           Object[] args, Class<?>[] types,boolean excludeSelf) throws Exception
    {
-      return this.callMethodOnCoordinatorNode(objName,methodName,args,types,Object.class,excludeSelf, this.getMethodCallTimeout(),false);
+      return this.callMethodOnCoordinatorNode(serviceName,methodName,args,types,Object.class,excludeSelf, this.getMethodCallTimeout(),false);
    }
 
    /**
     * {@inheritDoc}
     */
-   public <T> T callMethodOnCoordinatorNode(String objName, String methodName,
+   public <T> T callMethodOnCoordinatorNode(String serviceName, String methodName,
           Object[] args, Class<?>[] types, Class<T> returnType, boolean excludeSelf, long methodTimeout, boolean unordered) throws Exception
    {
       boolean trace = this.log.isTraceEnabled();
 
-      MethodCall m = new MethodCall(objName + "." + methodName, args, types);
+      MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
       
       if( trace )
       {
-         this.log.trace("callMethodOnCoordinatorNode(false), objName="+objName
+         this.log.trace("callMethodOnCoordinatorNode(false), objName="+serviceName
             +", methodName="+methodName);
       }
       
@@ -447,7 +448,7 @@
          }
          else if (this.directlyInvokeLocal)
          {
-            return invokeDirectly(objName, methodName, args, types, returnType, null);
+            return invokeDirectly(serviceName, methodName, args, types, returnType, null, null);
          }
       } 
       
@@ -524,7 +525,7 @@
       }
       if (this.directlyInvokeLocal && this.me.equals(targetNode))
       {
-         return invokeDirectly(serviceName, methodName, args, types, returnType, null);
+         return invokeDirectly(serviceName, methodName, args, types, returnType, null, null);
       }
       
       Object rsp = null;
@@ -886,9 +887,9 @@
       return scopeId;
    }
    
-   public void setScopeId(short scopeId)
+   public void setScopeId(Short scopeId)
    {
-      this.scopeId = Short.valueOf(scopeId);
+      this.scopeId = scopeId;
    }
    
    public int getMaxHistoryLength()
@@ -972,8 +973,6 @@
       
       createService();
       state = CREATED;
-      
-      this.log.debug("created");
    }
    
    public void start() throws Exception
@@ -999,9 +998,9 @@
       catch (Throwable t)
       {
          state = FAILED;
-         this.log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
-         if (this.channel != null)
+         if (this.channel != null && this.channelSelfConnected)
          {
+            this.log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
             this.channel.close();
             this.channel = null;
          }
@@ -1171,7 +1170,7 @@
    {
       try
       {
-         if (this.channel != null && this.channel.isOpen())
+         if (this.channelSelfConnected && this.channel != null && this.channel.isOpen())
          {
             this.channel.close();
          }
@@ -2195,13 +2194,20 @@
          StateTransferProvider provider = stateProviders.get(serviceName);
          if (provider != null)
          {
-            MarshalledValueOutputStream mvos = null;
-            // FIXME add a streaming api to StateTransferProvider
+            OutputStream toClose = ostream;
             Object state = provider.getCurrentState();
             try
             {
-               mvos = new MarshalledValueOutputStream(ostream);
-               mvos.writeObject(state);
+               if (provider instanceof StateTransferStreamProvider)
+               {
+                  ((StateTransferStreamProvider) provider).getCurrentState(ostream);
+               }
+               else
+               {
+                  MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(ostream);
+                  toClose = mvos;
+                  mvos.writeObject(state);
+               }
             }
             catch (Exception ex)
             {
@@ -2209,12 +2215,12 @@
             }
             finally
             {
-               if (mvos != null)
+               if (toClose != null)
                {
                   try
                   {
-                     mvos.flush();
-                     mvos.close();
+                     toClose.flush();
+                     toClose.close();
                   }
                   catch (IOException ignored)
                   {
@@ -2648,7 +2654,7 @@
       {            
          try
          {
-            CoreGroupCommunicationService.this.invokeDirectly(serviceName, methodName, args, types, void.class, null);
+            CoreGroupCommunicationService.this.invokeDirectly(serviceName, methodName, args, types, void.class, null, null);
          } 
          catch (Exception e)
          {

Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java	2010-06-01 21:09:57 UTC (rev 105506)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java	2010-06-01 21:34:30 UTC (rev 105507)
@@ -55,7 +55,7 @@
 {
    // Constants -----------------------------------------------------
 
-   public static final short HAPARTITION_SCOPE_ID = 111;
+   public static final Short HAPARTITION_SCOPE_ID = Short.valueOf((short) 111);
    
    // Attributes ----------------------------------------------------
 
@@ -268,32 +268,29 @@
       
       for (Map.Entry<String, HAPartitionStateTransfer> entry : this.initialStateRecipients.entrySet())
       {
-         try
+         log.debug("Fetching state for " + entry.getKey());
+         Future<SerializableStateTransferResult> future = this.getServiceState(entry.getKey());
+         SerializableStateTransferResult result = future.get(this.getStateTransferTimeout(), TimeUnit.MILLISECONDS);
+         if (result.stateReceived())
          {
-            log.debug("Fetching state for " + entry.getKey());
-            Future<SerializableStateTransferResult> future = this.getServiceState(entry.getKey());
-            SerializableStateTransferResult result = future.get(this.getStateTransferTimeout(), TimeUnit.MILLISECONDS);
-            if (result.stateReceived())
+            if (result.getStateTransferException() != null)
             {
-               if (result.getStateTransferException() != null)
-               {
-                  throw result.getStateTransferException();
-               }
-               else
-               {
-                  entry.getValue().setCurrentState(result.getState());
-                  log.debug("Received state for " + entry.getKey());
-               }
+               throw result.getStateTransferException();
             }
             else
             {
-               log.debug("No state available for " + entry.getKey() + " -- we must be sole member of group");
+               entry.getValue().setCurrentState(result.getState());
+               log.debug("Received state for " + entry.getKey());
             }
          }
-         catch (Exception e)
+         else if (result.getStateTransferException() != null)
          {
-            log.error("Could not acquire initial state for service " + entry.getKey(), e);
+            throw result.getStateTransferException();
          }
+         else
+         {
+            log.debug("No state available for " + entry.getKey() + " -- we must be sole member of group");
+         }
       }
    }
 




More information about the jboss-cvs-commits mailing list