[jboss-cvs] JBossAS SVN: r104443 - in trunk: cluster/src/main/java/org/jboss/ha/framework/interfaces and 17 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 4 12:10:24 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-05-04 12:10:23 -0400 (Tue, 04 May 2010)
New Revision: 104443
Modified:
trunk/aspects/src/main/java/org/jboss/aspects/versioned/DistributedSynchronizationManager.java
trunk/aspects/src/main/java/org/jboss/aspects/versioned/DistributedTxCache.java
trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIClient.java
trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIResponse.java
trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIServer.java
trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterNodeImpl.java
trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java
trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartitionMBean.java
trunk/cluster/src/main/java/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
trunk/cluster/src/main/java/org/jboss/ha/framework/server/HARMIServerImpl.java
trunk/cluster/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
trunk/cluster/src/main/java/org/jboss/ha/framework/server/util/TopologyMonitorService.java
trunk/cluster/src/main/java/org/jboss/ha/framework/server/util/TopologyMonitorServiceMBean.java
trunk/cluster/src/main/java/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java
trunk/cluster/src/main/java/org/jboss/invocation/InvokerProxyHA.java
trunk/cluster/src/main/java/org/jboss/invocation/http/interfaces/HttpInvokerProxyHA.java
trunk/cluster/src/main/java/org/jboss/invocation/jrmp/interfaces/JRMPInvokerProxyHA.java
trunk/cluster/src/main/java/org/jboss/invocation/jrmp/server/JRMPInvokerHA.java
trunk/cluster/src/main/java/org/jboss/invocation/unified/interfaces/UnifiedInvokerHAProxy.java
trunk/cluster/src/main/java/org/jboss/invocation/unified/server/UnifiedInvokerHA.java
trunk/component-matrix/pom.xml
trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/RPCTestCase.java
trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/DRMUser.java
trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/IReplicants.java
trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/MockHAPartition.java
trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/rpc/RPCUser.java
trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/rpc/RPCUserMBean.java
trunk/testsuite/src/main/org/jboss/test/cluster/hasingleton/MockHAPartition.java
trunk/testsuite/src/main/org/jboss/test/cluster/invokerha/JRMPInvokerHaMockUtils.java
trunk/testsuite/src/main/org/jboss/test/cluster/invokerha/UnifiedInvokerHaMockUtils.java
trunk/testsuite/src/main/org/jboss/test/cluster/testutil/MockHAPartition.java
Log:
[JBAS-7991] Move to ha-server-api 2.0.0.Alpha2
Modified: trunk/aspects/src/main/java/org/jboss/aspects/versioned/DistributedSynchronizationManager.java
===================================================================
--- trunk/aspects/src/main/java/org/jboss/aspects/versioned/DistributedSynchronizationManager.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/aspects/src/main/java/org/jboss/aspects/versioned/DistributedSynchronizationManager.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -21,9 +21,9 @@
*/
package org.jboss.aspects.versioned;
+import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
-import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
import org.jboss.logging.Logger;
import org.jboss.util.id.GUID;
@@ -40,8 +40,10 @@
* @author <a href="mailto:bill at jboss.org">Bill Burke</a>
* @version $Revision$
*/
-public class DistributedSynchronizationManager extends LocalSynchronizationManager implements HAPartitionStateTransfer, HAMembershipListener
+public class DistributedSynchronizationManager extends LocalSynchronizationManager implements HAMembershipListener
{
+ private static final Object[] NULL_ARGS = {};
+ private static final Class[] NULL_TYPES = new Class[]{};
protected static final Class[] STRING_TYPE = new Class[]{String.class};
protected static final Class[] LIST_TYPE = new Class[]{List.class};
protected static final Class[] LOCK_TYPES = new Class[]{String.class, GUID.class, List.class};
@@ -60,7 +62,6 @@
public void create() throws Exception
{
- //partition.subscribeToStateTransferEvents(domainName, this);
partition.registerRPCHandler(domainName, this);
}
@@ -71,10 +72,9 @@
protected void pullState() throws Exception
{
- Object[] args = {};
- ArrayList rsp = partition.callMethodOnCluster(domainName, "getCurrentState", args, null, true);
+ List<Serializable> rsp = partition.callMethodOnCluster(domainName, "getCurrentState", NULL_ARGS, NULL_TYPES, Serializable.class, true, null, partition.getMethodCallTimeout(), false);
if (rsp.size() > 0)
- setCurrentState((Serializable)rsp.get(0));
+ setCurrentState(rsp.get(0));
}
public Serializable getCurrentState()
@@ -110,7 +110,7 @@
}
- public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
+ public void membershipChanged(Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers, Vector<ClusterNode> allMembers)
{
for (int i = 0; i < deadMembers.size(); i++)
{
Modified: trunk/aspects/src/main/java/org/jboss/aspects/versioned/DistributedTxCache.java
===================================================================
--- trunk/aspects/src/main/java/org/jboss/aspects/versioned/DistributedTxCache.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/aspects/src/main/java/org/jboss/aspects/versioned/DistributedTxCache.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -27,7 +27,6 @@
import org.jboss.aop.InstanceAdvised;
import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
import org.jboss.ha.framework.server.HAPartitionLocator;
import org.jboss.logging.Logger;
import org.jboss.util.id.GUID;
@@ -35,7 +34,7 @@
* This is a LRU cache. The TxCache itself is not transactional
* but any accesses to objects within the cache ARE transactional.
*/
-public class DistributedTxCache implements HAPartitionStateTransfer
+public class DistributedTxCache
{
protected static final Class[] INSERT_TYPES = new Class[]{Object.class, Object.class};
protected static final Class[] REMOVE_TYPES = new Class[]{Object.class};
@@ -79,11 +78,6 @@
this.cacheName = "DistributedTxCache/" + cacheName;
}
- // HAPartition.HAPartitionStateTransfer Implementation --------------------------------------------------------
-
- /**
- * FIXME Replace this with an SPI. Don't leak the ClusterPartitionMBean class.
- */
protected HAPartition findHAPartitionWithName (String name) throws Exception
{
return HAPartitionLocator.getHAPartitionLocator().getHAPartition(name, null);
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIClient.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIClient.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIClient.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -68,8 +68,8 @@
{
try
{
- final Class[] empty = {};
- final Class type = Object.class;
+ final Class<?>[] empty = {};
+ final Class<?> type = Object.class;
TO_STRING = type.getMethod("toString", empty);
HASH_CODE = type.getMethod("hashCode", empty);
@@ -98,12 +98,12 @@
public HARMIClient() {}
- public HARMIClient(List targets, LoadBalancePolicy policy, String key)
+ public HARMIClient(List<?> targets, LoadBalancePolicy policy, String key)
{
this(targets, 0, policy, key, null);
}
- public HARMIClient(List targets,
+ public HARMIClient(List<?> targets,
long initViewId,
LoadBalancePolicy policy,
String key,
@@ -140,7 +140,7 @@
}
}
*/
- public void updateClusterInfo (ArrayList targets, long viewId)
+ public void updateClusterInfo (List<?> targets, long viewId)
{
if (familyClusterInfo != null)
this.familyClusterInfo.updateClusterInfo (targets, viewId);
@@ -345,7 +345,7 @@
throws IOException, ClassNotFoundException
{
this.key = stream.readUTF();
- List targets = (List)stream.readObject();
+ List<?> targets = (List<?>)stream.readObject();
long vid = stream.readLong ();
this.loadBalancePolicy = (LoadBalancePolicy)stream.readObject();
HARMIServer server = (HARMIServer)HARMIServer.rmiServers.get(key);
@@ -366,7 +366,7 @@
{
try
{
- targets = (List)server.getReplicants();
+ targets = (List<?>)server.getReplicants();
local = server.getLocal();
}
catch (Exception ignored)
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIResponse.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIResponse.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIResponse.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -38,7 +38,7 @@
*/
private static final long serialVersionUID = -2027283499335547610L;
- public java.util.ArrayList newReplicants = null;
+ public java.util.ArrayList<?> newReplicants = null;
public long currentViewId = 0;
public Object response = null;
}
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIServer.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIServer.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/interfaces/HARMIServer.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -23,6 +23,8 @@
import java.util.Hashtable;
+import java.util.List;
+
import org.jboss.invocation.MarshalledInvocation;
/**
@@ -40,7 +42,7 @@
public interface HARMIServer extends java.rmi.Remote
{
- public static Hashtable rmiServers = new Hashtable();
+ public static Hashtable<String, HARMIServer> rmiServers = new Hashtable<String, HARMIServer>();
/**
* Performs an invocation through this HA-RMI for the target object hidden behind it.
@@ -50,7 +52,7 @@
/**
* Returns a list of node stubs that are current replica of this service.
*/
- public java.util.List getReplicants () throws Exception;
+ public List<?> getReplicants () throws Exception;
/**
* Get local stub for this service.
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterNodeImpl.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterNodeImpl.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterNodeImpl.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -107,14 +107,15 @@
// Comparable implementation ----------------------------------------------
- public int compareTo(Object o)
+ public int compareTo(ClusterNode o)
{
- if ((o == null) || !(o instanceof ClusterNodeImpl))
- throw new ClassCastException("ClusterNode.compareTo(): comparison between different classes");
+ if (o == null)
+ throw new ClassCastException("Comparison to null value");
+
+ if (!(o instanceof ClusterNodeImpl))
+ throw new ClassCastException("Comparison between different classes");
- ClusterNodeImpl other = (ClusterNodeImpl) o;
-
- return this.id.compareTo(other.id);
+ return this.id.compareTo(o.getName());
}
// java.lang.Object overrides ---------------------------------------------------
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
@@ -54,8 +55,11 @@
import org.jboss.bootstrap.spi.as.config.JBossASBasedConfigurationInitializer;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
+import org.jboss.ha.framework.interfaces.GroupCommunicationService;
+import org.jboss.ha.framework.interfaces.GroupMembershipListener;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.ResponseFilter;
+import org.jboss.ha.framework.interfaces.StateTransferProvider;
import org.jboss.ha.framework.server.deployers.DefaultHAPartitionDependencyCreator;
import org.jboss.ha.framework.server.deployers.HAPartitionDependencyCreator;
import org.jboss.ha.framework.server.spi.HAPartitionCacheHandler;
@@ -78,7 +82,6 @@
import org.jboss.system.ServiceMBeanSupport;
import org.jgroups.Address;
import org.jgroups.Channel;
-import org.jgroups.ChannelFactory;
import org.jgroups.Event;
import org.jgroups.ExtendedMembershipListener;
import org.jgroups.ExtendedMessageListener;
@@ -207,7 +210,8 @@
private HAPartitionCacheHandler cacheHandler;
private String cacheConfigName;
- private ChannelFactory channelFactory;
+ @SuppressWarnings("deprecation")
+ private org.jgroups.ChannelFactory channelFactory;
private String stackName;
private String partitionName = JBossASBasedConfigurationInitializer.VALUE_PARTITION_NAME_DEFAULT;
private long state_transfer_timeout=60000;
@@ -217,12 +221,16 @@
private Executor threadPool;
private final Map<String, Object> rpcHandlers = new ConcurrentHashMap<String, Object>();
- private final Map<String, HAPartitionStateTransfer> stateHandlers = new HashMap<String, HAPartitionStateTransfer>();
+ private final Map<String, StateTransferProvider> stateProviders = new HashMap<String, StateTransferProvider>();
+ @SuppressWarnings("deprecation")
+ private final Map<String, HAPartitionStateTransfer> initialStateRecipients = new HashMap<String, HAPartitionStateTransfer>();
/** Do we send any membership change notifications synchronously? */
private boolean allowSyncListeners = false;
/** The HAMembershipListener and HAMembershipExtendedListeners */
+ @SuppressWarnings("deprecation")
private final ArrayList<HAMembershipListener> synchListeners = new ArrayList<HAMembershipListener>();
/** The asynch HAMembershipListener and HAMembershipExtendedListeners */
+ @SuppressWarnings("deprecation")
private final ArrayList<HAMembershipListener> asynchListeners = new ArrayList<HAMembershipListener>();
/** The handler used to send membership change notifications asynchronously */
private AsynchEventHandler asynchHandler;
@@ -275,10 +283,10 @@
private HAPartitionDependencyCreator haPartitionDependencyCreator;
private KernelControllerContext kernelControllerContext;
-
+
// Static --------------------------------------------------------
- // Constructors --------------------------------------------------
+ // Constructors --------------------------------------------------
public ClusterPartition()
{
@@ -342,9 +350,7 @@
this.dispatcher = new RpcHandler(this.channel, null, null, new Object(), false);
// Subscribe to events generated by the channel
- this.log.debug("setMembershipListener");
this.dispatcher.setMembershipListener(this);
- this.log.debug("setMessageListener");
this.dispatcher.setMessageListener(this.messageListener);
this.dispatcher.setRequestMarshaller(new RequestMarshallerImpl());
this.dispatcher.setResponseMarshaller(new ResponseMarshallerImpl());
@@ -385,7 +391,6 @@
this.waitForView();
// get current JG group properties
- this.log.debug("get nodeName");
this.localJGAddress = this.channel.getAddress();
this.me = getClusterNode(localJGAddress);
@@ -670,15 +675,16 @@
}
}
+ @SuppressWarnings("deprecation")
private void getStateInternal(OutputStream stream) throws IOException
{
MarshalledValueOutputStream mvos = null; // don't create until we know we need it
- for (Map.Entry<String, HAPartitionStateTransfer> entry: this.stateHandlers.entrySet())
+ for (Map.Entry<String, HAPartitionStateTransfer> entry: this.initialStateRecipients.entrySet())
{
HAPartitionStateTransfer subscriber = entry.getValue();
this.log.debug("getState for " + entry.getKey());
- Object state = subscriber.getCurrentState();
+ Serializable state = subscriber.getCurrentState();
if (state != null)
{
if (mvos == null)
@@ -708,6 +714,7 @@
}
+ @SuppressWarnings("deprecation")
private void setStateInternal(InputStream stream) throws IOException, ClassNotFoundException
{
byte type = (byte) stream.read();
@@ -740,7 +747,7 @@
String key = (String) obj;
this.log.debug("setState for " + key);
Object someState = mvis.readObject();
- HAPartitionStateTransfer subscriber = this.stateHandlers.get(key);
+ HAPartitionStateTransfer subscriber = this.initialStateRecipients.get(key);
if (subscriber != null)
{
try
@@ -850,6 +857,7 @@
{
// we update the view id
this.currentViewId = newView.getVid().getId();
+
Vector<ClusterNode> translatedNewView = this.translateAddresses (cloneMembers(newView));
this.logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
" (old view: " + this.members + " )");
@@ -907,7 +915,12 @@
if(newView instanceof MergeView)
{
MergeView mergeView = (MergeView) newView;
- event.originatingGroups = mergeView.getSubgroups();
+ Vector<View> subgroups = mergeView.getSubgroups();
+ event.originatingGroups = new Vector<List<ClusterNode>>(subgroups.size());
+ for (View view : subgroups)
+ {
+ event.originatingGroups.add(this.translateAddresses(view.getMembers()));
+ }
}
this.log.debug("membership changed from " + oldMembers.size() + " to " + event.allMembers.size());
@@ -970,13 +983,25 @@
// HAPartition implementation ----------------------------------------------
+ public boolean isConsistentWith(GroupCommunicationService other)
+ {
+ return this == other;
+ }
+
@ManagementProperty(use={ViewUse.STATISTIC}, description="The identifier for this node in cluster topology views")
public String getNodeName()
{
return this.me == null ? null : this.me.getName();
}
+
+ @ManagementProperty(use={ViewUse.CONFIGURATION}, description="Deprecated, legacy term for group name")
+ @ManagementObjectID(type="HAPartition")
+ public String getGroupName()
+ {
+ return this.partitionName;
+ }
- @ManagementProperty(use={ViewUse.CONFIGURATION}, description="The partition's name")
+ @ManagementProperty(use={ViewUse.CONFIGURATION}, description="Deprecated, legacy term for group name")
@ManagementObjectID(type="HAPartition")
public String getPartitionName()
{
@@ -1045,17 +1070,26 @@
// ***************************
// ***************************
+ /**
+ * {@inheritDoc}
+ */
public void registerRPCHandler(String objName, Object subscriber)
{
this.rpcHandlers.put(objName, subscriber);
}
+ /**
+ * {@inheritDoc}
+ */
public void registerRPCHandler(String objName, Object subscriber, ClassLoader classloader)
{
this.registerRPCHandler(objName, subscriber);
this.clmap.put(objName, new WeakReference<ClassLoader>(classloader));
}
+ /**
+ * {@inheritDoc}
+ */
public void unregisterRPCHandler(String objName, Object subscriber)
{
this.rpcHandlers.remove(objName);
@@ -1063,27 +1097,31 @@
}
/**
- * This function is an abstraction of RpcDispatcher.
+ * {@inheritDoc}
*/
- @SuppressWarnings("unchecked")
- public ArrayList callMethodOnCluster(String objName, String methodName,
- Object[] args, Class[] types, boolean excludeSelf) throws Exception
+ public ArrayList<?> callMethodOnCluster(String serviceName, String methodName,
+ Object[] args, Class<?>[] types, boolean excludeSelf) throws InterruptedException
{
- return this.callMethodOnCluster(objName, methodName, args, types, excludeSelf, null);
+ return this.callMethodOnCluster(serviceName, methodName, args, types, Object.class, excludeSelf, null, this.getMethodCallTimeout(), false);
}
- @SuppressWarnings("unchecked")
- public ArrayList callMethodOnCluster(String objName, String methodName,
- Object[] args, Class[] types, boolean excludeSelf, ResponseFilter filter) throws Exception
+ /**
+ * {@inheritDoc}
+ */
+ public ArrayList<?> callMethodOnCluster(String serviceName, String methodName,
+ Object[] args, Class<?>[] types, boolean excludeSelf, ResponseFilter filter) throws InterruptedException
{
- return this.callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.getMethodCallTimeout(), filter);
+ return this.callMethodOnCluster(serviceName, methodName, args, types, Object.class, excludeSelf, filter, this.getMethodCallTimeout(), false);
}
- @SuppressWarnings("unchecked")
- public ArrayList callMethodOnCluster(String objName, String methodName,
- Object[] args, Class[] types, boolean excludeSelf, long methodTimeout, ResponseFilter filter) throws Exception
- {
- MethodCall m = new MethodCall(objName + "." + methodName, args, types);
+ /**
+ * {@inheritDoc}
+ */
+ public <T> ArrayList<T> callMethodOnCluster(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ Class<T> returnType, boolean excludeSelf, ResponseFilter filter, long methodTimeout, boolean unordered)
+ throws InterruptedException
+ {
+ MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
RspFilterAdapter rspFilter = filter == null ? null : new RspFilterAdapter(filter,this);
RequestOptions ro = new RequestOptions( GroupRequest.GET_ALL, methodTimeout, false, rspFilter);
if (excludeSelf)
@@ -1093,55 +1131,33 @@
if(this.channel.flushSupported())
{
- this.flushBlockGate.await(this.getStateTransferTimeout());
+ this.flushBlockGate.await(this.getStateTransferTimeout());
}
boolean trace = this.log.isTraceEnabled();
if(trace)
{
- this.log.trace("calling synchronous method on cluster, serviceName="+objName
+ this.log.trace("calling synchronous method on cluster, serviceName="+serviceName
+", methodName="+methodName+", members="+this.members+", excludeSelf="+excludeSelf);
}
RspList rsp = this.dispatcher.callRemoteMethods(null, m, ro);
- return this.processResponseList(rsp, trace);
-
- }
+ return this.processResponseList(rsp, returnType, trace);
+ }
/**
- * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
- * cluster.
- * and is replaced
- * @param objName
- * @param methodName
- * @param args
- * @param types
- * @param excludeSelf
- * @return an array of responses from remote nodes
- * @throws Exception
+ * {@inheritDoc}
*/
- @SuppressWarnings("unchecked")
- public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
- Object[] args, Class[] types,boolean excludeSelf) throws Exception
+ public Object callMethodOnCoordinatorNode(String objName, String methodName,
+ Object[] args, Class<?>[] types,boolean excludeSelf) throws Exception
{
- return this.callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, this.getMethodCallTimeout());
+ return this.callMethodOnCoordinatorNode(objName,methodName,args,types,Object.class,excludeSelf, this.getMethodCallTimeout(),false);
}
/**
- * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
- * cluster.
- * and is replaced
- * @param objName
- * @param methodName
- * @param args
- * @param types
- * @param excludeSelf
- * @param methodTimeout
- * @return an array of responses from remote nodes
- * @throws Exception
+ * {@inheritDoc}
*/
- @SuppressWarnings("unchecked")
- public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
- Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
+ public <T> T callMethodOnCoordinatorNode(String objName, String methodName,
+ Object[] args, Class<?>[] types, Class<T> returnType, boolean excludeSelf, long methodTimeout, boolean unordered) throws Exception
{
boolean trace = this.log.isTraceEnabled();
@@ -1152,156 +1168,210 @@
this.log.trace("callMethodOnCoordinatorNode(false), objName="+objName
+", methodName="+methodName);
}
+
+ if (returnType == null)
+ {
+ // Use void.class as return type; a call to void.class.cast(object)
+ // below will throw CCE for anything other than null response
+ @SuppressWarnings("unchecked")
+ Class<T> unchecked = (Class<T>) void.class;
+ returnType = unchecked;
+ }
// the first cluster view member is the coordinator
- Vector<Address> coordinatorOnly = new Vector<Address>();
// If we are the coordinator, only call ourself if 'excludeSelf' is false
- if (false == this.isCurrentNodeCoordinator () ||
- false == excludeSelf)
+ if (this.isCurrentNodeCoordinator () && excludeSelf)
{
- coordinatorOnly.addElement(this.jgmembers.elementAt(0));
- }
- RspList rsp = this.dispatcher.callRemoteMethods(coordinatorOnly, m, new RequestOptions( GroupRequest.GET_ALL, methodTimeout));
+ return null;
+ }
+
+ Address coord = this.jgmembers.elementAt(0);
+ RequestOptions opt = new RequestOptions( GroupRequest.GET_ALL, methodTimeout);
+ if (unordered)
+ {
+ opt.setFlags(Message.OOB);
+ }
+ try
+ {
+ return returnType.cast(this.dispatcher.callRemoteMethod(coord, m, opt));
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ catch (Error e)
+ {
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException("Caught raw Throwable on remote invocation", e);
+ }
+ }
- return this.processResponseList(rsp, trace);
+ /**
+ * {@inheritDoc}
+ */
+ public Object callMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ ClusterNode targetNode) throws Exception
+ {
+ return this.callMethodOnNode(serviceName, methodName, args, types, Object.class, this.getMethodCallTimeout(), targetNode, false);
}
- /**
- * Calls method synchronously on target node only.
- * @param serviceName Name of the target service name on which calls are de-multiplexed
- * @param methodName name of the Java method to be called on remote services
- * @param args array of Java Object representing the set of parameters to be
- * given to the remote method
- * @param types The types of the parameters
- * node of the partition or only on remote nodes
- * @param targetNode is the target of the call
- * @return the value returned by the target method
- * @throws Exception Throws if a communication exception occurs
- */
- @SuppressWarnings("unchecked")
+ /**
+ * {@inheritDoc}
+ */
public Object callMethodOnNode(String serviceName, String methodName,
- Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
- {
- if (!(targetNode instanceof ClusterNodeImpl))
+ Object[] args, Class<?>[] types, long methodTimeout, ClusterNode targetNode) throws Exception
+ {
+ return this.callMethodOnNode(serviceName, methodName, args, types, Object.class, methodTimeout, targetNode, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public <T> T callMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ Class<T> returnType, long methodTimeout, ClusterNode targetNode, boolean unordered)
+ throws Exception
+ {
+ if (returnType == null)
{
+ // Use void.class as return type; a call to void.class.cast(object)
+ // below will throw CCE for anything other than null response
+ @SuppressWarnings("unchecked")
+ Class<T> unchecked = (Class<T>) void.class;
+ returnType = unchecked;
+ }
+
+ if (!(targetNode instanceof ClusterNodeImpl))
+ {
throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
}
- boolean trace = this.log.isTraceEnabled();
-
- MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
+ boolean trace = this.log.isTraceEnabled();
- if( trace )
- {
- this.log.trace("callMethodOnNode( objName="+serviceName
- +", methodName="+methodName);
- }
- Object rc = this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, new RequestOptions( GroupRequest.GET_FIRST, methodTimeout));
- if (rc != null)
- {
- Object item = rc;
- if (item instanceof Rsp)
- {
- Rsp response = (Rsp) item;
- // Only include received responses
- boolean wasReceived = response.wasReceived();
- if( wasReceived == true )
- {
- item = response.getValue();
- if (!(item instanceof NoHandlerForRPC))
- {
- rc = item;
- }
- }
- else if( trace )
- {
- this.log.trace("Ignoring non-received response: "+response);
- }
- }
- else
- {
- if (!(item instanceof NoHandlerForRPC))
- {
- rc = item;
- }
- else if( trace )
- {
- this.log.trace("Ignoring NoHandlerForRPC");
- }
- }
- }
- return rc;
- }
+ MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
+ if (trace)
+ {
+ this.log.trace("callMethodOnNode( objName=" + serviceName + ", methodName=" + methodName);
+ }
+ Object rsp = null;
+ RequestOptions opt = new RequestOptions(GroupRequest.GET_FIRST, methodTimeout);
+ if (unordered)
+ {
+ opt.setFlags(Message.OOB);
+ }
+ try
+ {
+ rsp = this.dispatcher.callRemoteMethod(((ClusterNodeImpl) targetNode).getOriginalJGAddress(), m, opt);
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ catch (Error e)
+ {
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException("Caught raw Throwable on remote invocation", e);
+ }
+
+ if (rsp instanceof NoHandlerForRPC)
+ {
+ this.log.trace("Ignoring NoHandlerForRPC");
+ rsp = null;
+ }
+ return returnType.cast(rsp);
+ }
+
/**
- * Calls method on target node only.
- * @param serviceName Name of the target service name on which calls are de-multiplexed
- * @param methodName name of the Java method to be called on remote services
- * @param args array of Java Object representing the set of parameters to be
- * given to the remote method
- * @param types The types of the parameters
- * node of the partition or only on remote nodes
- * @param targetNode is the target of the call
- * @return none
- * @throws Exception Throws if a communication exception occurs
- */
- @SuppressWarnings("unchecked")
- public void callAsyncMethodOnNode(String serviceName, String methodName,
- Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
+ * {@inheritDoc}
+ */
+ public void callAsyncMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ ClusterNode targetNode) throws Exception
{
+ this.callAsyncMethodOnNode(serviceName, methodName, args, types, targetNode, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void callAsyncMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ ClusterNode targetNode, boolean unordered) throws Exception
+ {
+
if (!(targetNode instanceof ClusterNodeImpl))
{
throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
}
- boolean trace = this.log.isTraceEnabled();
+ boolean trace = this.log.isTraceEnabled();
- MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
+ MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
- if( trace )
- {
- this.log.trace("callAsyncMethodOnNode( objName="+serviceName
- +", methodName="+methodName);
- }
- this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, new RequestOptions( GroupRequest.GET_NONE, methodTimeout));
+ if (trace)
+ {
+ this.log.trace("callAsyncMethodOnNode( objName=" + serviceName + ", methodName=" + methodName);
+ }
+ RequestOptions opt = new RequestOptions(GroupRequest.GET_NONE, this.getMethodCallTimeout());
+ if (unordered)
+ {
+ opt.setFlags(Message.OOB);
+ }
+ try
+ {
+ this.dispatcher.callRemoteMethod(((ClusterNodeImpl) targetNode).getOriginalJGAddress(), m, opt);
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ catch (Error e)
+ {
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException("Caught raw Throwable on remote invocation", e);
+ }
}
- private ArrayList<Object> processResponseList(RspList rsp, boolean trace)
+ private <T> ArrayList<T> processResponseList(RspList rspList, Class<T> returnType, boolean trace)
{
- ArrayList<Object> rtn = new ArrayList<Object>();
- if (rsp != null)
+ if (returnType == null)
{
- for (Object item : rsp.values())
+ // Use void.class as return type; a call to void.class.cast(object)
+ // below will throw CCE for anything other than null response
+ @SuppressWarnings("unchecked")
+ Class<T> unchecked = (Class<T>) void.class;
+ returnType = unchecked;
+ }
+
+ ArrayList<T> rtn = new ArrayList<T>();
+ if (rspList != null)
+ {
+ for (Rsp<?> response : rspList.values())
{
- if (item instanceof Rsp)
+ // Only include received responses
+ if(response.wasReceived())
{
- Rsp response = (Rsp) item;
- // Only include received responses
- boolean wasReceived = response.wasReceived();
- if( wasReceived == true )
+ Object item = response.getValue();
+ if (item instanceof NoHandlerForRPC)
{
- item = response.getValue();
- if (!(item instanceof NoHandlerForRPC))
- {
- rtn.add(item);
- }
+ continue;
}
- else if( trace )
+ else
{
- this.log.trace("Ignoring non-received response: "+response);
+ rtn.add(returnType.cast(item));
}
}
- else
+ else if( trace )
{
- if (!(item instanceof NoHandlerForRPC))
- {
- rtn.add(item);
- }
- else if( trace )
- {
- this.log.trace("Ignoring NoHandlerForRPC");
- }
+ this.log.trace("Ignoring non-received response: "+response);
}
}
@@ -1310,13 +1380,21 @@
}
/**
- * This function is an abstraction of RpcDispatcher for asynchronous messages
+ * {@inheritDoc}
*/
- @SuppressWarnings("unchecked")
- public void callAsynchMethodOnCluster(String objName, String methodName,
- Object[] args, Class[] types, boolean excludeSelf) throws Exception
+ public void callAsynchMethodOnCluster(String serviceName, String methodName,
+ Object[] args, Class<?>[] types, boolean excludeSelf) throws InterruptedException
{
- MethodCall m = new MethodCall(objName + "." + methodName, args, types);
+ this.callAsynchMethodOnCluster(serviceName, methodName, args, types, excludeSelf, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void callAsynchMethodOnCluster(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ boolean excludeSelf, boolean unordered) throws InterruptedException
+ {
+ MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
RequestOptions ro = new RequestOptions( GroupRequest.GET_NONE, this.getMethodCallTimeout());
if (excludeSelf)
{
@@ -1325,15 +1403,67 @@
if(this.channel.flushSupported())
{
- this.flushBlockGate.await(this.getStateTransferTimeout());
+ this.flushBlockGate.await(this.getStateTransferTimeout());
}
if(this.log.isTraceEnabled())
{
- this.log.trace("calling asynch method on cluster, serviceName="+objName
+ this.log.trace("calling asynch method on cluster, serviceName="+serviceName
+", methodName="+methodName+", members="+this.members+", excludeSelf="+excludeSelf);
}
this.dispatcher.callRemoteMethods(null, m, ro);
+
}
+
+ public void callAsyncMethodOnCoordinatorNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ boolean excludeSelf) throws Exception
+ {
+ this.callAsyncMethodOnCoordinatorNode(serviceName, methodName, args, types, excludeSelf, false);
+ }
+
+ public void callAsyncMethodOnCoordinatorNode(String serviceName, String methodName, Object[] args,
+ Class<?>[] types, boolean excludeSelf, boolean unordered) throws Exception
+ {
+
+ boolean trace = this.log.isTraceEnabled();
+
+ MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
+
+ if( trace )
+ {
+ this.log.trace("callMethodOnCoordinatorNode(false), objName=" + serviceName
+ +", methodName="+methodName);
+ }
+
+ // the first cluster view member is the coordinator
+ // If we are the coordinator, only call ourself if 'excludeSelf' is false
+ if (this.isCurrentNodeCoordinator () && excludeSelf)
+ {
+ return;
+ }
+
+ Address coord = this.jgmembers.elementAt(0);
+ RequestOptions opt = new RequestOptions( GroupRequest.GET_ALL, this.getMethodCallTimeout());
+ if (unordered)
+ {
+ opt.setFlags(Message.OOB);
+ }
+ try
+ {
+ this.dispatcher.callRemoteMethod(coord, m, opt);
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ catch (Error e)
+ {
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException("Caught raw Throwable on remote invocation", e);
+ }
+ }
// *************************
// *************************
@@ -1341,15 +1471,37 @@
// *************************
// *************************
- public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
+ public Future<Serializable> getServiceState(String serviceName, ClassLoader classloader)
{
- this.stateHandlers.put(objectName, subscriber);
+ throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
}
+
+ public Future<Serializable> getServiceState(String serviceName)
+ {
+ return getServiceState(serviceName, null);
+ }
+
+ public void registerStateTransferProvider(String serviceName, StateTransferProvider provider)
+ {
+ this.stateProviders.put(serviceName, provider);
+ }
+
+ public void unregisterStateTransferProvider(String serviceName)
+ {
+ this.stateProviders.remove(serviceName);
+ }
- public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
+ @SuppressWarnings("deprecation")
+ public void subscribeToStateTransferEvents(String serviceName, HAPartitionStateTransfer subscriber)
{
- this.stateHandlers.remove(objectName);
+ this.initialStateRecipients.put(serviceName, subscriber);
}
+
+ @SuppressWarnings("deprecation")
+ public void unsubscribeFromStateTransferEvents(String serviceName, HAPartitionStateTransfer subscriber)
+ {
+ this.initialStateRecipients.remove(serviceName);
+ }
// *************************
// *************************
@@ -1357,6 +1509,22 @@
// *************************
// *************************
+
+ public void registerGroupMembershipListener(GroupMembershipListener listener)
+ {
+ synchronized(this.asynchListeners) {
+ this.asynchListeners.add(new GroupMembershipListenerAdapter(listener));
+ }
+ }
+
+ public void unregisterGroupMembershipListener(GroupMembershipListener listener)
+ {
+ synchronized(this.asynchListeners) {
+ this.asynchListeners.remove(new GroupMembershipListenerAdapter(listener));
+ }
+ }
+
+ @SuppressWarnings("deprecation")
public void registerMembershipListener(HAMembershipListener listener)
{
boolean isAsynch = (this.allowSyncListeners == false)
@@ -1373,7 +1541,8 @@
}
}
}
-
+
+ @SuppressWarnings("deprecation")
public void unregisterMembershipListener(HAMembershipListener listener)
{
boolean isAsynch = (this.allowSyncListeners == false)
@@ -1437,9 +1606,10 @@
// Protected -----------------------------------------------------
+ @SuppressWarnings("deprecation")
protected Channel createChannel()
{
- ChannelFactory factory = this.getChannelFactory();
+ org.jgroups.ChannelFactory factory = this.getChannelFactory();
if (factory == null)
{
throw new IllegalStateException("HAPartitionConfig has no JChannelFactory");
@@ -1574,9 +1744,10 @@
return newMembers;
}
+ @SuppressWarnings("deprecation")
protected void notifyListeners(ArrayList<HAMembershipListener> theListeners, long viewID,
Vector<ClusterNode> allMembers, Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers,
- Vector<View> originatingGroups)
+ Vector<List<ClusterNode>> originatingGroups)
{
this.log.debug("Begin notifyListeners, viewID: "+viewID);
List<HAMembershipListener> toNotify = null;
@@ -1604,7 +1775,7 @@
catch (Throwable e)
{
// a problem in a listener should not prevent other members to receive the new view
- this.log.warn("HAMembershipListener callback failure: "+aListener, e);
+ this.log.warn("Membership listener callback failure: "+aListener, e);
}
}
@@ -1724,19 +1895,13 @@
return Version.description + "( " + Version.cvs + ")";
}
-// @ManagementProperty(name="distributedReplicantManager", use={ViewUse.STATISTIC}, description="The DistributedReplicantManager")
-// @ManagementObjectRef(type="DistributedReplicantManager")
-// public String getDRMName()
-// {
-// return getPartitionName();
-// }
-
public DistributedReplicantManagerImpl getDistributedReplicantManagerImpl()
{
return this.replicantManager;
}
- public ChannelFactory getChannelFactory()
+ @SuppressWarnings("deprecation")
+ public org.jgroups.ChannelFactory getChannelFactory()
{
return this.channelFactory;
}
@@ -1857,7 +2022,6 @@
impact=Impact.ReadOnly,
params={@ManagementParameter(name="key",
description="The name of the service")})
- @SuppressWarnings("deprecation")
public List<String> lookupDRMNodeNames(String key)
{
return this.replicantManager == null ? null : this.replicantManager.lookupReplicantsNodeNames(key);
@@ -2102,7 +2266,7 @@
Vector<ClusterNode> deadMembers;
Vector<ClusterNode> newMembers;
Vector<ClusterNode> allMembers;
- Vector<View> originatingGroups;
+ Vector<List<ClusterNode>> originatingGroups;
}
private class RequestMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
@@ -2415,6 +2579,244 @@
}
}
+ @SuppressWarnings("deprecation")
+ private static class GroupMembershipListenerAdapter implements AsynchHAMembershipExtendedListener
+ {
+ private final GroupMembershipListener listener;
+
+ GroupMembershipListenerAdapter(GroupMembershipListener listener)
+ {
+ this.listener = listener;
+ }
+
+ public void membershipChangedDuringMerge(Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers,
+ Vector<ClusterNode> allMembers, Vector<List<ClusterNode>> originatingGroups)
+ {
+ listener.membershipChangedDuringMerge(deadMembers, newMembers, allMembers, originatingGroups);
+ }
+
+ public void membershipChanged(Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers,
+ Vector<ClusterNode> allMembers)
+ {
+ listener.membershipChanged(deadMembers, newMembers, allMembers);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ boolean result = obj instanceof GroupMembershipListenerAdapter;
+ if (result)
+ {
+ result = listener == ((GroupMembershipListenerAdapter) obj).listener;
+ }
+ return result;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return listener.hashCode();
+ }
+ }
+
+// private class StateTransferTask implements Callable<Serializable>
+// {
+// private final String serviceName;
+// private final WeakReference<ClassLoader> classloader;
+// private Serializable result;
+// private boolean isStateSet;
+// private Exception setStateException;
+//
+// StateTransferTask(String serviceName, ClassLoader cl)
+// {
+// this.serviceName = serviceName;
+// if (cl != null)
+// {
+// classloader = null;
+// }
+// else
+// {
+// classloader = new WeakReference<ClassLoader>(cl);
+// }
+// }
+//
+// public Serializable call() throws Exception
+// {
+// boolean intr = false;
+// try
+// {
+// long start, stop;
+// this.isStateSet = false;
+// start = System.currentTimeMillis();
+// boolean rc = ClusterPartition.this.channel.getState(null, serviceName, ClusterPartition.this.getStateTransferTimeout());
+// if (rc)
+// {
+// synchronized (this)
+// {
+// while (!this.isStateSet)
+// {
+// if (this.setStateException != null)
+// {
+// throw this.setStateException;
+// }
+//
+// try
+// {
+// wait();
+// }
+// catch (InterruptedException iex)
+// {
+// intr = true;
+// }
+// }
+// }
+// stop = System.currentTimeMillis();
+// ClusterPartition.this.log.debug("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
+// }
+// else
+// {
+// // No one provided us with serviceState.
+// // We need to find out if we are the coordinator, so we must
+// // block until viewAccepted() is called at least once
+//
+// synchronized (ClusterPartition.this.members)
+// {
+// while (ClusterPartition.this.members.size() == 0)
+// {
+// ClusterPartition.this.log.debug("waiting on viewAccepted()");
+// try
+// {
+// ClusterPartition.this.members.wait();
+// }
+// catch (InterruptedException iex)
+// {
+// intr = true;
+// }
+// }
+// }
+//
+// if (ClusterPartition.this.isCurrentNodeCoordinator())
+// {
+// ClusterPartition.this.log.debug("State could not be retrieved for service " + serviceName + " (we are the first member in group)");
+// }
+// else
+// {
+// throw new IllegalStateException("Initial serviceState transfer failed: " +
+// "Channel.getState() returned false");
+// }
+// }
+// }
+// finally
+// {
+// if (intr) Thread.currentThread().interrupt();
+// }
+//
+// return result;
+// }
+//
+// void setState(byte[] state)
+// {
+// try
+// {
+// if (state == null)
+// {
+// ClusterPartition.this.log.debug("transferred state for service " +
+// serviceName + " is null (may be first member in cluster)");
+// }
+// else
+// {
+// ByteArrayInputStream bais = new ByteArrayInputStream(state);
+// setStateInternal(bais);
+// bais.close();
+// }
+//
+// this.isStateSet = true;
+// }
+// catch (Throwable t)
+// {
+// recordSetStateFailure(t);
+// }
+// finally
+// {
+// // Notify waiting thread that serviceState has been set.
+// synchronized(this)
+// {
+// notifyAll();
+// }
+// }
+// }
+//
+// void setState(InputStream state)
+// {
+// try
+// {
+// if (state == null)
+// {
+// ClusterPartition.this.log.debug("transferred state for service " +
+// serviceName + " is null (may be first member in cluster)");
+// }
+// else
+// {
+// setStateInternal(state);
+// }
+//
+// this.isStateSet = true;
+// }
+// catch (Throwable t)
+// {
+// recordSetStateFailure(t);
+// }
+// finally
+// {
+// // Notify waiting thread that serviceState has been set.
+// synchronized(this)
+// {
+// notifyAll();
+// }
+// }
+//
+// }
+//
+// private void setStateInternal(InputStream is) throws IOException, ClassNotFoundException
+// {
+// ClassLoader cl = getStateTransferClassLoader();
+// SwitchContext switchContext = ClusterPartition.this.classLoaderSwitcher.getSwitchContext(cl);
+// try
+// {
+// MarshalledValueInputStream mvis = new MarshalledValueInputStream(is);
+// this.result = (Serializable) mvis.readObject();
+// }
+// finally
+// {
+// switchContext.reset();
+// }
+// }
+//
+// private void recordSetStateFailure(Throwable t)
+// {
+// ClusterPartition.this.log.error("failed setting serviceState for service " + serviceName, t);
+// if (t instanceof Exception)
+// {
+// this.setStateException = (Exception) t;
+// }
+// else
+// {
+// this.setStateException = new Exception(t);
+// }
+// }
+//
+// private ClassLoader getStateTransferClassLoader()
+// {
+// ClassLoader cl = classloader == null ? null : classloader.get();
+// if (cl == null)
+// {
+// cl = this.getClass().getClassLoader();
+// }
+// return cl;
+// }
+//
+// }
+
@SuppressWarnings("unchecked")
private static Vector<Address> cloneMembers(View view)
{
@@ -2427,7 +2829,7 @@
return (Vector<ClusterNode>) toClone.clone();
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked","deprecation"})
private static List<HAMembershipListener> cloneListeners(ArrayList<HAMembershipListener> toClone)
{
return (List<HAMembershipListener>) toClone.clone();
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartitionMBean.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartitionMBean.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartitionMBean.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -21,7 +21,7 @@
*/
package org.jboss.ha.framework.server;
-import java.util.Vector;
+import java.util.List;
import org.jboss.ha.framework.interfaces.HAPartition;
@@ -93,7 +93,7 @@
/** Return the list of member nodes that built from the current view
* @return A Vector Strings representing the host:port values of the nodes
*/
- Vector<String> getCurrentView();
+ List<String> getCurrentView();
/**
* Gets a listing of significant events since the instantiation of this
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -44,15 +44,6 @@
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.logging.Logger;
-//import org.jboss.managed.api.ManagedOperation.Impact;
-//import org.jboss.managed.api.annotation.ManagementComponent;
-//import org.jboss.managed.api.annotation.ManagementObject;
-//import org.jboss.managed.api.annotation.ManagementObjectID;
-//import org.jboss.managed.api.annotation.ManagementOperation;
-//import org.jboss.managed.api.annotation.ManagementParameter;
-//import org.jboss.managed.api.annotation.ManagementProperties;
-//import org.jboss.managed.api.annotation.ManagementProperty;
-//import org.jboss.managed.api.annotation.ViewUse;
/**
@@ -529,7 +520,6 @@
// impact=Impact.ReadOnly,
// params={@ManagementParameter(name="key",
// description="The name of the service")})
- @Deprecated
public List<String> lookupReplicantsNodeNames(String key)
{
List<ClusterNode> nodes = this.lookupReplicantsNodes(key);
@@ -642,8 +632,7 @@
return true;
}
- @SuppressWarnings("unchecked")
- Vector<String> allNodes = this.partition.getCurrentView();
+ List<String> allNodes = this.partition.getCurrentView();
for (String node: allNodes)
{
if (this.log.isTraceEnabled())
@@ -934,7 +923,7 @@
{
this.log.debug("Start merging members in DRM service...");
- ArrayList<?> rsp = this.partition.callMethodOnCluster(SERVICE_NAME,
+ List<?> rsp = this.partition.callMethodOnCluster(SERVICE_NAME,
"lookupLocalReplicants",
new Object[]{}, new Class[]{}, true);
if (rsp.isEmpty())
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/HARMIServerImpl.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/HARMIServerImpl.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/HARMIServerImpl.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -56,17 +56,17 @@
implements HARMIServer
{
protected Object handler;
- protected Map invokerMap = new HashMap();
+ protected Map<Long, Method> invokerMap = new HashMap<Long, Method>();
protected org.jboss.logging.Logger log;
protected RemoteStub rmistub;
protected Object stub;
protected String key;
- protected Class intf;
+ protected Class<?> intf;
protected RefreshProxiesHATarget target;
public HARMIServerImpl(HAPartition partition,
String replicantName,
- Class intf,
+ Class<?> intf,
Object handler,
int port,
RMIClientSocketFactory csf,
@@ -86,7 +86,7 @@
public HARMIServerImpl(HAPartition partition,
String replicantName,
- Class intf,
+ Class<?> intf,
Object handler,
int port,
RMIClientSocketFactory clientSocketFactory,
@@ -100,10 +100,11 @@
this.key = partition.getPartitionName() + "/" + replicantName;
// Obtain the hashes for the supported handler interfaces
- Class[] ifaces = handler.getClass().getInterfaces();
+ Class<?>[] ifaces = handler.getClass().getInterfaces();
for (int i = 0; i < ifaces.length; i++)
{
- Map tmp = MarshalledInvocation.methodToHashesMap(ifaces[i]);
+ @SuppressWarnings("unchecked")
+ Map<Long, Method> tmp = MarshalledInvocation.methodToHashesMap(ifaces[i]);
invokerMap.putAll(tmp);
}
@@ -117,8 +118,8 @@
// See if the server socket supports setBindAddress(String)
try
{
- Class[] parameterTypes = {String.class};
- Class ssfClass = serverSocketFactory.getClass();
+ Class<?>[] parameterTypes = {String.class};
+ Class<?> ssfClass = serverSocketFactory.getClass();
Method m = ssfClass.getMethod("setBindAddress", parameterTypes);
Object[] args = {bindAddress.getHostAddress()};
m.invoke(serverSocketFactory, args);
@@ -151,7 +152,7 @@
* @param handler Target object to which calls will be forwarded
* @throws Exception Thrown if any exception occurs during call forwarding
*/
- public HARMIServerImpl(HAPartition partition, String replicantName, Class intf, Object handler) throws Exception
+ public HARMIServerImpl(HAPartition partition, String replicantName, Class<?> intf, Object handler) throws Exception
{
this(partition, replicantName, intf, handler, 0, null, null);
}
@@ -202,7 +203,7 @@
HARMIResponse rsp = new HARMIResponse();
if (clientViewId != target.getCurrentViewId())
{
- rsp.newReplicants = new ArrayList(target.getReplicants());
+ rsp.newReplicants = new ArrayList<Object>(target.getReplicants());
rsp.currentViewId = target.getCurrentViewId();
}
@@ -223,7 +224,7 @@
}
}
- public List getReplicants() throws Exception
+ public List<?> getReplicants() throws Exception
{
return target.getReplicants();
}
@@ -235,7 +236,7 @@
public class RefreshProxiesHATarget extends HATarget
{
- protected ArrayList generatedProxies;
+ protected List<SoftReference<HARMIClient>> generatedProxies;
public RefreshProxiesHATarget(HAPartition partition,
String replicantName,
@@ -246,31 +247,25 @@
super (partition, replicantName, target, allowInvocations);
}
- public void init() throws Exception
- {
- super.init ();
- generatedProxies = new ArrayList ();
- }
-
public synchronized void addProxy (HARMIClient client)
{
- SoftReference ref = new SoftReference(client);
- generatedProxies.add (ref);
+ initGeneratedProxies();
+ generatedProxies.add (new SoftReference<HARMIClient>(client));
}
- public synchronized void replicantsChanged(String key, List newReplicants, int newReplicantsViewId, boolean merge)
+ public synchronized void replicantsChanged(String key, List<?> newReplicants, int newReplicantsViewId, boolean merge)
{
super.replicantsChanged (key, newReplicants, newReplicantsViewId, merge);
+ initGeneratedProxies();
+
// we now update all generated proxies
//
- int max = generatedProxies.size ();
- ArrayList trash = new ArrayList();
- for (int i=0; i<max; i++)
+ List<SoftReference<HARMIClient>> trash = new ArrayList<SoftReference<HARMIClient>>();
+ for (SoftReference<HARMIClient> ref : generatedProxies)
{
- SoftReference ref = (SoftReference)generatedProxies.get (i);
- HARMIClient proxy = (HARMIClient)ref.get ();
+ HARMIClient proxy = ref.get ();
if (proxy == null)
{
trash.add (ref);
@@ -282,8 +277,16 @@
}
if (trash.size () > 0)
- generatedProxies.removeAll (trash);
+ generatedProxies.removeAll(trash);
}
+
+ private void initGeneratedProxies()
+ {
+ if (generatedProxies == null)
+ {
+ generatedProxies = new ArrayList<SoftReference<HARMIClient>>();
+ }
+ }
}
}
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -154,8 +154,7 @@
{
// Get the lock on all other nodes in the cluster
- @SuppressWarnings("unchecked")
- ArrayList rsps = partition.callMethodOnCluster(getServiceHAName(),
+ List<?> rsps = partition.callMethodOnCluster(getServiceHAName(),
"remoteLock", new Object[]{lockId, me, new Long(left)},
REMOTE_LOCK_TYPES, true);
@@ -312,8 +311,7 @@
// ---------------------------------------------------- HAMembershipListener
- @SuppressWarnings("unchecked")
- public synchronized void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
+ public synchronized void membershipChanged(Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers, Vector<ClusterNode> allMembers)
{
this.members.clear();
this.members.addAll(allMembers);
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/util/TopologyMonitorService.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/util/TopologyMonitorService.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/util/TopologyMonitorService.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.List;
import java.util.Vector;
import javax.management.ObjectName;
@@ -31,6 +32,7 @@
import org.apache.log4j.MDC;
import org.jboss.bootstrap.spi.as.config.JBossASBasedConfigurationInitializer;
+import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.HAPartition.AsynchHAMembershipListener;
import org.jboss.ha.framework.server.AddressPort;
@@ -158,7 +160,7 @@
this.triggerServiceName = triggerServiceName;
}
- public Vector getClusterNodes()
+ public List<String> getClusterNodes()
{
try
{
@@ -183,12 +185,12 @@
* the previous view
* @param allMembers A list of nodes that built the current view
*/
- public void membershipChanged(final Vector deadMembers, final Vector newMembers, final Vector allMembers)
+ public void membershipChanged(final Vector<ClusterNode> deadMembers, final Vector<ClusterNode> newMembers, final Vector<ClusterNode> allMembers)
{
MDC.put("RegexEventEvaluator", "End membershipChange.*");
- ArrayList removed = new ArrayList();
- ArrayList added = new ArrayList();
- ArrayList members = new ArrayList();
+ ArrayList<AddressPort> removed = new ArrayList<AddressPort>();
+ ArrayList<AddressPort> added = new ArrayList<AddressPort>();
+ ArrayList<AddressPort> members = new ArrayList<AddressPort>();
changeLog.info("Begin membershipChanged info, hostname=" + this.hostname);
changeLog.info("DeadMembers: size=" + deadMembers.size());
for (int m = 0; m < deadMembers.size(); m++)
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/util/TopologyMonitorServiceMBean.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/util/TopologyMonitorServiceMBean.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/util/TopologyMonitorServiceMBean.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -21,7 +21,8 @@
*/
package org.jboss.ha.framework.server.util;
-import java.util.Vector;
+import java.util.List;
+
import javax.management.ObjectName;
import org.jboss.ha.framework.interfaces.HAPartition;
@@ -61,5 +62,5 @@
*@return a Vector of org.jgroups.Address implementations, for example,
*org.jgroups.stack.IpAddress
*/
- public Vector getClusterNodes();
+ public List<String> getClusterNodes();
}
Modified: trunk/cluster/src/main/java/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -27,8 +27,8 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -426,7 +426,7 @@
if (!session.getOwner().equals(this.myNodeName))
{
Object[] args = { appName, keyId, this.myNodeName, new Long(session.getVersion()) };
- ArrayList<?> answers = null;
+ List<?> answers = null;
try
{
answers = this.partition.callMethodOnCluster(this.sessionStateIdentifier, "_setOwnership", args, SET_OWNERSHIP_TYPES, true);
Modified: trunk/cluster/src/main/java/org/jboss/invocation/InvokerProxyHA.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/invocation/InvokerProxyHA.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/invocation/InvokerProxyHA.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -21,7 +21,7 @@
*/
package org.jboss.invocation;
-import java.util.ArrayList;
+import java.util.List;
import org.jboss.ha.framework.interfaces.FamilyClusterInfo;
@@ -44,7 +44,7 @@
public interface InvokerProxyHA
{
- void updateClusterInfo (ArrayList targets, long viewId);
+ void updateClusterInfo (List<?> targets, long viewId);
FamilyClusterInfo getFamilyClusterInfo();
Modified: trunk/cluster/src/main/java/org/jboss/invocation/http/interfaces/HttpInvokerProxyHA.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/invocation/http/interfaces/HttpInvokerProxyHA.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/invocation/http/interfaces/HttpInvokerProxyHA.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -35,10 +35,10 @@
import java.util.WeakHashMap;
import org.jboss.ha.framework.interfaces.ClusteringTargetsRepository;
+import org.jboss.ha.framework.interfaces.FamilyClusterInfo;
import org.jboss.ha.framework.interfaces.GenericClusteringException;
import org.jboss.ha.framework.interfaces.HARMIResponse;
import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
-import org.jboss.ha.framework.interfaces.FamilyClusterInfo;
import org.jboss.invocation.Invocation;
import org.jboss.invocation.InvocationException;
import org.jboss.invocation.Invoker;
@@ -108,7 +108,7 @@
// Public --------------------------------------------------------
- public void updateClusterInfo (ArrayList targets, long viewId)
+ public void updateClusterInfo (List<?> targets, long viewId)
{
if (familyClusterInfo != null)
this.familyClusterInfo.updateClusterInfo (targets, viewId);
Modified: trunk/cluster/src/main/java/org/jboss/invocation/jrmp/interfaces/JRMPInvokerProxyHA.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/invocation/jrmp/interfaces/JRMPInvokerProxyHA.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/invocation/jrmp/interfaces/JRMPInvokerProxyHA.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -71,8 +71,8 @@
*/
private static final long serialVersionUID = -967671822225981666L;
private static final Logger log = Logger.getLogger(JRMPInvokerProxyHA.class);
- public static final HashSet colocation = new HashSet();
- public static final Map txFailoverAuthorizations = Collections.synchronizedMap(new WeakHashMap());
+ public static final HashSet<Object> colocation = new HashSet<Object>();
+ public static final Map<Object, Object> txFailoverAuthorizations = Collections.synchronizedMap(new WeakHashMap<Object, Object>());
protected LoadBalancePolicy loadBalancePolicy;
protected String proxyFamilyName = null;
@@ -84,7 +84,7 @@
public JRMPInvokerProxyHA() {}
- public JRMPInvokerProxyHA(List targets, LoadBalancePolicy policy,
+ public JRMPInvokerProxyHA(List<?> targets, LoadBalancePolicy policy,
String proxyFamilyName, long viewId)
{
this.familyClusterInfo = ClusteringTargetsRepository.initTarget (proxyFamilyName, targets, viewId);
@@ -100,7 +100,7 @@
return proxyFamilyName;
}
- public void updateClusterInfo (ArrayList targets, long viewId)
+ public void updateClusterInfo (List<?> targets, long viewId)
{
if (familyClusterInfo != null)
this.familyClusterInfo.updateClusterInfo (targets, viewId);
@@ -223,9 +223,11 @@
log.trace("Invoking on target="+target);
Object rtnObj = target.invoke(mi);
HARMIResponse rsp = null;
- if (rtnObj instanceof MarshalledObject)
+ if (rtnObj instanceof MarshalledObject<?>)
{
- rsp = (HARMIResponse)((MarshalledObject)rtnObj).get();
+ @SuppressWarnings("unchecked")
+ HARMIResponse unsafe = ((MarshalledObject<HARMIResponse>)rtnObj).get();
+ rsp = unsafe;
}
else
{
@@ -358,12 +360,12 @@
throws IOException
{
// JBAS-2071 - sync on FCI to ensure targets and vid are consistent
- ArrayList targets = null;
+ ArrayList<Object> targets = null;
long vid = 0;
synchronized (this.familyClusterInfo)
{
// JBAS-6345 -- write an ArrayList for compatibility with AS 3.x/4.x clients
- targets = new ArrayList(this.familyClusterInfo.getTargets ());
+ targets = new ArrayList<Object>(this.familyClusterInfo.getTargets ());
vid = this.familyClusterInfo.getCurrentViewId ();
}
out.writeObject(targets);
@@ -380,7 +382,7 @@
public void readExternal(final ObjectInput in)
throws IOException, ClassNotFoundException
{
- List targets = (List)in.readObject();
+ List<?> targets = (List<?>) in.readObject();
this.loadBalancePolicy = (LoadBalancePolicy)in.readObject();
this.proxyFamilyName = (String)in.readObject();
long vid = in.readLong ();
Modified: trunk/cluster/src/main/java/org/jboss/invocation/jrmp/server/JRMPInvokerHA.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/invocation/jrmp/server/JRMPInvokerHA.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/invocation/jrmp/server/JRMPInvokerHA.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -22,28 +22,27 @@
package org.jboss.invocation.jrmp.server;
import java.rmi.MarshalledObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import javax.management.InstanceNotFoundException;
import javax.management.ReflectionException;
-import org.jboss.invocation.jrmp.interfaces.JRMPInvokerProxyHA;
+import org.jboss.ha.framework.interfaces.GenericClusteringException;
+import org.jboss.ha.framework.interfaces.HARMIResponse;
+import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
+import org.jboss.ha.framework.server.HATarget;
import org.jboss.invocation.Invocation;
import org.jboss.invocation.Invoker;
import org.jboss.invocation.InvokerHA;
import org.jboss.invocation.MarshalledInvocation;
+import org.jboss.invocation.jrmp.interfaces.JRMPInvokerProxyHA;
import org.jboss.system.Registry;
-import org.jboss.ha.framework.interfaces.HARMIResponse;
-import org.jboss.ha.framework.server.HATarget;
-import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
-import org.jboss.ha.framework.interfaces.GenericClusteringException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-
/**
* The JRMPInvokerHA is an HA-RMI implementation that can generate Invocations from RMI/JRMP
* into the JMX base
@@ -238,7 +237,7 @@
return result;
}
- protected Invoker createProxy(ArrayList targets, LoadBalancePolicy policy,
+ protected Invoker createProxy(List<?> targets, LoadBalancePolicy policy,
String proxyFamilyName, long viewId)
{
return new JRMPInvokerProxyHA(targets, policy, proxyFamilyName, viewId);
Modified: trunk/cluster/src/main/java/org/jboss/invocation/unified/interfaces/UnifiedInvokerHAProxy.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/invocation/unified/interfaces/UnifiedInvokerHAProxy.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/invocation/unified/interfaces/UnifiedInvokerHAProxy.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
+
import org.jboss.ha.framework.interfaces.ClusteringTargetsRepository;
import org.jboss.ha.framework.interfaces.FamilyClusterInfo;
import org.jboss.ha.framework.interfaces.GenericClusteringException;
@@ -86,7 +87,7 @@
}
public UnifiedInvokerHAProxy(InvokerLocator locator, boolean isStrictRMIException,
- List targets, LoadBalancePolicy policy,
+ List<?> targets, LoadBalancePolicy policy,
String proxyFamilyName, long viewId)
{
super(locator, isStrictRMIException);
@@ -423,7 +424,7 @@
}
- public void updateClusterInfo(ArrayList newReplicants, long currentViewId)
+ public void updateClusterInfo(List<?> newReplicants, long currentViewId)
{
if(familyClusterInfo != null)
{
Modified: trunk/cluster/src/main/java/org/jboss/invocation/unified/server/UnifiedInvokerHA.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/invocation/unified/server/UnifiedInvokerHA.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/cluster/src/main/java/org/jboss/invocation/unified/server/UnifiedInvokerHA.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -24,6 +24,7 @@
import java.rmi.MarshalledObject;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
@@ -200,7 +201,7 @@
}
protected Invoker createProxy(boolean isStrictRMIException,
- ArrayList targets, LoadBalancePolicy policy,
+ List<?> targets, LoadBalancePolicy policy,
String proxyFamilyName, long viewId)
{
return new UnifiedInvokerHAProxy(getInvoker().getLocator(), isStrictRMIException,
Modified: trunk/component-matrix/pom.xml
===================================================================
--- trunk/component-matrix/pom.xml 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/component-matrix/pom.xml 2010-05-04 16:10:23 UTC (rev 104443)
@@ -95,7 +95,7 @@
<version.org.jboss.cluster.cache.jbc>2.2.0.Final</version.org.jboss.cluster.cache.jbc>
<version.org.jboss.cluster.cache.spi>2.1.0.Final</version.org.jboss.cluster.cache.spi>
<version.org.jboss.cluster.client>1.1.1.GA</version.org.jboss.cluster.client>
- <version.org.jboss.cluster.server.api>1.1.1.GA</version.org.jboss.cluster.server.api>
+ <version.org.jboss.cluster.server.api>2.0.0.Alpha2</version.org.jboss.cluster.server.api>
<version.org.jboss.common.core>2.2.17.GA</version.org.jboss.common.core>
<version.org.jboss.deployers>2.2.0.Alpha4</version.org.jboss.deployers>
<version.org.jboss.ejb3.bom>0.1.2</version.org.jboss.ejb3.bom>
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/RPCTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/RPCTestCase.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/RPCTestCase.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -166,19 +166,11 @@
ObjectName rpcService = new ObjectName(RPC_SERVICE);
Object obj0 = server0.invoke(rpcService, "runRetrieveFromCoordinator", null, null);
- assertNotNull("expected ArrayList as result type, got null", obj0);
- assertTrue( "expected ArrayList as result type, got " +obj0.getClass().getName(), obj0 instanceof ArrayList);
- ArrayList responses = (ArrayList)obj0;
+ assertNotNull("expected Person as result type, got null", obj0);
+ assertTrue( "expected Person as result type, got " +obj0.getClass().getName(), obj0 instanceof Person);
- // there should be one Person response
- assertEquals("Result should contain one response; ", 1, responses.size());
- Object response = responses.get(0);
- if (response instanceof Exception)
- fail("received exception response: " + ((Exception)response).toString());
- assertTrue("expected Person as response type, got " +response.getClass().getName(), response instanceof Person);
-
String employer = "WidgetsRUs";
- String respEmpl = ((Person)response).getEmployer();
+ String respEmpl = ((Person)obj0).getEmployer();
assertTrue("expected " + employer + " as selected response value, got " + respEmpl,
(employer.equalsIgnoreCase(respEmpl)));
}
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/DRMUser.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/DRMUser.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/DRMUser.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -94,11 +94,11 @@
return drm.lookupLocalReplicant(key);
}
- public List lookupReplicants()
+ public List<?> lookupReplicants()
{
return drm.lookupReplicants(category);
}
- public List lookupReplicants(String key)
+ public List<?> lookupReplicants(String key)
{
return drm.lookupReplicants(key);
}
@@ -117,7 +117,7 @@
return sequence ++;
}
- public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId, boolean merge)
+ public void replicantsChanged(String key, List<?> newReplicants, int newReplicantsViewId, boolean merge)
{
NotifyData data = new NotifyData();
data.key = key;
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/IReplicants.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/IReplicants.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/IReplicants.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -35,7 +35,7 @@
{
static final long serialVersionUID = -1698998375571817852L;
public Serializable key;
- public List newReplicants;
+ public List<?> newReplicants;
public int newReplicantsViewId;
public String toString()
{
@@ -50,8 +50,8 @@
public Serializable lookupLocalReplicant();
public Serializable lookupLocalReplicant(String key);
- public List lookupReplicants();
- public List lookupReplicants(String key);
+ public List<?> lookupReplicants();
+ public List<?> lookupReplicants(String key);
public void add(String key, Serializable data)
throws Exception;
public void remove(String key)
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/MockHAPartition.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/MockHAPartition.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/drm/MockHAPartition.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -47,13 +47,13 @@
* Create a new MockHAPartition.
*
*/
- public MockHAPartition()
+ public MockHAPartition()
{
}
- public void callAsynchMethodOnCluster(String serviceName, String methodName, Object[] args, Class[] types,
- boolean excludeSelf) throws Exception
+ public void callAsynchMethodOnCluster(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ boolean excludeSelf) throws InterruptedException
{
if (excludeSelf && "_remove".equals(methodName))
{
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/rpc/RPCUser.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/rpc/RPCUser.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/rpc/RPCUser.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -21,7 +21,6 @@
*/
package org.jboss.test.cluster.hapartition.rpc;
-import java.util.ArrayList;
import java.util.GregorianCalendar;
import org.jboss.ha.framework.interfaces.ClusterNode;
@@ -103,19 +102,19 @@
}
- public ArrayList runRetrieveAll() throws Exception
+ public Object runRetrieveAll() throws Exception
{
return partition.callMethodOnCluster(rpcServiceName, METHOD_GET_PERSON, null, null, false);
}
- public ArrayList runRetrieveQuery(PersonQuery query) throws Exception
+ public Object runRetrieveQuery(PersonQuery query) throws Exception
{
Object[] parms = new Object[]{query};
- Class[] types = new Class[]{PersonQuery.class};
+ Class<?>[] types = new Class[]{PersonQuery.class};
return partition.callMethodOnCluster(rpcServiceName, METHOD_GET_PERSON_MATCH, parms, types, false);
}
- public ArrayList runRetrieveFromCoordinator() throws Exception
+ public Object runRetrieveFromCoordinator() throws Exception
{
return partition.callMethodOnCoordinatorNode(rpcServiceName, METHOD_GET_PERSON, null, null, false);
}
@@ -123,7 +122,7 @@
public void runNotifyAllAsynch() throws Exception
{
Object[] parms = new Object[]{Boolean.TRUE};
- Class[] types = new Class[]{Boolean.class};
+ Class<?>[] types = new Class[]{Boolean.class};
partition.callAsynchMethodOnCluster(rpcServiceName, METHOD_NOTIFY_PERSON, parms, types, false);
}
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/rpc/RPCUserMBean.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/rpc/RPCUserMBean.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/hapartition/rpc/RPCUserMBean.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -21,16 +21,14 @@
*/
package org.jboss.test.cluster.hapartition.rpc;
-import java.util.ArrayList;
-
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.system.ServiceMBean;
public interface RPCUserMBean extends ServiceMBean {
- public ArrayList runRetrieveAll() throws Exception;
- public ArrayList runRetrieveQuery(PersonQuery query) throws Exception;
- public ArrayList runRetrieveFromCoordinator() throws Exception;
+ public Object runRetrieveAll() throws Exception;
+ public Object runRetrieveQuery(PersonQuery query) throws Exception;
+ public Object runRetrieveFromCoordinator() throws Exception;
public void runNotifyAllAsynch() throws Exception;
public HAPartition getHAPartition();
public void setHAPartition(HAPartition partition);
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/hasingleton/MockHAPartition.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/hasingleton/MockHAPartition.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/hasingleton/MockHAPartition.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -53,7 +53,7 @@
}
public void callAsynchMethodOnCluster(String serviceName, String methodName, Object[] args, Class[] types,
- boolean excludeSelf) throws Exception
+ boolean excludeSelf) throws InterruptedException
{
if (excludeSelf && "stopOldMaster".equals(methodName))
{
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/invokerha/JRMPInvokerHaMockUtils.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/invokerha/JRMPInvokerHaMockUtils.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/invokerha/JRMPInvokerHaMockUtils.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -21,7 +21,7 @@
*/
package org.jboss.test.cluster.invokerha;
-import java.util.ArrayList;
+import java.util.List;
import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
import org.jboss.invocation.Invocation;
@@ -83,7 +83,7 @@
}
@Override
- protected Invoker createProxy(ArrayList targets, LoadBalancePolicy policy, String proxyFamilyName,
+ protected Invoker createProxy(List<?> targets, LoadBalancePolicy policy, String proxyFamilyName,
long viewId)
{
return new MockJRMPInvokerProxyHA(targets, policy, proxyFamilyName, viewId);
@@ -106,7 +106,7 @@
public static class MockJRMPInvokerProxyHA extends JRMPInvokerProxyHA
{
- public MockJRMPInvokerProxyHA(ArrayList targets, LoadBalancePolicy policy, String proxyFamilyName, long viewId)
+ public MockJRMPInvokerProxyHA(List<?> targets, LoadBalancePolicy policy, String proxyFamilyName, long viewId)
{
super(targets, policy, proxyFamilyName, viewId);
}
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/invokerha/UnifiedInvokerHaMockUtils.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/invokerha/UnifiedInvokerHaMockUtils.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/invokerha/UnifiedInvokerHaMockUtils.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -22,7 +22,6 @@
package org.jboss.test.cluster.invokerha;
import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -84,7 +83,7 @@
}
@Override
- protected Invoker createProxy(boolean isStrictRMIException, ArrayList targets,
+ protected Invoker createProxy(boolean isStrictRMIException, List<?> targets,
LoadBalancePolicy policy, String proxyFamilyName, long viewId)
{
/* default invoker locator that will later be replaced by what the
@@ -117,7 +116,7 @@
public static class MockUnifiedInvokerHAProxy extends UnifiedInvokerHAProxy
{
- public MockUnifiedInvokerHAProxy(InvokerLocator locator, boolean isStrictRMIException, ArrayList targets,
+ public MockUnifiedInvokerHAProxy(InvokerLocator locator, boolean isStrictRMIException, List<?> targets,
LoadBalancePolicy policy, String proxyFamilyName, long viewId)
{
super(locator, isStrictRMIException, targets, policy, proxyFamilyName, viewId);
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/testutil/MockHAPartition.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/testutil/MockHAPartition.java 2010-05-04 15:48:52 UTC (rev 104442)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/testutil/MockHAPartition.java 2010-05-04 16:10:23 UTC (rev 104443)
@@ -21,14 +21,20 @@
*/
package org.jboss.test.cluster.testutil;
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.List;
import java.util.Vector;
+import java.util.concurrent.Future;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
+import org.jboss.ha.framework.interfaces.GroupCommunicationService;
+import org.jboss.ha.framework.interfaces.GroupMembershipListener;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.ResponseFilter;
+import org.jboss.ha.framework.interfaces.StateTransferProvider;
/**
* Mock implementation of HAPartition intended to support unit testing
@@ -44,8 +50,9 @@
public static final String PARTITION_NAME = "MockPartition";
private DistributedReplicantManager drm;
- private Vector currentNodes;
+ private Vector<ClusterNode> currentNodes;
private ClusterNode localAddress;
+ @SuppressWarnings("unchecked")
private ArrayList remoteReplicants;
public MockHAPartition(ClusterNode localAddress)
@@ -102,9 +109,16 @@
throw new UnsupportedOperationException("not implemented");
}
- public ArrayList callMethodOnCluster(String serviceName, String methodName, Object[] args, Class[] types,
- boolean excludeSelf) throws Exception
+ public ArrayList callMethodOnCluster(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ boolean excludeSelf) throws InterruptedException
{
+ return callMethodOnCluster(serviceName, methodName, args, types, Object.class, true, null, 0, false);
+ }
+
+ public <T> ArrayList<T> callMethodOnCluster(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ Class<T> returnType, boolean excludeSelf, ResponseFilter filter, long methodTimeout, boolean unordered)
+ throws InterruptedException
+ {
if (excludeSelf)
{
if ("_add".equals(methodName))
@@ -122,20 +136,20 @@
throw new UnsupportedOperationException("not implemented");
}
- public ArrayList callMethodOnCluster(String serviceName, String methodName, Object[] args, Class[] types,
- boolean excludeSelf, ResponseFilter filter) throws Exception
+ public ArrayList callMethodOnCluster(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ boolean excludeSelf, ResponseFilter filter) throws InterruptedException
{
throw new UnsupportedOperationException("not implemented");
}
- public ArrayList callMethodOnCluster(String serviceName, String methodName, Object[] args, boolean excludeSelf)
+ public void callMethodOnCluster(String serviceName, String methodName, Object[] args, boolean excludeSelf)
throws Exception
{
throw new UnsupportedOperationException("not implemented");
}
- public void callAsynchMethodOnCluster(String serviceName, String methodName, Object[] args, Class[] types,
- boolean excludeSelf) throws Exception
+ public void callAsynchMethodOnCluster(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ boolean excludeSelf) throws InterruptedException
{
throw new UnsupportedOperationException("not implemented");
}
@@ -144,28 +158,80 @@
throws Exception
{
throw new UnsupportedOperationException("not implemented");
+ }
+
+ public void callAsynchMethodOnCluster(String objName, String methodName, Object[] args, Class<?>[] types,
+ boolean excludeSelf, boolean unordered) throws InterruptedException
+ {
+ throw new UnsupportedOperationException("not implemented");
}
- public ArrayList callMethodOnCoordinatorNode(String serviceName, String methodName, Object[] args, Class[] types,
+ public ArrayList callMethodOnCoordinatorNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
boolean excludeSelf) throws Exception
{
throw new UnsupportedOperationException("not implemented");
}
+ public <T> T callMethodOnCoordinatorNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ Class<T> returnType, boolean excludeSelf, long methodTimeout, boolean unordered) throws Exception
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
public Object callMethodOnNode(String serviceName, String methodName,
- Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
+ Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Exception
{
throw new UnsupportedOperationException("not implemented");
}
+ public <T> T callMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ Class<T> returnType, long methodTimeout, ClusterNode targetNode, boolean unordered) throws Exception
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+ public Object callMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ ClusterNode targetNode) throws Exception
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+
public void callAsyncMethodOnNode(String serviceName, String methodName,
- Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
+ Object[] args, Class<?>[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
{
throw new UnsupportedOperationException("not implemented");
}
-
+ public void callAsyncMethodOnCoordinatorNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ boolean excludeSelf, boolean unordered) throws Exception
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ public void callAsyncMethodOnCoordinatorNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ boolean excludeSelf) throws Exception
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ public void callAsyncMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ ClusterNode targetNode, boolean unordered) throws Exception
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ public void callAsyncMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
+ ClusterNode targetNode) throws Exception
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ public long getMethodCallTimeout()
+ {
+ return 60000;
+ }
+
public void subscribeToStateTransferEvents(String serviceName, HAPartitionStateTransfer subscriber)
{
// no-op. at this point the test fixture directly passes state
@@ -206,11 +272,11 @@
throw new UnsupportedOperationException("not implemented");
}
- public Vector getCurrentView()
+ public Vector<String> getCurrentView()
{
- Vector result = new Vector();
+ Vector<String> result = new Vector<String>();
for (int i = 0; i < currentNodes.size(); i++)
- result.add(((ClusterNode) currentNodes.elementAt(i)).getName());
+ result.add(currentNodes.elementAt(i).getName());
return result;
}
@@ -228,12 +294,12 @@
// --------------------------------------------------------- Public Methods
- public void setCurrentViewClusterNodes(Vector nodes)
+ public void setCurrentViewClusterNodes(Vector<ClusterNode> nodes)
{
this.currentNodes = nodes;
}
- public void setRemoteReplicants(ArrayList remoteReplicants)
+ public void setRemoteReplicants(ArrayList<?> remoteReplicants)
{
this.remoteReplicants = remoteReplicants;
}
@@ -243,4 +309,50 @@
this.localAddress = localAddress;
}
+ public String getGroupName()
+ {
+ return getPartitionName();
+ }
+
+ public boolean isConsistentWith(GroupCommunicationService other)
+ {
+ return this == other;
+ }
+
+ public void registerGroupMembershipListener(GroupMembershipListener listener)
+ {
+ // no-op. at this point the test fixture directly passes membership
+ // changes to the target DRM
+ }
+
+ public void unregisterGroupMembershipListener(GroupMembershipListener listener)
+ {
+ // no-op. at this point the test fixture directly passes membership
+ // changes to the target DRM
+ }
+
+ public Future<Serializable> getServiceState(String serviceName)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Future<Serializable> getServiceState(String serviceName, ClassLoader classloader)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void registerStateTransferProvider(String serviceName, StateTransferProvider provider)
+ {
+ // no-op. at this point the test fixture directly passes state
+ // to the target DRM
+ }
+
+ public void unregisterStateTransferProvider(String serviceName)
+ {
+ // no-op. at this point the test fixture directly passes state
+ // to the target DRM
+ }
+
}
More information about the jboss-cvs-commits
mailing list