[jboss-cvs] JBossAS SVN: r105507 - projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 1 17:34:31 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-06-01 17:34:30 -0400 (Tue, 01 Jun 2010)
New Revision: 105507
Modified:
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
Log:
[JBCLUSTER-276] More sophisticated state transfer handling
Misc small fixes that arose from running AS clustering testsuite
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java 2010-06-01 21:09:57 UTC (rev 105506)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java 2010-06-01 21:34:30 UTC (rev 105507)
@@ -60,6 +60,7 @@
import org.jboss.ha.framework.interfaces.SerializableStateTransferResult;
import org.jboss.ha.framework.interfaces.StateTransferProvider;
import org.jboss.ha.framework.interfaces.StateTransferResult;
+import org.jboss.ha.framework.interfaces.StateTransferStreamProvider;
import org.jboss.ha.framework.interfaces.StreamStateTransferResult;
import org.jboss.logging.Logger;
import org.jboss.util.loading.ContextClassLoaderSwitcher;
@@ -344,11 +345,12 @@
}
RspList rsp = this.dispatcher.callRemoteMethods(null, m, ro);
ArrayList<T> result = this.processResponseList(rsp, returnType, trace);
- if (!excludeSelf && this.directlyInvokeLocal)
+
+ if (!excludeSelf && this.directlyInvokeLocal && (filter == null || filter.needMoreResponses()))
{
try
{
- invokeDirectly(serviceName, methodName, args, types, returnType, result);
+ invokeDirectly(serviceName, methodName, args, types, returnType, result, filter);
}
catch (RuntimeException e)
{
@@ -366,7 +368,7 @@
return result;
}
- private <T> T invokeDirectly(String serviceName, String methodName, Object[] args, Class<?>[] types, Class<T> returnType, List<T> remoteResponses) throws Exception
+ private <T> T invokeDirectly(String serviceName, String methodName, Object[] args, Class<?>[] types, Class<T> returnType, List<T> remoteResponses, ResponseFilter filter) throws Exception
{
T retVal = null;
Object handler = this.rpcHandlers.get(serviceName);
@@ -379,7 +381,7 @@
if (returnType != null && void.class != returnType)
{
retVal = returnType.cast(result);
- if (remoteResponses != null)
+ if (remoteResponses != null && (filter == null || filter.isAcceptable(retVal, me)))
{
remoteResponses.add(retVal);
}
@@ -397,7 +399,6 @@
{
throw new RuntimeException(e);
}
- return null;
}
return retVal;
@@ -406,25 +407,25 @@
/**
* {@inheritDoc}
*/
- public Object callMethodOnCoordinatorNode(String objName, String methodName,
+ public Object callMethodOnCoordinatorNode(String serviceName, String methodName,
Object[] args, Class<?>[] types,boolean excludeSelf) throws Exception
{
- return this.callMethodOnCoordinatorNode(objName,methodName,args,types,Object.class,excludeSelf, this.getMethodCallTimeout(),false);
+ return this.callMethodOnCoordinatorNode(serviceName,methodName,args,types,Object.class,excludeSelf, this.getMethodCallTimeout(),false);
}
/**
* {@inheritDoc}
*/
- public <T> T callMethodOnCoordinatorNode(String objName, String methodName,
+ public <T> T callMethodOnCoordinatorNode(String serviceName, String methodName,
Object[] args, Class<?>[] types, Class<T> returnType, boolean excludeSelf, long methodTimeout, boolean unordered) throws Exception
{
boolean trace = this.log.isTraceEnabled();
- MethodCall m = new MethodCall(objName + "." + methodName, args, types);
+ MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
if( trace )
{
- this.log.trace("callMethodOnCoordinatorNode(false), objName="+objName
+ this.log.trace("callMethodOnCoordinatorNode(false), objName="+serviceName
+", methodName="+methodName);
}
@@ -447,7 +448,7 @@
}
else if (this.directlyInvokeLocal)
{
- return invokeDirectly(objName, methodName, args, types, returnType, null);
+ return invokeDirectly(serviceName, methodName, args, types, returnType, null, null);
}
}
@@ -524,7 +525,7 @@
}
if (this.directlyInvokeLocal && this.me.equals(targetNode))
{
- return invokeDirectly(serviceName, methodName, args, types, returnType, null);
+ return invokeDirectly(serviceName, methodName, args, types, returnType, null, null);
}
Object rsp = null;
@@ -886,9 +887,9 @@
return scopeId;
}
- public void setScopeId(short scopeId)
+ public void setScopeId(Short scopeId)
{
- this.scopeId = Short.valueOf(scopeId);
+ this.scopeId = scopeId;
}
public int getMaxHistoryLength()
@@ -972,8 +973,6 @@
createService();
state = CREATED;
-
- this.log.debug("created");
}
public void start() throws Exception
@@ -999,9 +998,9 @@
catch (Throwable t)
{
state = FAILED;
- this.log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
- if (this.channel != null)
+ if (this.channel != null && this.channelSelfConnected)
{
+ this.log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
this.channel.close();
this.channel = null;
}
@@ -1171,7 +1170,7 @@
{
try
{
- if (this.channel != null && this.channel.isOpen())
+ if (this.channelSelfConnected && this.channel != null && this.channel.isOpen())
{
this.channel.close();
}
@@ -2195,13 +2194,20 @@
StateTransferProvider provider = stateProviders.get(serviceName);
if (provider != null)
{
- MarshalledValueOutputStream mvos = null;
- // FIXME add a streaming api to StateTransferProvider
+ OutputStream toClose = ostream;
Object state = provider.getCurrentState();
try
{
- mvos = new MarshalledValueOutputStream(ostream);
- mvos.writeObject(state);
+ if (provider instanceof StateTransferStreamProvider)
+ {
+ ((StateTransferStreamProvider) provider).getCurrentState(ostream);
+ }
+ else
+ {
+ MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(ostream);
+ toClose = mvos;
+ mvos.writeObject(state);
+ }
}
catch (Exception ex)
{
@@ -2209,12 +2215,12 @@
}
finally
{
- if (mvos != null)
+ if (toClose != null)
{
try
{
- mvos.flush();
- mvos.close();
+ toClose.flush();
+ toClose.close();
}
catch (IOException ignored)
{
@@ -2648,7 +2654,7 @@
{
try
{
- CoreGroupCommunicationService.this.invokeDirectly(serviceName, methodName, args, types, void.class, null);
+ CoreGroupCommunicationService.this.invokeDirectly(serviceName, methodName, args, types, void.class, null, null);
}
catch (Exception e)
{
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java 2010-06-01 21:09:57 UTC (rev 105506)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java 2010-06-01 21:34:30 UTC (rev 105507)
@@ -55,7 +55,7 @@
{
// Constants -----------------------------------------------------
- public static final short HAPARTITION_SCOPE_ID = 111;
+ public static final Short HAPARTITION_SCOPE_ID = Short.valueOf((short) 111);
// Attributes ----------------------------------------------------
@@ -268,32 +268,29 @@
for (Map.Entry<String, HAPartitionStateTransfer> entry : this.initialStateRecipients.entrySet())
{
- try
+ log.debug("Fetching state for " + entry.getKey());
+ Future<SerializableStateTransferResult> future = this.getServiceState(entry.getKey());
+ SerializableStateTransferResult result = future.get(this.getStateTransferTimeout(), TimeUnit.MILLISECONDS);
+ if (result.stateReceived())
{
- log.debug("Fetching state for " + entry.getKey());
- Future<SerializableStateTransferResult> future = this.getServiceState(entry.getKey());
- SerializableStateTransferResult result = future.get(this.getStateTransferTimeout(), TimeUnit.MILLISECONDS);
- if (result.stateReceived())
+ if (result.getStateTransferException() != null)
{
- if (result.getStateTransferException() != null)
- {
- throw result.getStateTransferException();
- }
- else
- {
- entry.getValue().setCurrentState(result.getState());
- log.debug("Received state for " + entry.getKey());
- }
+ throw result.getStateTransferException();
}
else
{
- log.debug("No state available for " + entry.getKey() + " -- we must be sole member of group");
+ entry.getValue().setCurrentState(result.getState());
+ log.debug("Received state for " + entry.getKey());
}
}
- catch (Exception e)
+ else if (result.getStateTransferException() != null)
{
- log.error("Could not acquire initial state for service " + entry.getKey(), e);
+ throw result.getStateTransferException();
}
+ else
+ {
+ log.debug("No state available for " + entry.getKey() + " -- we must be sole member of group");
+ }
}
}
More information about the jboss-cvs-commits
mailing list