[infinispan-commits] Infinispan SVN: r704 - in trunk: core/src/main/java/org/infinispan/commands and 5 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Aug 18 12:19:18 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-08-18 12:19:17 -0400 (Tue, 18 Aug 2009)
New Revision: 704
Added:
trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
Removed:
trunk/core/src/main/java/org/infinispan/commands/control/GetConsistentHashCommand.java
trunk/core/src/main/java/org/infinispan/commands/control/InstallConsistentHashCommand.java
trunk/core/src/main/java/org/infinispan/commands/control/JoinCompleteCommand.java
trunk/core/src/main/java/org/infinispan/commands/control/PullStateCommand.java
trunk/core/src/main/java/org/infinispan/commands/control/PushStateCommand.java
Modified:
trunk/cachestore/jdbc/src/test/java/org/infinispan/loaders/jdbc/TableNameUniquenessTest.java
trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
trunk/core/src/main/java/org/infinispan/marshall/Ids.java
trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
trunk/core/src/test/java/org/infinispan/test/fwk/TcpMPingEnvironmentTest.java
Log:
More DIST work
Modified: trunk/cachestore/jdbc/src/test/java/org/infinispan/loaders/jdbc/TableNameUniquenessTest.java
===================================================================
--- trunk/cachestore/jdbc/src/test/java/org/infinispan/loaders/jdbc/TableNameUniquenessTest.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/cachestore/jdbc/src/test/java/org/infinispan/loaders/jdbc/TableNameUniquenessTest.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -72,6 +72,7 @@
assertNoOverlapingState(first, second, firstCs, secondCs);
}
+ @SuppressWarnings("unchecked")
public void testForMixedCacheStore() throws Exception {
CacheManager cm = new DefaultCacheManager("configs/mixed.xml");
Cache first = cm.getCache("first");
@@ -95,13 +96,13 @@
Person person1 = new Person(29, "Mircea");
Person person2 = new Person(29, "Manik");
- first.put("k",person1);
+ first.put("k", person1);
assert firstCs.containsKey("k");
assert !secondCs.containsKey("k");
assert first.get("k").equals(person1);
assert second.get("k") == null;
- second.put("k2",person2);
+ second.put("k2", person2);
assert second.get("k2").equals(person2);
assert first.get("k2") == null;
}
@@ -145,13 +146,13 @@
}
private void assertNoOverlapingState(Cache first, Cache second, CacheStore firstCs, CacheStore secondCs) throws CacheLoaderException {
- first.put("k","v");
+ first.put("k", "v");
assert firstCs.containsKey("k");
assert !secondCs.containsKey("k");
assert first.get("k").equals("v");
assert second.get("k") == null;
- second.put("k2","v2");
+ second.put("k2", "v2");
assert second.get("k2").equals("v2");
assert first.get("k2") == null;
}
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -21,12 +21,8 @@
*/
package org.infinispan.commands;
-import org.infinispan.commands.control.GetConsistentHashCommand;
-import org.infinispan.commands.control.InstallConsistentHashCommand;
-import org.infinispan.commands.control.JoinCompleteCommand;
import org.infinispan.commands.control.LockControlCommand;
-import org.infinispan.commands.control.PullStateCommand;
-import org.infinispan.commands.control.PushStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
@@ -120,13 +116,9 @@
LockControlCommand buildLockControlCommand(Collection keys, boolean implicit);
- GetConsistentHashCommand buildGetConsistentHashCommand(Address joiner);
+ RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender);
- InstallConsistentHashCommand buildInstallConsistentHashCommand(Address joiner, boolean starting);
+ RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state);
- PushStateCommand buildPushStateCommand(Address sender, Map<Object, InternalCacheValue> state);
-
- PullStateCommand buildPullStateCommand(Address requestor, ConsistentHash newCH);
-
- JoinCompleteCommand buildJoinCompleteCommand(Address joiner);
+ RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash);
}
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -22,12 +22,8 @@
package org.infinispan.commands;
import org.infinispan.Cache;
-import org.infinispan.commands.control.GetConsistentHashCommand;
-import org.infinispan.commands.control.InstallConsistentHashCommand;
-import org.infinispan.commands.control.JoinCompleteCommand;
import org.infinispan.commands.control.LockControlCommand;
-import org.infinispan.commands.control.PullStateCommand;
-import org.infinispan.commands.control.PushStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
@@ -274,22 +270,10 @@
LockControlCommand lcc = (LockControlCommand) c;
lcc.init(interceptorChain, icc, txTable);
break;
- case GetConsistentHashCommand.COMMAND_ID:
- GetConsistentHashCommand gchc = (GetConsistentHashCommand) c;
- gchc.initialize(distributionManager);
+ case RehashControlCommand.COMMAND_ID:
+ RehashControlCommand rcc = (RehashControlCommand) c;
+ rcc.init(distributionManager, configuration, dataContainer);
break;
- case InstallConsistentHashCommand.COMMAND_ID:
- InstallConsistentHashCommand ichc = (InstallConsistentHashCommand) c;
- ichc.initialize(distributionManager);
- break;
- case PullStateCommand.COMMAND_ID:
- PullStateCommand psc = (PullStateCommand) c;
- psc.init(dataContainer, cacheLoaderManager, distributionManager, configuration);
- break;
- case JoinCompleteCommand.COMMAND_ID:
- JoinCompleteCommand jcc = (JoinCompleteCommand) c;
- jcc.init(distributionManager);
- break;
}
}
@@ -297,23 +281,16 @@
return new LockControlCommand(keys, cacheName, implicit);
}
- public GetConsistentHashCommand buildGetConsistentHashCommand(Address joiner) {
- return new GetConsistentHashCommand(cacheName, joiner);
+ public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender) {
+ return buildRehashControlCommand(type, sender, null, null);
}
- public InstallConsistentHashCommand buildInstallConsistentHashCommand(Address joiner, boolean starting) {
- return new InstallConsistentHashCommand(cacheName, joiner, starting);
+ public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender, Map<Object, InternalCacheValue> state) {
+ return buildRehashControlCommand(type, sender, state, null);
}
- public PushStateCommand buildPushStateCommand(Address sender, Map<Object, InternalCacheValue> state) {
- return new PushStateCommand(cacheName, sender, state);
+ public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
+ return new RehashControlCommand(cacheName, type, sender, state, consistentHash);
}
- public PullStateCommand buildPullStateCommand(Address requestor, ConsistentHash newCH) {
- return new PullStateCommand(cacheName, requestor, newCH);
- }
-
- public JoinCompleteCommand buildJoinCompleteCommand(Address joiner) {
- return new JoinCompleteCommand(cacheName, joiner);
- }
}
Modified: trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -1,12 +1,8 @@
package org.infinispan.commands;
import org.infinispan.CacheException;
-import org.infinispan.commands.control.GetConsistentHashCommand;
-import org.infinispan.commands.control.InstallConsistentHashCommand;
-import org.infinispan.commands.control.JoinCompleteCommand;
import org.infinispan.commands.control.LockControlCommand;
-import org.infinispan.commands.control.PullStateCommand;
-import org.infinispan.commands.control.PushStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
@@ -105,21 +101,9 @@
case ClusteredGetCommand.COMMAND_ID:
command = new ClusteredGetCommand();
break;
- case GetConsistentHashCommand.COMMAND_ID:
- command = new GetConsistentHashCommand();
+ case RehashControlCommand.COMMAND_ID:
+ command = new RehashControlCommand(transport);
break;
- case InstallConsistentHashCommand.COMMAND_ID:
- command = new InstallConsistentHashCommand();
- break;
- case PushStateCommand.COMMAND_ID:
- command = new PushStateCommand();
- break;
- case PullStateCommand.COMMAND_ID:
- command = new PullStateCommand(transport);
- break;
- case JoinCompleteCommand.COMMAND_ID:
- command = new JoinCompleteCommand();
- break;
default:
throw new CacheException("Unknown command id " + id + "!");
}
Deleted: trunk/core/src/main/java/org/infinispan/commands/control/GetConsistentHashCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/GetConsistentHashCommand.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/GetConsistentHashCommand.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -1,59 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.distribution.DistributionManager;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-
-/**
- * Retrieves a consistent hash instance from the distribution manager. This command is always sent to the coordinator
- * by a new joiner.
- * <p/>
- * // TODO rename to GET_ADDRESS_LIST_FROM_COORD, document accordingly
- *
- * @author Manik Surtani
- * @since 4.0
- */
- at Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.GET_CONSISTENT_HASH_COMMAND)
-public class GetConsistentHashCommand extends BaseRpcCommand {
-
- public static final int COMMAND_ID = 17;
- DistributionManager distributionManager;
- Address joiner;
-
- public GetConsistentHashCommand() {
- }
-
- public GetConsistentHashCommand(String cacheName, Address joiner) {
- super(cacheName);
- this.joiner = joiner;
- }
-
- public GetConsistentHashCommand(DistributionManager distributionManager) {
- this.distributionManager = distributionManager;
- }
-
- public void initialize(DistributionManager distributionManager) {
- this.distributionManager = distributionManager;
- }
-
- public Object perform(InvocationContext ctx) throws Throwable {
- return distributionManager.requestPermissionToJoin(joiner);
- }
-
- public byte getCommandId() {
- return COMMAND_ID;
- }
-
- public Object[] getParameters() {
- return new Object[]{cacheName, joiner};
- }
-
- public void setParameters(int commandId, Object[] parameters) {
- cacheName = (String) parameters[0];
- joiner = (Address) parameters[1];
- }
-}
Deleted: trunk/core/src/main/java/org/infinispan/commands/control/InstallConsistentHashCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/InstallConsistentHashCommand.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/InstallConsistentHashCommand.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -1,58 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.distribution.DistributionManager;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-
-/**
- * Installs a consistent hash in a distribution manager
- * <p/>
- * // TODO rename to INFORM_REHASH_ON_JOIN
- *
- * @author Manik Surtani
- * @since 4.0
- */
- at Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.INSTALL_CONSISTENT_HASH_COMMAND)
-public class InstallConsistentHashCommand extends BaseRpcCommand {
-
- public static final int COMMAND_ID = 18;
- DistributionManager distributionManager;
- Address joiner;
- boolean starting; // if true the rehash is starting; if false it has completed.
-
- public InstallConsistentHashCommand() {
- }
-
- public InstallConsistentHashCommand(String cacheName, Address joiner, boolean starting) {
- super(cacheName);
- this.joiner = joiner;
- this.starting = starting;
- }
-
- public void initialize(DistributionManager distributionManager) {
- this.distributionManager = distributionManager;
- }
-
- public Object perform(InvocationContext ctx) throws Throwable {
- distributionManager.informRehashOnJoin(joiner, starting);
- return null;
- }
-
- public byte getCommandId() {
- return COMMAND_ID;
- }
-
- public Object[] getParameters() {
- return new Object[]{cacheName, joiner, starting};
- }
-
- public void setParameters(int commandId, Object[] parameters) {
- cacheName = (String) parameters[0];
- joiner = (Address) parameters[1];
- starting = (Boolean) parameters[2];
- }
-}
Deleted: trunk/core/src/main/java/org/infinispan/commands/control/JoinCompleteCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/JoinCompleteCommand.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/JoinCompleteCommand.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -1,52 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.distribution.DistributionManager;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
- at Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.JOIN_COMPLETE_COMMAND)
-public class JoinCompleteCommand extends BaseRpcCommand {
- public static final int COMMAND_ID = 21;
- Address joiner;
- DistributionManager distributionManager;
-
- public JoinCompleteCommand() {
- }
-
- public JoinCompleteCommand(String cacheName, Address joiner) {
- super(cacheName);
- this.joiner = joiner;
- }
-
- public void init(DistributionManager distributionManager) {
- this.distributionManager = distributionManager;
- }
-
- public Object perform(InvocationContext ctx) throws Throwable {
- distributionManager.notifyJoinComplete(joiner);
- return null;
- }
-
- public byte getCommandId() {
- return COMMAND_ID;
- }
-
- public Object[] getParameters() {
- return new Object[]{cacheName, joiner};
- }
-
- public void setParameters(int commandId, Object[] parameters) {
- cacheName = (String) parameters[0];
- joiner = (Address) parameters[1];
- }
-}
Deleted: trunk/core/src/main/java/org/infinispan/commands/control/PullStateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/PullStateCommand.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/PullStateCommand.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -1,121 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.config.Configuration;
-import org.infinispan.container.DataContainer;
-import org.infinispan.container.entries.InternalCacheEntry;
-import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.distribution.ConsistentHash;
-import org.infinispan.distribution.DistributionManager;
-import org.infinispan.loaders.CacheLoaderManager;
-import org.infinispan.loaders.CacheStore;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.remoting.transport.Transport;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
- at Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.PULL_STATE_COMMAND)
-public class PullStateCommand extends BaseRpcCommand {
-
- public static final int COMMAND_ID = 19;
- Address requestor;
- DataContainer dataContainer;
- CacheLoaderManager clm;
- Transport transport;
- ConsistentHash newCH, oldCH;
- Address self;
- DistributionManager distributionManager;
- Configuration configuration;
- int numCopies;
-
- public PullStateCommand() {
- }
-
- public PullStateCommand(String cacheName, Address requestor, ConsistentHash newCH) {
- super(cacheName);
- this.requestor = requestor;
- this.newCH = newCH;
- }
-
- public PullStateCommand(Transport transport) {
- this.transport = transport;
- }
-
- public void init(DataContainer dataContainer, CacheLoaderManager clm, DistributionManager distributionManager, Configuration c) {
- this.dataContainer = dataContainer;
- this.clm = clm;
- this.distributionManager = distributionManager;
- this.configuration = c;
- }
-
- /**
- * @param ctx invocation context
- * @return Should return a Map<Object, InternalCacheValue>
- * @throws Throwable
- */
- public Object perform(InvocationContext ctx) throws Throwable {
- self = transport.getAddress();
- oldCH = distributionManager.getConsistentHash();
- numCopies = configuration.getNumOwners();
-
- Map<Object, InternalCacheValue> state = new HashMap<Object, InternalCacheValue>();
- for (InternalCacheEntry ice : dataContainer) {
- Object k = ice.getKey();
- if (shouldAddToMap(k)) {
- state.put(k, ice.toInternalCacheValue());
- }
- }
-
- CacheStore cacheStore = getCacheStore();
- if (cacheStore != null) {
- for (InternalCacheEntry ice : cacheStore.loadAll()) {
- Object k = ice.getKey();
- if (shouldAddToMap(k) && !state.containsKey(k)) {
- state.put(k, ice.toInternalCacheValue());
- }
- }
- }
- return state;
- }
-
- final boolean shouldAddToMap(Object k) {
- // if the current address is the current "owner" of this key (in old_ch), and the requestor is in the owner list
- // in new_ch, then add this to the map.
- List<Address> oldOwnerList = oldCH.locate(k, numCopies);
- if (oldOwnerList.size() > 0 && self.equals(oldOwnerList.get(0))) {
- List<Address> newOwnerList = newCH.locate(k, numCopies);
- if (newOwnerList.contains(requestor)) return true;
- }
- return false;
- }
-
- final CacheStore getCacheStore() {
- return clm != null && clm.isEnabled() ? clm.getCacheStore() : null;
- }
-
- public byte getCommandId() {
- return COMMAND_ID;
- }
-
- public Object[] getParameters() {
- return new Object[]{cacheName, requestor, newCH};
- }
-
- public void setParameters(int commandId, Object[] parameters) {
- cacheName = (String) parameters[0];
- requestor = (Address) parameters[1];
- newCH = (ConsistentHash) parameters[2];
- }
-}
Deleted: trunk/core/src/main/java/org/infinispan/commands/control/PushStateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/PushStateCommand.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/PushStateCommand.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -1,53 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-
-import java.util.Map;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
- at Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.PUSH_STATE_COMMAND)
-public class PushStateCommand extends BaseRpcCommand {
-
- public static final int COMMAND_ID = 20;
- Address sender;
- Map<Object, InternalCacheValue> state;
-
- public PushStateCommand() {
- }
-
- public PushStateCommand(String cacheName, Address sender, Map<Object, InternalCacheValue> state) {
- super(cacheName);
- this.sender = sender;
- this.state = state;
- }
-
- public Object perform(InvocationContext ctx) throws Throwable {
- return null; // TODO: Customise this generated block
- }
-
- public byte getCommandId() {
- return COMMAND_ID;
- }
-
- public Object[] getParameters() {
- return new Object[]{cacheName, sender, state};
- }
-
- @SuppressWarnings("unchecked")
- public void setParameters(int commandId, Object[] parameters) {
- cacheName = (String) parameters[0];
- sender = (Address) parameters[1];
- state = (Map<Object, InternalCacheValue>) parameters[2];
- }
-}
Added: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -0,0 +1,157 @@
+package org.infinispan.commands.control;
+
+import org.infinispan.CacheException;
+import org.infinispan.commands.remote.BaseRpcCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.ConsistentHash;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheStore;
+import org.infinispan.marshall.Ids;
+import org.infinispan.marshall.Marshallable;
+import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.Transport;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A control command to coordinate rehashes that may occur when nodes join or leave a cluster, when DIST is used as a
+ * cache mode.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+ at Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.REHASH_CONTROL_COMMAND)
+public class RehashControlCommand extends BaseRpcCommand {
+
+ public static final int COMMAND_ID = 17;
+
+ public enum Type {
+ JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE, PUSH_STATE
+ }
+
+ Type type;
+ Address sender;
+ Map<Object, InternalCacheValue> state;
+ ConsistentHash consistentHash;
+
+ // cache components
+ DistributionManager distributionManager;
+ Transport transport;
+ Configuration configuration;
+ DataContainer dataContainer;
+
+ public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
+ super(cacheName);
+ this.type = type;
+ this.sender = sender;
+ this.state = state;
+ this.consistentHash = consistentHash;
+ }
+
+ public RehashControlCommand() {
+ }
+
+ public RehashControlCommand(Transport transport) {
+ this.transport = transport;
+ }
+
+ public void init(DistributionManager distributionManager, Configuration configuration, DataContainer dataContainer) {
+ this.distributionManager = distributionManager;
+ this.configuration = configuration;
+ this.dataContainer = dataContainer;
+ }
+
+ public Object perform(InvocationContext ctx) throws Throwable {
+ switch (type) {
+ case JOIN_REQ:
+ return distributionManager.requestPermissionToJoin(sender);
+ case JOIN_REHASH_START:
+ distributionManager.informRehashOnJoin(sender, true);
+ return null;
+ case JOIN_REHASH_END:
+ distributionManager.informRehashOnJoin(sender, false);
+ return null;
+ case JOIN_COMPLETE:
+ distributionManager.notifyJoinComplete(sender);
+ return null;
+ case PULL_STATE:
+ return pullState();
+ case PUSH_STATE:
+ return pushState();
+ }
+ throw new CacheException("Unknown rehash control command type " + type);
+ }
+
+ public Map<Object, InternalCacheValue> pullState() throws CacheLoaderException {
+ Address self = transport.getAddress();
+ ConsistentHash oldCH = distributionManager.getConsistentHash();
+ int numCopies = configuration.getNumOwners();
+
+ Map<Object, InternalCacheValue> state = new HashMap<Object, InternalCacheValue>();
+ for (InternalCacheEntry ice : dataContainer) {
+ Object k = ice.getKey();
+ if (shouldAddToMap(k, oldCH, numCopies, self)) state.put(k, ice.toInternalCacheValue());
+ }
+
+ CacheStore cacheStore = distributionManager.getCacheStoreForRehashing();
+ if (cacheStore != null) {
+ for (InternalCacheEntry ice : cacheStore.loadAll()) {
+ Object k = ice.getKey();
+ if (shouldAddToMap(k, oldCH, numCopies, self) && !state.containsKey(k))
+ state.put(k, ice.toInternalCacheValue());
+ }
+ }
+ return state;
+ }
+
+ final boolean shouldAddToMap(Object k, ConsistentHash oldCH, int numCopies, Address self) {
+ // if the current address is the current "owner" of this key (in old_ch), and the requestor is in the owner list
+ // in new_ch, then add this to the map.
+ List<Address> oldOwnerList = oldCH.locate(k, numCopies);
+ if (oldOwnerList.size() > 0 && self.equals(oldOwnerList.get(0))) {
+ List<Address> newOwnerList = consistentHash.locate(k, numCopies);
+ if (newOwnerList.contains(sender)) return true;
+ }
+ return false;
+ }
+
+ public Object pushState() {
+ throw new RuntimeException("implement me");
+ }
+
+ public byte getCommandId() {
+ return COMMAND_ID;
+ }
+
+ public Object[] getParameters() {
+ return new Object[]{cacheName, (byte) type.ordinal(), sender, state, consistentHash};
+ }
+
+ @SuppressWarnings("unchecked")
+ public void setParameters(int commandId, Object[] parameters) {
+ int i = 0;
+ cacheName = (String) parameters[i++];
+ type = Type.values()[(Byte) parameters[i++]];
+ sender = (Address) parameters[i++];
+ state = (Map<Object, InternalCacheValue>) parameters[i++];
+ consistentHash = (ConsistentHash) parameters[i++];
+ }
+
+ @Override
+ public String toString() {
+ return "RehashControlCommand{" +
+ "type=" + type +
+ ", sender=" + sender +
+ ", state=" + state +
+ ", consistentHash=" + consistentHash +
+ '}';
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -7,18 +7,58 @@
import java.util.Map;
/**
- * A consistent hash algorithm
+ * A consistent hash algorithm implementation. Implementations would typically be constructed via reflection so should
+ * implement a public, no-arg constructor.
*
* @author Manik Surtani
* @since 4.0
*/
public interface ConsistentHash {
+ /**
+ * Sets the collection of cache addresses in the cluster. The implementation should store these internally and use
+ * these to locate keys.
+ *
+ * @param caches caches in cluster.
+ */
void setCaches(Collection<Address> caches);
+ /**
+ * Should return a collection of cache addresses in the cluster.
+ *
+ * @return collection of cache addresses
+ */
Collection<Address> getCaches();
+ /**
+ * Locates a key, given a replication count (number of copies).
+ *
+ * @param key key to locate
+ * @param replCount replication count (number of copies)
+ * @return a list of addresses where the key resides, where this list is a subset of the addresses set in {@link
+ * #setCaches(java.util.Collection)}. Should never be null, and should contain replCount elements or the max
+ * number of caches available, whichever is smaller.
+ */
List<Address> locate(Object key, int replCount);
+ /**
+ * The logical equivalent of calling {@link #locate(Object, int)} multiple times for each key in the collection of
+ * keys. Implementations may be optimised for such a bulk lookup, or may just repeatedly call {@link #locate(Object,
+ * int)}.
+ *
+ * @param keys keys to locate
+ * @param replCount replication count (number of copies) for each key
+ * @return Map of locations, keyed on key.
+ */
Map<Object, List<Address>> locateAll(Collection<Object> keys, int replCount);
+
+ /**
+ * Tests whether a group of addresses are in the same subspace of the hash space. Addresses are in the same subspace
+ * if an arbitrary key mapped to one address could also be mapped to the other.
+ *
+ * @param a1 address to test
+ * @param a2 address to test
+ * @return true of the two addresses are in the same subspace, false otherwise.
+ */
+ boolean isInSameSubspace(Address a1, Address a2);
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -137,4 +137,8 @@
"addresses (in order of hash space position)=" + positions.values() +
'}';
}
+
+ public boolean isInSameSubspace(Address a1, Address a2) {
+ throw new UnsupportedOperationException("TODO Implement me!");
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -4,6 +4,7 @@
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
+import org.infinispan.loaders.CacheStore;
import org.infinispan.remoting.transport.Address;
import java.util.Collection;
@@ -93,5 +94,12 @@
* @param starting
*/
void informRehashOnJoin(Address joiner, boolean starting);
+
+ /**
+ * Retrieves a cache store if one is available and set up for use in rehashing. May return null!
+ *
+ * @return a cache store is one is available and configured for use in rehashing, or null otherwise.
+ */
+ CacheStore getCacheStoreForRehashing();
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -13,6 +13,8 @@
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.interceptors.InterceptorChain;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.loaders.CacheStore;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
@@ -52,6 +54,7 @@
private final boolean trace = log.isTraceEnabled();
Configuration configuration;
ConsistentHash consistentHash;
+ CacheLoaderManager cacheLoaderManager;
RpcManager rpcManager;
CacheManagerNotifier notifier;
int replCount;
@@ -83,7 +86,9 @@
@Inject
public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf,
- DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc) {
+ DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc,
+ CacheLoaderManager cacheLoaderManager) {
+ this.cacheLoaderManager = cacheLoaderManager;
this.configuration = configuration;
this.rpcManager = rpcManager;
this.notifier = notifier;
@@ -251,4 +256,10 @@
rehash(e.getNewMembers(), e.getOldMembers());
}
}
+
+ public CacheStore getCacheStoreForRehashing() {
+ if (cacheLoaderManager == null || !cacheLoaderManager.isEnabled() || cacheLoaderManager.isShared())
+ return null;
+ return cacheLoaderManager.getCacheStore();
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -2,7 +2,8 @@
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
-import org.infinispan.commands.control.PullStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
+import static org.infinispan.commands.control.RehashControlCommand.Type.*;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.WriteCommand;
@@ -15,7 +16,7 @@
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
-import org.infinispan.remoting.rpc.ResponseMode;
+import static org.infinispan.remoting.rpc.ResponseMode.SYNCHRONOUS;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.Util;
@@ -73,6 +74,21 @@
self = rpcManager.getTransport().getAddress();
}
+ @SuppressWarnings("unchecked")
+ private List<Address> parseResponses(List<Response> resp) {
+ for (Response r : resp) {
+ if (r instanceof SuccessfulResponse) {
+ return (List<Address>) ((SuccessfulResponse) r).getResponseValue();
+ }
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<Object, InternalCacheValue> getStateFromResponse(SuccessfulResponse r) {
+ return (Map<Object, InternalCacheValue>) r.getResponseValue();
+ }
+
protected void performRehash() throws Exception {
log.trace("Starting rehash on new joiner");
boolean unlocked = false;
@@ -87,15 +103,10 @@
long giveupTime = System.currentTimeMillis() + maxWaitTime;
do {
log.trace("Requesting old consistent hash from coordinator");
- List<Response> resp = rpcManager.invokeRemotely(coordinator(), commandsFactory.buildGetConsistentHashCommand(self),
- ResponseMode.SYNCHRONOUS, 100000, true);
- List<Address> addresses = null;
- for (Response r : resp) {
- if (r instanceof SuccessfulResponse) {
- addresses = (List<Address>) ((SuccessfulResponse) r).getResponseValue();
- break;
- }
- }
+ List<Response> resp = rpcManager.invokeRemotely(coordinator(),
+ commandsFactory.buildRehashControlCommand(JOIN_REQ, self),
+ SYNCHRONOUS, 100000, true);
+ List<Address> addresses = parseResponses(resp);
log.trace("Retrieved old consistent hash address list {0}", addresses);
if (addresses == null) {
@@ -119,21 +130,21 @@
transactionLogger.enable();
// 4. Broadcast new temp CH
- rpcManager.broadcastRpcCommand(commandsFactory.buildInstallConsistentHashCommand(self, true), true, true);
+ rpcManager.broadcastRpcCommand(commandsFactory.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
// 5. txLogger being enabled will cause CLusteredGetCommands to return uncertain responses.
// 6. pull state from everyone.
Address myAddress = rpcManager.getTransport().getAddress();
- PullStateCommand cmd = commandsFactory.buildPullStateCommand(myAddress, chNew);
+ RehashControlCommand cmd = commandsFactory.buildRehashControlCommand(PULL_STATE, myAddress, null, chNew);
// TODO I should be able to process state chunks from different nodes simultaneously!!
// TODO I should only send this pull state request to nodes which I know will send me state. Not everyone in chOld!!
- List<Response> resps = rpcManager.invokeRemotely(chOld.getCaches(), cmd, ResponseMode.SYNCHRONOUS, 10000, true);
+ List<Response> resps = rpcManager.invokeRemotely(chOld.getCaches(), cmd, SYNCHRONOUS, 10000, true);
// 7. Apply state
for (Response r : resps) {
if (r instanceof SuccessfulResponse) {
- Map<Object, InternalCacheValue> state = (Map<Object, InternalCacheValue>) ((SuccessfulResponse) r).getResponseValue();
+ Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
if (chNew.locate(e.getKey(), configuration.getNumOwners()).contains(myAddress)) {
InternalCacheValue v = e.getValue();
@@ -164,8 +175,8 @@
// 10.
// TODO this phase should also "tell" the coord that the join is complete so that other waiting joiners
// may proceed. Ideally another command, directed to the coord.
- rpcManager.broadcastRpcCommand(commandsFactory.buildInstallConsistentHashCommand(self, false), true, true);
- rpcManager.invokeRemotely(coordinator(), commandsFactory.buildJoinCompleteCommand(self), ResponseMode.SYNCHRONOUS,
+ rpcManager.broadcastRpcCommand(commandsFactory.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
+ rpcManager.invokeRemotely(coordinator(), commandsFactory.buildRehashControlCommand(JOIN_COMPLETE, self), SYNCHRONOUS,
100000, true);
// 11.
Modified: trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -16,7 +16,8 @@
import java.util.Set;
/**
- * // TODO: Manik: Document this
+ * A delegating wrapper that locates keys by getting a union of locations reported by two other ConsistentHash
+ * implementations it delegates to.
*
* @author Manik Surtani
* @since 4.0
@@ -68,4 +69,8 @@
return new UnionConsistentHash((ConsistentHash) input.readObject(), (ConsistentHash) input.readObject());
}
}
+
+ public boolean isInSameSubspace(Address a1, Address a2) {
+ throw new UnsupportedOperationException("Not supported by this impl");
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/marshall/Ids.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/Ids.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/marshall/Ids.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -103,11 +103,8 @@
static final byte ATOMIC_REMOVE_OPERATION = 48;
static final byte ATOMIC_CLEAR_OPERATION = 49;
- static final byte GET_CONSISTENT_HASH_COMMAND = 50;
- static final byte PUSH_STATE_COMMAND = 51;
- static final byte PULL_STATE_COMMAND = 52;
- static final byte INSTALL_CONSISTENT_HASH_COMMAND = 53;
- static final byte DEFAULT_CONSISTENT_HASH = 54;
- static final byte UNION_CONSISTENT_HASH = 55;
- static final byte JOIN_COMPLETE_COMMAND = 56;
+ static final byte REHASH_CONTROL_COMMAND = 50;
+ static final byte DEFAULT_CONSISTENT_HASH = 51;
+ static final byte UNION_CONSISTENT_HASH = 52;
+ static final byte JOIN_COMPLETE_COMMAND = 53;
}
Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -28,12 +28,8 @@
import org.infinispan.atomic.PutOperation;
import org.infinispan.atomic.RemoveOperation;
import org.infinispan.commands.RemoteCommandFactory;
-import org.infinispan.commands.control.GetConsistentHashCommand;
-import org.infinispan.commands.control.InstallConsistentHashCommand;
-import org.infinispan.commands.control.JoinCompleteCommand;
import org.infinispan.commands.control.LockControlCommand;
-import org.infinispan.commands.control.PullStateCommand;
-import org.infinispan.commands.control.PushStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
@@ -149,11 +145,7 @@
MARSHALLABLES.add(RollbackCommand.class.getName());
MARSHALLABLES.add(InvalidateL1Command.class.getName());
MARSHALLABLES.add(LockControlCommand.class.getName());
- MARSHALLABLES.add(GetConsistentHashCommand.class.getName());
- MARSHALLABLES.add(PushStateCommand.class.getName());
- MARSHALLABLES.add(PullStateCommand.class.getName());
- MARSHALLABLES.add(InstallConsistentHashCommand.class.getName());
- MARSHALLABLES.add(JoinCompleteCommand.class.getName());
+ MARSHALLABLES.add(RehashControlCommand.class.getName());
MARSHALLABLES.add(ImmortalCacheEntry.class.getName());
MARSHALLABLES.add(MortalCacheEntry.class.getName());
Modified: trunk/core/src/test/java/org/infinispan/test/fwk/TcpMPingEnvironmentTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/fwk/TcpMPingEnvironmentTest.java 2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/test/java/org/infinispan/test/fwk/TcpMPingEnvironmentTest.java 2009-08-18 16:19:17 UTC (rev 704)
@@ -114,8 +114,6 @@
private void initiChannel(JChannel channel) throws Exception {
openedChannles.add(channel);
channel.setOpt(org.jgroups.Channel.LOCAL, false);
- channel.setOpt(org.jgroups.Channel.AUTO_RECONNECT, true);
- channel.setOpt(org.jgroups.Channel.AUTO_GETSTATE, false);
channel.setOpt(org.jgroups.Channel.BLOCK, true);
channel.connect("someChannel");
}
More information about the infinispan-commits
mailing list