[jboss-cvs] JBossAS SVN: r104703 - in projects/cluster/ha-server-api/trunk/src: test/java/org/jboss/test/ha/framework/server/lock and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 11 21:47:44 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-05-11 21:47:43 -0400 (Tue, 11 May 2010)
New Revision: 104703
Modified:
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/NonGloballyExclusiveClusterLockSupport.java
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingGloballyExclusiveClusterLockSupport.java
projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java
projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ReadWriteClusteredLockManagerUnitTestCase.java
Log:
[JBCLUSTER-223] Use the new group RPC API
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java 2010-05-12 01:47:01 UTC (rev 104702)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java 2010-05-12 01:47:43 UTC (rev 104703)
@@ -27,14 +27,15 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.GroupMembershipListener;
+import org.jboss.ha.framework.interfaces.GroupMembershipNotifier;
+import org.jboss.ha.framework.interfaces.GroupRpcDispatcher;
import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
import org.jboss.ha.framework.server.lock.ClusterLockState.State;
import org.jboss.logging.Logger;
@@ -45,7 +46,7 @@
*
* @version $Revision:$
*/
-public abstract class AbstractClusterLockSupport implements HAMembershipListener
+public abstract class AbstractClusterLockSupport implements GroupMembershipListener
{
public static final Class<?>[] REMOTE_LOCK_TYPES = new Class[]{Serializable.class, ClusterNode.class, long.class};
public static final Class<?>[] RELEASE_REMOTE_LOCK_TYPES = new Class[]{Serializable.class, ClusterNode.class};
@@ -80,7 +81,8 @@
private final ConcurrentMap<ClusterNode, Set<ClusterLockState>> lockStatesByOwner = new ConcurrentHashMap<ClusterNode, Set<ClusterLockState>>();
private ClusterNode me;
private final String serviceHAName;
- private final HAPartition partition;
+ private final GroupRpcDispatcher rpcDispatcher;
+ private final GroupMembershipNotifier membershipNotifier;
private final LocalLockHandler localHandler;
private final List<ClusterNode> members = new CopyOnWriteArrayList<ClusterNode>();
// private final boolean supportLocalOnly;
@@ -90,29 +92,42 @@
HAPartition partition,
LocalLockHandler handler)
{
+ this(serviceHAName, partition, partition, handler);
+ }
+
+ public AbstractClusterLockSupport(String serviceHAName,
+ GroupRpcDispatcher rpcDispatcher,
+ GroupMembershipNotifier membershipNotifier,
+ LocalLockHandler handler)
+ {
if (serviceHAName == null)
{
throw new IllegalArgumentException("serviceHAName is null");
}
- if (partition == null)
+ if (rpcDispatcher == null)
{
- throw new IllegalArgumentException("partition is null");
+ throw new IllegalArgumentException("rpcDispatcher is null");
}
+ if (membershipNotifier == null)
+ {
+ throw new IllegalArgumentException("membershipNotifier is null");
+ }
if (handler == null)
{
throw new IllegalArgumentException("localHandler is null");
}
- this.partition = partition;
+ this.rpcDispatcher = rpcDispatcher;
+ this.membershipNotifier = membershipNotifier;
this.localHandler = handler;
this.serviceHAName = serviceHAName;
}
// -------------------------------------------------------------- Properties
- public HAPartition getPartition()
+ public GroupRpcDispatcher getGroupRpcDispatcher()
{
- return partition;
+ return rpcDispatcher;
}
public String getServiceHAName()
@@ -154,9 +169,9 @@
{
// Get the lock on all other nodes in the cluster
- List<RemoteLockResponse> rsps = partition.callMethodOnCluster(getServiceHAName(),
+ List<RemoteLockResponse> rsps = rpcDispatcher.callMethodOnCluster(getServiceHAName(),
"remoteLock", new Object[]{lockId, me, new Long(left)},
- REMOTE_LOCK_TYPES, RemoteLockResponse.class, true, null, partition.getMethodCallTimeout(), false);
+ REMOTE_LOCK_TYPES, RemoteLockResponse.class, true, null, rpcDispatcher.getMethodCallTimeout(), false);
boolean remoteLocked = true;
if (rsps != null)
@@ -258,9 +273,9 @@
public abstract void unlock(Serializable lockId);
- public String getPartitionName()
+ public String getGroupName()
{
- return partition.getPartitionName();
+ return rpcDispatcher.getGroupName();
}
public ClusterNode getLocalClusterNode()
@@ -275,39 +290,38 @@
public void start() throws Exception
{
- this.me = this.partition.getClusterNode();
+ this.me = this.rpcDispatcher.getClusterNode();
this.localHandler.setLocalNode(this.me);
this.rpcTarget = new RpcTarget(this);
- this.partition.registerRPCHandler(this.serviceHAName, this.rpcTarget);
- this.partition.registerMembershipListener(this);
+ this.rpcDispatcher.registerRPCHandler(this.serviceHAName, this.rpcTarget);
+ this.membershipNotifier.registerGroupMembershipListener(this);
- Vector<ClusterNode> allMembers = new Vector<ClusterNode>();
- for (ClusterNode node : this.partition.getClusterNodes())
+ List<ClusterNode> allMembers = new ArrayList<ClusterNode>();
+ for (ClusterNode node : this.rpcDispatcher.getClusterNodes())
{
allMembers.add(node);
}
- membershipChanged(new Vector<ClusterNode>(), allMembers, allMembers);
+ membershipChanged(new ArrayList<ClusterNode>(), allMembers, allMembers);
}
public void stop() throws Exception
{
if (this.rpcTarget != null)
{
- this.partition.unregisterRPCHandler(this.serviceHAName, this.rpcTarget);
+ this.rpcDispatcher.unregisterRPCHandler(this.serviceHAName, this.rpcTarget);
this.rpcTarget = null;
- this.partition.unregisterMembershipListener(this);
- Vector<ClusterNode> dead = new Vector<ClusterNode>(members);
- Vector<ClusterNode> empty = new Vector<ClusterNode>();
+ this.membershipNotifier.unregisterGroupMembershipListener(this);
+ List<ClusterNode> dead = new ArrayList<ClusterNode>(members);
+ List<ClusterNode> empty = new ArrayList<ClusterNode>();
membershipChanged(dead, empty, empty);
this.me = null;
}
}
- // ---------------------------------------------------- HAMembershipListener
+ // ---------------------------------------------------- GroupMembershipListener
- @SuppressWarnings("unchecked")
- public synchronized void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
+ public synchronized void membershipChanged(List<ClusterNode> deadMembers, List<ClusterNode> newMembers, List<ClusterNode> allMembers)
{
this.members.clear();
this.members.addAll(allMembers);
@@ -334,6 +348,13 @@
}
}
+ public void membershipChangedDuringMerge(List<ClusterNode> deadMembers, List<ClusterNode> newMembers,
+ List<ClusterNode> allMembers, List<List<ClusterNode>> originatingGroups)
+ {
+ // TODO how does a merge relate to lock ownership?
+ membershipChanged(deadMembers, newMembers, allMembers);
+ }
+
// --------------------------------------------------------------- Protected
protected abstract RemoteLockResponse handleLockSuccess(ClusterLockState lockState, ClusterNode caller);
@@ -537,7 +558,7 @@
{
try
{
- partition.callMethodOnCluster(getServiceHAName(), "releaseRemoteLock", new Object[]{categoryName, me}, RELEASE_REMOTE_LOCK_TYPES, true);
+ rpcDispatcher.callMethodOnCluster(getServiceHAName(), "releaseRemoteLock", new Object[]{categoryName, me}, RELEASE_REMOTE_LOCK_TYPES, true);
}
catch (RuntimeException e)
{
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java 2010-05-12 01:47:01 UTC (rev 104702)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java 2010-05-12 01:47:43 UTC (rev 104703)
@@ -31,6 +31,8 @@
import java.util.concurrent.locks.LockSupport;
import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.GroupMembershipNotifier;
+import org.jboss.ha.framework.interfaces.GroupRpcDispatcher;
import org.jboss.ha.framework.interfaces.HAPartition;
/**
@@ -156,8 +158,13 @@
public LocalAndClusterLockManager(String serviceHAName, HAPartition partition)
{
+ this(serviceHAName, partition, partition);
+ }
+
+ public LocalAndClusterLockManager(String serviceHAName, GroupRpcDispatcher rpcDispatcher, GroupMembershipNotifier membershipNotifier)
+ {
ClusterHandler handler = new ClusterHandler();
- clusterSupport = new NonGloballyExclusiveClusterLockSupport(serviceHAName, partition, handler);
+ clusterSupport = new NonGloballyExclusiveClusterLockSupport(serviceHAName, rpcDispatcher, membershipNotifier, handler);
}
// ----------------------------------------------------------------- Public
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/NonGloballyExclusiveClusterLockSupport.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/NonGloballyExclusiveClusterLockSupport.java 2010-05-12 01:47:01 UTC (rev 104702)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/NonGloballyExclusiveClusterLockSupport.java 2010-05-12 01:47:43 UTC (rev 104703)
@@ -25,6 +25,8 @@
import java.io.Serializable;
import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.GroupMembershipNotifier;
+import org.jboss.ha.framework.interfaces.GroupRpcDispatcher;
import org.jboss.ha.framework.interfaces.HAPartition;
/**
@@ -50,9 +52,16 @@
HAPartition partition,
LocalLockHandler handler)
{
- super(serviceHAName, partition, handler);
+ this(serviceHAName, partition, partition, handler);
}
+ public NonGloballyExclusiveClusterLockSupport(String serviceHAName,
+ GroupRpcDispatcher rpcDispatcher, GroupMembershipNotifier membershipNotifier,
+ LocalLockHandler handler)
+ {
+ super(serviceHAName, rpcDispatcher, membershipNotifier, handler);
+ }
+
// ------------------------------------------------------------------ Public
public void unlock(Serializable lockId)
@@ -72,7 +81,7 @@
try
{
- getPartition().callMethodOnCluster(getServiceHAName(), "releaseRemoteLock",
+ getGroupRpcDispatcher().callMethodOnCluster(getServiceHAName(), "releaseRemoteLock",
new Object[]{lockId, myself},
RELEASE_REMOTE_LOCK_TYPES, true);
}
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java 2010-05-12 01:47:01 UTC (rev 104702)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java 2010-05-12 01:47:43 UTC (rev 104703)
@@ -31,6 +31,8 @@
import java.util.concurrent.locks.LockSupport;
import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.GroupMembershipNotifier;
+import org.jboss.ha.framework.interfaces.GroupRpcDispatcher;
import org.jboss.ha.framework.interfaces.HAPartition;
/**
@@ -206,8 +208,13 @@
public SharedLocalYieldingClusterLockManager(String serviceHAName, HAPartition partition)
{
+ this(serviceHAName, partition, partition);
+ }
+
+ public SharedLocalYieldingClusterLockManager(String serviceHAName, GroupRpcDispatcher rpcDispatcher, GroupMembershipNotifier membershipNotifier)
+ {
ClusterHandler handler = new ClusterHandler();
- clusterSupport = new YieldingGloballyExclusiveClusterLockSupport(serviceHAName, partition, handler);
+ clusterSupport = new YieldingGloballyExclusiveClusterLockSupport(serviceHAName, rpcDispatcher, membershipNotifier, handler);
}
// ----------------------------------------------------------------- Public
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingGloballyExclusiveClusterLockSupport.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingGloballyExclusiveClusterLockSupport.java 2010-05-12 01:47:01 UTC (rev 104702)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingGloballyExclusiveClusterLockSupport.java 2010-05-12 01:47:43 UTC (rev 104703)
@@ -25,6 +25,8 @@
import java.io.Serializable;
import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.GroupMembershipNotifier;
+import org.jboss.ha.framework.interfaces.GroupRpcDispatcher;
import org.jboss.ha.framework.interfaces.HAPartition;
/**
@@ -48,9 +50,16 @@
HAPartition partition,
LocalLockHandler handler)
{
- super(serviceHAName, partition, handler);
+ this(serviceHAName, partition, partition, handler);
}
+ public YieldingGloballyExclusiveClusterLockSupport(String serviceHAName,
+ GroupRpcDispatcher rpcDispatcher, GroupMembershipNotifier membershipNotifier,
+ LocalLockHandler handler)
+ {
+ super(serviceHAName, rpcDispatcher, membershipNotifier, handler);
+ }
+
// ------------------------------------------------------ ClusterLockManager
public void unlock(Serializable lockId)
Modified: projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java 2010-05-12 01:47:01 UTC (rev 104702)
+++ projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java 2010-05-12 01:47:43 UTC (rev 104703)
@@ -97,7 +97,7 @@
catch (IllegalArgumentException good) {}
expect(haPartition.getClusterNode()).andReturn(node1);
- expect(haPartition.getPartitionName()).andReturn("TestPartition");
+ expect(haPartition.getGroupName()).andReturn("TestPartition");
replay(haPartition);
replay(handler);
@@ -105,7 +105,7 @@
T testee = createClusteredLockManager("test", haPartition, handler);
assertEquals("test", testee.getServiceHAName());
- assertEquals("TestPartition", testee.getPartitionName());
+ assertEquals("TestPartition", testee.getGroupName());
}
public void testStart() throws Exception
@@ -113,7 +113,7 @@
HAPartition haPartition = createNiceMock(HAPartition.class);
LocalLockHandler handler = createNiceMock(LocalLockHandler.class);
expect(haPartition.getClusterNode()).andReturn(node1);
- expect(haPartition.getPartitionName()).andReturn("TestPartition");
+ expect(haPartition.getGroupName()).andReturn("TestPartition");
replay(haPartition);
replay(handler);
@@ -139,7 +139,7 @@
assertEquals("Current view is empty when unstarted", 0, testee.getCurrentView().size());
haPartition.registerRPCHandler(eq("test"), isA(RpcTarget.class));
- haPartition.registerMembershipListener(testee);
+ haPartition.registerGroupMembershipListener(testee);
expect(haPartition.getClusterNodes()).andReturn(new ClusterNode[]{node1});
replay(haPartition);
@@ -156,11 +156,11 @@
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 0, 1);
T testee = testeeSet.impl;
- HAPartition haPartition = testee.getPartition();
+ HAPartition haPartition = (HAPartition) testee.getGroupRpcDispatcher();
reset(haPartition);
- haPartition.unregisterMembershipListener(testee);
+ haPartition.unregisterGroupMembershipListener(testee);
haPartition.unregisterRPCHandler(eq("test"), same(testeeSet.target));
replay(haPartition);
@@ -245,7 +245,7 @@
int viewPos = viewSize == 1 ? 0 : 1;
TesteeSet<T> testeeSet = getTesteeSet(node1, viewPos, viewSize);
AbstractClusterLockSupport testee = testeeSet.impl;
- HAPartition partition = testee.getPartition();
+ HAPartition partition = (HAPartition) testee.getGroupRpcDispatcher();
LocalLockHandler handler = testee.getLocalHandler();
resetToStrict(partition);
@@ -283,7 +283,7 @@
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 1, 3);
AbstractClusterLockSupport testee = testeeSet.impl;
- HAPartition partition = testee.getPartition();
+ HAPartition partition = (HAPartition) testee.getGroupRpcDispatcher();
LocalLockHandler handler = testee.getLocalHandler();
resetToNice(partition);
@@ -321,7 +321,7 @@
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 1, 3);
AbstractClusterLockSupport testee = testeeSet.impl;
- HAPartition partition = testee.getPartition();
+ HAPartition partition = (HAPartition) testee.getGroupRpcDispatcher();
LocalLockHandler handler = testee.getLocalHandler();
resetToStrict(partition);
@@ -384,7 +384,7 @@
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 1, 3);
T testee = testeeSet.impl;
- HAPartition partition = testee.getPartition();
+ HAPartition partition = (HAPartition) testee.getGroupRpcDispatcher();
LocalLockHandler handler = testee.getLocalHandler();
final RpcTarget target = testeeSet.target;
@@ -460,7 +460,7 @@
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 1, 3);
T testee = testeeSet.impl;
- HAPartition partition = testee.getPartition();
+ HAPartition partition = (HAPartition) testee.getGroupRpcDispatcher();
LocalLockHandler handler = testee.getLocalHandler();
final RpcTarget target = testeeSet.target;
@@ -543,7 +543,7 @@
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 1, 3);
T testee = testeeSet.impl;
- HAPartition partition = testee.getPartition();
+ HAPartition partition = (HAPartition) testee.getGroupRpcDispatcher();
LocalLockHandler handler = testee.getLocalHandler();
final RpcTarget target = testeeSet.target;
@@ -651,7 +651,7 @@
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 1, 2);
AbstractClusterLockSupport testee = testeeSet.impl;
- HAPartition partition = testee.getPartition();
+ HAPartition partition = (HAPartition) testee.getGroupRpcDispatcher();
LocalLockHandler handler = testee.getLocalHandler();
ClusterNode other = testee.getCurrentView().get(0);
@@ -691,7 +691,7 @@
HAPartition haPartition = createNiceMock(HAPartition.class);
LocalLockHandler handler = createNiceMock(LocalLockHandler.class);
expect(haPartition.getClusterNode()).andReturn(node);
- expect(haPartition.getPartitionName()).andReturn("TestPartition");
+ expect(haPartition.getGroupName()).andReturn("TestPartition");
Capture<RpcTarget> c = new Capture<RpcTarget>();
haPartition.registerRPCHandler(eq("test"), and(isA(RpcTarget.class), capture(c)));
Modified: projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ReadWriteClusteredLockManagerUnitTestCase.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ReadWriteClusteredLockManagerUnitTestCase.java 2010-05-12 01:47:01 UTC (rev 104702)
+++ projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ReadWriteClusteredLockManagerUnitTestCase.java 2010-05-12 01:47:43 UTC (rev 104703)
@@ -301,7 +301,7 @@
int viewPos = viewSize == 1 ? 0 : 1;
TesteeSet<NonGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, viewPos, viewSize);
NonGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
- HAPartition partition = testee.getPartition();
+ HAPartition partition = (HAPartition) testee.getGroupRpcDispatcher();
LocalLockHandler handler = testee.getLocalHandler();
resetToNice(partition);
@@ -411,7 +411,7 @@
{
TesteeSet<NonGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 3);
NonGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
- HAPartition partition = testee.getPartition();
+ HAPartition partition = (HAPartition) testee.getGroupRpcDispatcher();
LocalLockHandler handler = testee.getLocalHandler();
RpcTarget target = testeeSet.target;
More information about the jboss-cvs-commits
mailing list