[infinispan-commits] Infinispan SVN: r704 - in trunk: core/src/main/java/org/infinispan/commands and 5 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Aug 18 12:19:18 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-08-18 12:19:17 -0400 (Tue, 18 Aug 2009)
New Revision: 704

Added:
   trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
Removed:
   trunk/core/src/main/java/org/infinispan/commands/control/GetConsistentHashCommand.java
   trunk/core/src/main/java/org/infinispan/commands/control/InstallConsistentHashCommand.java
   trunk/core/src/main/java/org/infinispan/commands/control/JoinCompleteCommand.java
   trunk/core/src/main/java/org/infinispan/commands/control/PullStateCommand.java
   trunk/core/src/main/java/org/infinispan/commands/control/PushStateCommand.java
Modified:
   trunk/cachestore/jdbc/src/test/java/org/infinispan/loaders/jdbc/TableNameUniquenessTest.java
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
   trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
   trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
   trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
   trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
   trunk/core/src/main/java/org/infinispan/marshall/Ids.java
   trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
   trunk/core/src/test/java/org/infinispan/test/fwk/TcpMPingEnvironmentTest.java
Log:
More DIST work

Modified: trunk/cachestore/jdbc/src/test/java/org/infinispan/loaders/jdbc/TableNameUniquenessTest.java
===================================================================
--- trunk/cachestore/jdbc/src/test/java/org/infinispan/loaders/jdbc/TableNameUniquenessTest.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/cachestore/jdbc/src/test/java/org/infinispan/loaders/jdbc/TableNameUniquenessTest.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -72,6 +72,7 @@
       assertNoOverlapingState(first, second, firstCs, secondCs);
    }
 
+   @SuppressWarnings("unchecked")
    public void testForMixedCacheStore() throws Exception {
       CacheManager cm = new DefaultCacheManager("configs/mixed.xml");
       Cache first = cm.getCache("first");
@@ -95,13 +96,13 @@
       Person person1 = new Person(29, "Mircea");
       Person person2 = new Person(29, "Manik");
 
-      first.put("k",person1);
+      first.put("k", person1);
       assert firstCs.containsKey("k");
       assert !secondCs.containsKey("k");
       assert first.get("k").equals(person1);
       assert second.get("k") == null;
 
-      second.put("k2",person2);
+      second.put("k2", person2);
       assert second.get("k2").equals(person2);
       assert first.get("k2") == null;
    }
@@ -145,13 +146,13 @@
    }
 
    private void assertNoOverlapingState(Cache first, Cache second, CacheStore firstCs, CacheStore secondCs) throws CacheLoaderException {
-      first.put("k","v");
+      first.put("k", "v");
       assert firstCs.containsKey("k");
       assert !secondCs.containsKey("k");
       assert first.get("k").equals("v");
       assert second.get("k") == null;
 
-      second.put("k2","v2");
+      second.put("k2", "v2");
       assert second.get("k2").equals("v2");
       assert first.get("k2") == null;
    }

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -21,12 +21,8 @@
  */
 package org.infinispan.commands;
 
-import org.infinispan.commands.control.GetConsistentHashCommand;
-import org.infinispan.commands.control.InstallConsistentHashCommand;
-import org.infinispan.commands.control.JoinCompleteCommand;
 import org.infinispan.commands.control.LockControlCommand;
-import org.infinispan.commands.control.PullStateCommand;
-import org.infinispan.commands.control.PushStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
 import org.infinispan.commands.control.StateTransferControlCommand;
 import org.infinispan.commands.read.EntrySetCommand;
 import org.infinispan.commands.read.GetKeyValueCommand;
@@ -120,13 +116,9 @@
 
    LockControlCommand buildLockControlCommand(Collection keys, boolean implicit);
 
-   GetConsistentHashCommand buildGetConsistentHashCommand(Address joiner);
+   RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender);
 
-   InstallConsistentHashCommand buildInstallConsistentHashCommand(Address joiner, boolean starting);
+   RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state);
 
-   PushStateCommand buildPushStateCommand(Address sender, Map<Object, InternalCacheValue> state);
-
-   PullStateCommand buildPullStateCommand(Address requestor, ConsistentHash newCH);
-
-   JoinCompleteCommand buildJoinCompleteCommand(Address joiner);
+   RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash);
 }

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -22,12 +22,8 @@
 package org.infinispan.commands;
 
 import org.infinispan.Cache;
-import org.infinispan.commands.control.GetConsistentHashCommand;
-import org.infinispan.commands.control.InstallConsistentHashCommand;
-import org.infinispan.commands.control.JoinCompleteCommand;
 import org.infinispan.commands.control.LockControlCommand;
-import org.infinispan.commands.control.PullStateCommand;
-import org.infinispan.commands.control.PushStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
 import org.infinispan.commands.control.StateTransferControlCommand;
 import org.infinispan.commands.read.EntrySetCommand;
 import org.infinispan.commands.read.GetKeyValueCommand;
@@ -274,22 +270,10 @@
             LockControlCommand lcc = (LockControlCommand) c;
             lcc.init(interceptorChain, icc, txTable);
             break;
-         case GetConsistentHashCommand.COMMAND_ID:
-            GetConsistentHashCommand gchc = (GetConsistentHashCommand) c;
-            gchc.initialize(distributionManager);
+         case RehashControlCommand.COMMAND_ID:
+            RehashControlCommand rcc = (RehashControlCommand) c;
+            rcc.init(distributionManager, configuration, dataContainer);
             break;
-         case InstallConsistentHashCommand.COMMAND_ID:
-            InstallConsistentHashCommand ichc = (InstallConsistentHashCommand) c;
-            ichc.initialize(distributionManager);
-            break;
-         case PullStateCommand.COMMAND_ID:
-            PullStateCommand psc = (PullStateCommand) c;
-            psc.init(dataContainer, cacheLoaderManager, distributionManager, configuration);
-            break;
-         case JoinCompleteCommand.COMMAND_ID:
-            JoinCompleteCommand jcc = (JoinCompleteCommand) c;
-            jcc.init(distributionManager);
-            break;
       }
    }
 
@@ -297,23 +281,16 @@
       return new LockControlCommand(keys, cacheName, implicit);
    }
 
-   public GetConsistentHashCommand buildGetConsistentHashCommand(Address joiner) {
-      return new GetConsistentHashCommand(cacheName, joiner);
+   public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender) {
+      return buildRehashControlCommand(type, sender, null, null);
    }
 
-   public InstallConsistentHashCommand buildInstallConsistentHashCommand(Address joiner, boolean starting) {
-      return new InstallConsistentHashCommand(cacheName, joiner, starting);
+   public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender, Map<Object, InternalCacheValue> state) {
+      return buildRehashControlCommand(type, sender, state, null);
    }
 
-   public PushStateCommand buildPushStateCommand(Address sender, Map<Object, InternalCacheValue> state) {
-      return new PushStateCommand(cacheName, sender, state);
+   public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
+      return new RehashControlCommand(cacheName, type, sender, state, consistentHash);
    }
 
-   public PullStateCommand buildPullStateCommand(Address requestor, ConsistentHash newCH) {
-      return new PullStateCommand(cacheName, requestor, newCH);
-   }
-
-   public JoinCompleteCommand buildJoinCompleteCommand(Address joiner) {
-      return new JoinCompleteCommand(cacheName, joiner);
-   }
 }

Modified: trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -1,12 +1,8 @@
 package org.infinispan.commands;
 
 import org.infinispan.CacheException;
-import org.infinispan.commands.control.GetConsistentHashCommand;
-import org.infinispan.commands.control.InstallConsistentHashCommand;
-import org.infinispan.commands.control.JoinCompleteCommand;
 import org.infinispan.commands.control.LockControlCommand;
-import org.infinispan.commands.control.PullStateCommand;
-import org.infinispan.commands.control.PushStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
 import org.infinispan.commands.control.StateTransferControlCommand;
 import org.infinispan.commands.read.GetKeyValueCommand;
 import org.infinispan.commands.remote.ClusteredGetCommand;
@@ -105,21 +101,9 @@
          case ClusteredGetCommand.COMMAND_ID:
             command = new ClusteredGetCommand();
             break;
-         case GetConsistentHashCommand.COMMAND_ID:
-            command = new GetConsistentHashCommand();
+         case RehashControlCommand.COMMAND_ID:
+            command = new RehashControlCommand(transport);
             break;
-         case InstallConsistentHashCommand.COMMAND_ID:
-            command = new InstallConsistentHashCommand();
-            break;
-         case PushStateCommand.COMMAND_ID:
-            command = new PushStateCommand();
-            break;
-         case PullStateCommand.COMMAND_ID:
-            command = new PullStateCommand(transport);
-            break;
-         case JoinCompleteCommand.COMMAND_ID:
-            command = new JoinCompleteCommand();
-            break;
          default:
             throw new CacheException("Unknown command id " + id + "!");
       }

Deleted: trunk/core/src/main/java/org/infinispan/commands/control/GetConsistentHashCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/GetConsistentHashCommand.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/GetConsistentHashCommand.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -1,59 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.distribution.DistributionManager;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-
-/**
- * Retrieves a consistent hash instance from the distribution manager.  This command is always sent to the coordinator
- * by a new joiner.
- * <p/>
- * // TODO rename to GET_ADDRESS_LIST_FROM_COORD, document accordingly
- *
- * @author Manik Surtani
- * @since 4.0
- */
-@Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.GET_CONSISTENT_HASH_COMMAND)
-public class GetConsistentHashCommand extends BaseRpcCommand {
-
-   public static final int COMMAND_ID = 17;
-   DistributionManager distributionManager;
-   Address joiner;
-
-   public GetConsistentHashCommand() {
-   }
-
-   public GetConsistentHashCommand(String cacheName, Address joiner) {
-      super(cacheName);
-      this.joiner = joiner;
-   }
-
-   public GetConsistentHashCommand(DistributionManager distributionManager) {
-      this.distributionManager = distributionManager;
-   }
-
-   public void initialize(DistributionManager distributionManager) {
-      this.distributionManager = distributionManager;
-   }
-
-   public Object perform(InvocationContext ctx) throws Throwable {
-      return distributionManager.requestPermissionToJoin(joiner);
-   }
-
-   public byte getCommandId() {
-      return COMMAND_ID;
-   }
-
-   public Object[] getParameters() {
-      return new Object[]{cacheName, joiner};
-   }
-
-   public void setParameters(int commandId, Object[] parameters) {
-      cacheName = (String) parameters[0];
-      joiner = (Address) parameters[1];
-   }
-}

Deleted: trunk/core/src/main/java/org/infinispan/commands/control/InstallConsistentHashCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/InstallConsistentHashCommand.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/InstallConsistentHashCommand.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -1,58 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.distribution.DistributionManager;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-
-/**
- * Installs a consistent hash in a distribution manager
- * <p/>
- * // TODO rename to INFORM_REHASH_ON_JOIN
- *
- * @author Manik Surtani
- * @since 4.0
- */
-@Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.INSTALL_CONSISTENT_HASH_COMMAND)
-public class InstallConsistentHashCommand extends BaseRpcCommand {
-
-   public static final int COMMAND_ID = 18;
-   DistributionManager distributionManager;
-   Address joiner;
-   boolean starting; // if true the rehash is starting; if false it has completed.
-
-   public InstallConsistentHashCommand() {
-   }
-
-   public InstallConsistentHashCommand(String cacheName, Address joiner, boolean starting) {
-      super(cacheName);
-      this.joiner = joiner;
-      this.starting = starting;
-   }
-
-   public void initialize(DistributionManager distributionManager) {
-      this.distributionManager = distributionManager;
-   }
-
-   public Object perform(InvocationContext ctx) throws Throwable {
-      distributionManager.informRehashOnJoin(joiner, starting);
-      return null;
-   }
-
-   public byte getCommandId() {
-      return COMMAND_ID;
-   }
-
-   public Object[] getParameters() {
-      return new Object[]{cacheName, joiner, starting};
-   }
-
-   public void setParameters(int commandId, Object[] parameters) {
-      cacheName = (String) parameters[0];
-      joiner = (Address) parameters[1];
-      starting = (Boolean) parameters[2];
-   }
-}

Deleted: trunk/core/src/main/java/org/infinispan/commands/control/JoinCompleteCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/JoinCompleteCommand.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/JoinCompleteCommand.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -1,52 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.distribution.DistributionManager;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
-@Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.JOIN_COMPLETE_COMMAND)
-public class JoinCompleteCommand extends BaseRpcCommand {
-   public static final int COMMAND_ID = 21;
-   Address joiner;
-   DistributionManager distributionManager;
-
-   public JoinCompleteCommand() {
-   }
-
-   public JoinCompleteCommand(String cacheName, Address joiner) {
-      super(cacheName);
-      this.joiner = joiner;
-   }
-
-   public void init(DistributionManager distributionManager) {
-      this.distributionManager = distributionManager;
-   }
-
-   public Object perform(InvocationContext ctx) throws Throwable {
-      distributionManager.notifyJoinComplete(joiner);
-      return null;
-   }
-
-   public byte getCommandId() {
-      return COMMAND_ID;
-   }
-
-   public Object[] getParameters() {
-      return new Object[]{cacheName, joiner};
-   }
-
-   public void setParameters(int commandId, Object[] parameters) {
-      cacheName = (String) parameters[0];
-      joiner = (Address) parameters[1];
-   }
-}

Deleted: trunk/core/src/main/java/org/infinispan/commands/control/PullStateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/PullStateCommand.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/PullStateCommand.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -1,121 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.config.Configuration;
-import org.infinispan.container.DataContainer;
-import org.infinispan.container.entries.InternalCacheEntry;
-import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.distribution.ConsistentHash;
-import org.infinispan.distribution.DistributionManager;
-import org.infinispan.loaders.CacheLoaderManager;
-import org.infinispan.loaders.CacheStore;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.remoting.transport.Transport;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
-@Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.PULL_STATE_COMMAND)
-public class PullStateCommand extends BaseRpcCommand {
-
-   public static final int COMMAND_ID = 19;
-   Address requestor;
-   DataContainer dataContainer;
-   CacheLoaderManager clm;
-   Transport transport;
-   ConsistentHash newCH, oldCH;
-   Address self;
-   DistributionManager distributionManager;
-   Configuration configuration;
-   int numCopies;
-
-   public PullStateCommand() {
-   }
-
-   public PullStateCommand(String cacheName, Address requestor, ConsistentHash newCH) {
-      super(cacheName);
-      this.requestor = requestor;
-      this.newCH = newCH;
-   }
-
-   public PullStateCommand(Transport transport) {
-      this.transport = transport;
-   }
-
-   public void init(DataContainer dataContainer, CacheLoaderManager clm, DistributionManager distributionManager, Configuration c) {
-      this.dataContainer = dataContainer;
-      this.clm = clm;
-      this.distributionManager = distributionManager;
-      this.configuration = c;
-   }
-
-   /**
-    * @param ctx invocation context
-    * @return Should return a Map<Object, InternalCacheValue>
-    * @throws Throwable
-    */
-   public Object perform(InvocationContext ctx) throws Throwable {
-      self = transport.getAddress();
-      oldCH = distributionManager.getConsistentHash();
-      numCopies = configuration.getNumOwners();
-
-      Map<Object, InternalCacheValue> state = new HashMap<Object, InternalCacheValue>();
-      for (InternalCacheEntry ice : dataContainer) {
-         Object k = ice.getKey();
-         if (shouldAddToMap(k)) {
-            state.put(k, ice.toInternalCacheValue());
-         }
-      }
-
-      CacheStore cacheStore = getCacheStore();
-      if (cacheStore != null) {
-         for (InternalCacheEntry ice : cacheStore.loadAll()) {
-            Object k = ice.getKey();
-            if (shouldAddToMap(k) && !state.containsKey(k)) {
-               state.put(k, ice.toInternalCacheValue());
-            }
-         }
-      }
-      return state;
-   }
-
-   final boolean shouldAddToMap(Object k) {
-      // if the current address is the current "owner" of this key (in old_ch), and the requestor is in the owner list
-      // in new_ch, then add this to the map.
-      List<Address> oldOwnerList = oldCH.locate(k, numCopies);
-      if (oldOwnerList.size() > 0 && self.equals(oldOwnerList.get(0))) {
-         List<Address> newOwnerList = newCH.locate(k, numCopies);
-         if (newOwnerList.contains(requestor)) return true;
-      }
-      return false;
-   }
-
-   final CacheStore getCacheStore() {
-      return clm != null && clm.isEnabled() ? clm.getCacheStore() : null;
-   }
-
-   public byte getCommandId() {
-      return COMMAND_ID;
-   }
-
-   public Object[] getParameters() {
-      return new Object[]{cacheName, requestor, newCH};
-   }
-
-   public void setParameters(int commandId, Object[] parameters) {
-      cacheName = (String) parameters[0];
-      requestor = (Address) parameters[1];
-      newCH = (ConsistentHash) parameters[2];
-   }
-}

Deleted: trunk/core/src/main/java/org/infinispan/commands/control/PushStateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/PushStateCommand.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/commands/control/PushStateCommand.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -1,53 +0,0 @@
-package org.infinispan.commands.control;
-
-import org.infinispan.commands.remote.BaseRpcCommand;
-import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
-import org.infinispan.remoting.transport.Address;
-
-import java.util.Map;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
-@Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.PUSH_STATE_COMMAND)
-public class PushStateCommand extends BaseRpcCommand {
-
-   public static final int COMMAND_ID = 20;
-   Address sender;
-   Map<Object, InternalCacheValue> state;
-
-   public PushStateCommand() {
-   }
-
-   public PushStateCommand(String cacheName, Address sender, Map<Object, InternalCacheValue> state) {
-      super(cacheName);
-      this.sender = sender;
-      this.state = state;
-   }
-
-   public Object perform(InvocationContext ctx) throws Throwable {
-      return null;  // TODO: Customise this generated block
-   }
-
-   public byte getCommandId() {
-      return COMMAND_ID;
-   }
-
-   public Object[] getParameters() {
-      return new Object[]{cacheName, sender, state};
-   }
-
-   @SuppressWarnings("unchecked")
-   public void setParameters(int commandId, Object[] parameters) {
-      cacheName = (String) parameters[0];
-      sender = (Address) parameters[1];
-      state = (Map<Object, InternalCacheValue>) parameters[2];
-   }
-}

Added: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -0,0 +1,157 @@
+package org.infinispan.commands.control;
+
+import org.infinispan.CacheException;
+import org.infinispan.commands.remote.BaseRpcCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.ConsistentHash;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheStore;
+import org.infinispan.marshall.Ids;
+import org.infinispan.marshall.Marshallable;
+import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.Transport;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A control command to coordinate rehashes that may occur when nodes join or leave a cluster, when DIST is used as a
+ * cache mode.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.REHASH_CONTROL_COMMAND)
+public class RehashControlCommand extends BaseRpcCommand {
+
+   public static final int COMMAND_ID = 17;
+
+   public enum Type {
+      JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE, PUSH_STATE
+   }
+
+   Type type;
+   Address sender;
+   Map<Object, InternalCacheValue> state;
+   ConsistentHash consistentHash;
+
+   // cache components
+   DistributionManager distributionManager;
+   Transport transport;
+   Configuration configuration;
+   DataContainer dataContainer;
+
+   public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
+      super(cacheName);
+      this.type = type;
+      this.sender = sender;
+      this.state = state;
+      this.consistentHash = consistentHash;
+   }
+
+   public RehashControlCommand() {
+   }
+
+   public RehashControlCommand(Transport transport) {
+      this.transport = transport;
+   }
+
+   public void init(DistributionManager distributionManager, Configuration configuration, DataContainer dataContainer) {
+      this.distributionManager = distributionManager;
+      this.configuration = configuration;
+      this.dataContainer = dataContainer;
+   }
+
+   public Object perform(InvocationContext ctx) throws Throwable {
+      switch (type) {
+         case JOIN_REQ:
+            return distributionManager.requestPermissionToJoin(sender);
+         case JOIN_REHASH_START:
+            distributionManager.informRehashOnJoin(sender, true);
+            return null;
+         case JOIN_REHASH_END:
+            distributionManager.informRehashOnJoin(sender, false);
+            return null;
+         case JOIN_COMPLETE:
+            distributionManager.notifyJoinComplete(sender);
+            return null;
+         case PULL_STATE:
+            return pullState();
+         case PUSH_STATE:
+            return pushState();
+      }
+      throw new CacheException("Unknown rehash control command type " + type);
+   }
+
+   public Map<Object, InternalCacheValue> pullState() throws CacheLoaderException {
+      Address self = transport.getAddress();
+      ConsistentHash oldCH = distributionManager.getConsistentHash();
+      int numCopies = configuration.getNumOwners();
+
+      Map<Object, InternalCacheValue> state = new HashMap<Object, InternalCacheValue>();
+      for (InternalCacheEntry ice : dataContainer) {
+         Object k = ice.getKey();
+         if (shouldAddToMap(k, oldCH, numCopies, self)) state.put(k, ice.toInternalCacheValue());
+      }
+
+      CacheStore cacheStore = distributionManager.getCacheStoreForRehashing();
+      if (cacheStore != null) {
+         for (InternalCacheEntry ice : cacheStore.loadAll()) {
+            Object k = ice.getKey();
+            if (shouldAddToMap(k, oldCH, numCopies, self) && !state.containsKey(k))
+               state.put(k, ice.toInternalCacheValue());
+         }
+      }
+      return state;
+   }
+
+   final boolean shouldAddToMap(Object k, ConsistentHash oldCH, int numCopies, Address self) {
+      // if the current address is the current "owner" of this key (in old_ch), and the requestor is in the owner list
+      // in new_ch, then add this to the map.
+      List<Address> oldOwnerList = oldCH.locate(k, numCopies);
+      if (oldOwnerList.size() > 0 && self.equals(oldOwnerList.get(0))) {
+         List<Address> newOwnerList = consistentHash.locate(k, numCopies);
+         if (newOwnerList.contains(sender)) return true;
+      }
+      return false;
+   }
+
+   public Object pushState() {
+      throw new RuntimeException("implement me");
+   }
+
+   public byte getCommandId() {
+      return COMMAND_ID;
+   }
+
+   public Object[] getParameters() {
+      return new Object[]{cacheName, (byte) type.ordinal(), sender, state, consistentHash};
+   }
+
+   @SuppressWarnings("unchecked")
+   public void setParameters(int commandId, Object[] parameters) {
+      int i = 0;
+      cacheName = (String) parameters[i++];
+      type = Type.values()[(Byte) parameters[i++]];
+      sender = (Address) parameters[i++];
+      state = (Map<Object, InternalCacheValue>) parameters[i++];
+      consistentHash = (ConsistentHash) parameters[i++];
+   }
+
+   @Override
+   public String toString() {
+      return "RehashControlCommand{" +
+            "type=" + type +
+            ", sender=" + sender +
+            ", state=" + state +
+            ", consistentHash=" + consistentHash +
+            '}';
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -7,18 +7,58 @@
 import java.util.Map;
 
 /**
- * A consistent hash algorithm
+ * A consistent hash algorithm implementation.  Implementations are typically constructed via reflection, so they should
+ * provide a public, no-arg constructor.
  *
  * @author Manik Surtani
  * @since 4.0
  */
 public interface ConsistentHash {
 
+   /**
+    * Sets the collection of cache addresses in the cluster.  The implementation should store these internally and use
+    * these to locate keys.
+    *
+    * @param caches caches in cluster.
+    */
    void setCaches(Collection<Address> caches);
 
+   /**
+    * Should return a collection of cache addresses in the cluster.
+    *
+    * @return collection of cache addresses
+    */
    Collection<Address> getCaches();
 
+   /**
+    * Locates a key, given a replication count (number of copies).
+    *
+    * @param key       key to locate
+    * @param replCount replication count (number of copies)
+    * @return a list of addresses where the key resides, where this list is a subset of the addresses set in {@link
+    *         #setCaches(java.util.Collection)}.  Should never be null, and should contain replCount elements or the max
+    *         number of caches available, whichever is smaller.
+    */
    List<Address> locate(Object key, int replCount);
 
+   /**
+    * The logical equivalent of calling {@link #locate(Object, int)} once for each key in the collection of
+    * keys. Implementations may be optimised for such a bulk lookup, or may just repeatedly call {@link #locate(Object,
+    * int)}.
+    *
+    * @param keys      keys to locate
+    * @param replCount replication count (number of copies) for each key
+    * @return Map of locations, keyed on key.
+    */
    Map<Object, List<Address>> locateAll(Collection<Object> keys, int replCount);
+
+   /**
+    * Tests whether two addresses are in the same subspace of the hash space.  Addresses are in the same subspace
+    * if an arbitrary key mapped to one address could also be mapped to the other.
+    *
+    * @param a1 address to test
+    * @param a2 address to test
+    * @return true if the two addresses are in the same subspace, false otherwise.
+    */
+   boolean isInSameSubspace(Address a1, Address a2);
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -137,4 +137,8 @@
             "addresses (in order of hash space position)=" + positions.values() +
             '}';
    }
+
+   public boolean isInSameSubspace(Address a1, Address a2) {
+      throw new UnsupportedOperationException("TODO Implement me!");
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -4,6 +4,7 @@
 import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.factories.scopes.Scope;
 import org.infinispan.factories.scopes.Scopes;
+import org.infinispan.loaders.CacheStore;
 import org.infinispan.remoting.transport.Address;
 
 import java.util.Collection;
@@ -93,5 +94,12 @@
     * @param starting
     */
    void informRehashOnJoin(Address joiner, boolean starting);
+
+   /**
+    * Retrieves a cache store if one is available and set up for use in rehashing.  May return null!
+    *
+    * @return a cache store if one is available and configured for use in rehashing, or null otherwise.
+    */
+   CacheStore getCacheStoreForRehashing();
 }
 

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -13,6 +13,8 @@
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.factories.annotations.Stop;
 import org.infinispan.interceptors.InterceptorChain;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.loaders.CacheStore;
 import org.infinispan.notifications.Listener;
 import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
@@ -52,6 +54,7 @@
    private final boolean trace = log.isTraceEnabled();
    Configuration configuration;
    ConsistentHash consistentHash;
+   CacheLoaderManager cacheLoaderManager;
    RpcManager rpcManager;
    CacheManagerNotifier notifier;
    int replCount;
@@ -83,7 +86,9 @@
 
    @Inject
    public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf,
-                    DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc) {
+                    DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc,
+                    CacheLoaderManager cacheLoaderManager) {
+      this.cacheLoaderManager = cacheLoaderManager;
       this.configuration = configuration;
       this.rpcManager = rpcManager;
       this.notifier = notifier;
@@ -251,4 +256,10 @@
          rehash(e.getNewMembers(), e.getOldMembers());
       }
    }
+
+   public CacheStore getCacheStoreForRehashing() {
+      if (cacheLoaderManager == null || !cacheLoaderManager.isEnabled() || cacheLoaderManager.isShared())
+         return null;
+      return cacheLoaderManager.getCacheStore();
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -2,7 +2,8 @@
 
 import org.infinispan.CacheException;
 import org.infinispan.commands.CommandsFactory;
-import org.infinispan.commands.control.PullStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
+import static org.infinispan.commands.control.RehashControlCommand.Type.*;
 import org.infinispan.commands.write.InvalidateCommand;
 import org.infinispan.commands.write.PutKeyValueCommand;
 import org.infinispan.commands.write.WriteCommand;
@@ -15,7 +16,7 @@
 import org.infinispan.interceptors.InterceptorChain;
 import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.responses.SuccessfulResponse;
-import org.infinispan.remoting.rpc.ResponseMode;
+import static org.infinispan.remoting.rpc.ResponseMode.SYNCHRONOUS;
 import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.Util;
@@ -73,6 +74,21 @@
       self = rpcManager.getTransport().getAddress();
    }
 
+   @SuppressWarnings("unchecked")
+   private List<Address> parseResponses(List<Response> resp) {
+      for (Response r : resp) {
+         if (r instanceof SuccessfulResponse) {
+            return (List<Address>) ((SuccessfulResponse) r).getResponseValue();
+         }
+      }
+      return null;
+   }
+
+   @SuppressWarnings("unchecked")
+   private Map<Object, InternalCacheValue> getStateFromResponse(SuccessfulResponse r) {
+      return (Map<Object, InternalCacheValue>) r.getResponseValue();
+   }
+
    protected void performRehash() throws Exception {
       log.trace("Starting rehash on new joiner");
       boolean unlocked = false;
@@ -87,15 +103,10 @@
          long giveupTime = System.currentTimeMillis() + maxWaitTime;
          do {
             log.trace("Requesting old consistent hash from coordinator");
-            List<Response> resp = rpcManager.invokeRemotely(coordinator(), commandsFactory.buildGetConsistentHashCommand(self),
-                                                            ResponseMode.SYNCHRONOUS, 100000, true);
-            List<Address> addresses = null;
-            for (Response r : resp) {
-               if (r instanceof SuccessfulResponse) {
-                  addresses = (List<Address>) ((SuccessfulResponse) r).getResponseValue();
-                  break;
-               }
-            }
+            List<Response> resp = rpcManager.invokeRemotely(coordinator(),
+                                                            commandsFactory.buildRehashControlCommand(JOIN_REQ, self),
+                                                            SYNCHRONOUS, 100000, true);
+            List<Address> addresses = parseResponses(resp);
 
             log.trace("Retrieved old consistent hash address list {0}", addresses);
             if (addresses == null) {
@@ -119,21 +130,21 @@
          transactionLogger.enable();
 
          // 4.  Broadcast new temp CH
-         rpcManager.broadcastRpcCommand(commandsFactory.buildInstallConsistentHashCommand(self, true), true, true);
+         rpcManager.broadcastRpcCommand(commandsFactory.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
 
          // 5.  txLogger being enabled will cause CLusteredGetCommands to return uncertain responses.
 
          // 6.  pull state from everyone.
          Address myAddress = rpcManager.getTransport().getAddress();
-         PullStateCommand cmd = commandsFactory.buildPullStateCommand(myAddress, chNew);
+         RehashControlCommand cmd = commandsFactory.buildRehashControlCommand(PULL_STATE, myAddress, null, chNew);
          // TODO I should be able to process state chunks from different nodes simultaneously!!
          // TODO I should only send this pull state request to nodes which I know will send me state.  Not everyone in chOld!!
-         List<Response> resps = rpcManager.invokeRemotely(chOld.getCaches(), cmd, ResponseMode.SYNCHRONOUS, 10000, true);
+         List<Response> resps = rpcManager.invokeRemotely(chOld.getCaches(), cmd, SYNCHRONOUS, 10000, true);
 
          // 7.  Apply state
          for (Response r : resps) {
             if (r instanceof SuccessfulResponse) {
-               Map<Object, InternalCacheValue> state = (Map<Object, InternalCacheValue>) ((SuccessfulResponse) r).getResponseValue();
+               Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
                for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
                   if (chNew.locate(e.getKey(), configuration.getNumOwners()).contains(myAddress)) {
                      InternalCacheValue v = e.getValue();
@@ -164,8 +175,8 @@
          // 10.
          // TODO this phase should also "tell" the coord that the join is complete so that other waiting joiners
          // may proceed.  Ideally another command, directed to the coord.
-         rpcManager.broadcastRpcCommand(commandsFactory.buildInstallConsistentHashCommand(self, false), true, true);
-         rpcManager.invokeRemotely(coordinator(), commandsFactory.buildJoinCompleteCommand(self), ResponseMode.SYNCHRONOUS,
+         rpcManager.broadcastRpcCommand(commandsFactory.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
+         rpcManager.invokeRemotely(coordinator(), commandsFactory.buildRehashControlCommand(JOIN_COMPLETE, self), SYNCHRONOUS,
                                    100000, true);
 
          // 11.

Modified: trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -16,7 +16,8 @@
 import java.util.Set;
 
 /**
- * // TODO: Manik: Document this
+ * A delegating wrapper that locates keys by getting a union of locations reported by two other ConsistentHash
+ * implementations it delegates to.
  *
  * @author Manik Surtani
  * @since 4.0
@@ -68,4 +69,8 @@
          return new UnionConsistentHash((ConsistentHash) input.readObject(), (ConsistentHash) input.readObject());
       }
    }
+
+   public boolean isInSameSubspace(Address a1, Address a2) {
+      throw new UnsupportedOperationException("Not supported by this impl");
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/marshall/Ids.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/Ids.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/marshall/Ids.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -103,11 +103,8 @@
    static final byte ATOMIC_REMOVE_OPERATION = 48;
    static final byte ATOMIC_CLEAR_OPERATION = 49;
 
-   static final byte GET_CONSISTENT_HASH_COMMAND = 50;
-   static final byte PUSH_STATE_COMMAND = 51;
-   static final byte PULL_STATE_COMMAND = 52;
-   static final byte INSTALL_CONSISTENT_HASH_COMMAND = 53;
-   static final byte DEFAULT_CONSISTENT_HASH = 54;
-   static final byte UNION_CONSISTENT_HASH = 55;
-   static final byte JOIN_COMPLETE_COMMAND = 56;
+   static final byte REHASH_CONTROL_COMMAND = 50;
+   static final byte DEFAULT_CONSISTENT_HASH = 51;
+   static final byte UNION_CONSISTENT_HASH = 52;
+   static final byte JOIN_COMPLETE_COMMAND = 53;
 }

Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -28,12 +28,8 @@
 import org.infinispan.atomic.PutOperation;
 import org.infinispan.atomic.RemoveOperation;
 import org.infinispan.commands.RemoteCommandFactory;
-import org.infinispan.commands.control.GetConsistentHashCommand;
-import org.infinispan.commands.control.InstallConsistentHashCommand;
-import org.infinispan.commands.control.JoinCompleteCommand;
 import org.infinispan.commands.control.LockControlCommand;
-import org.infinispan.commands.control.PullStateCommand;
-import org.infinispan.commands.control.PushStateCommand;
+import org.infinispan.commands.control.RehashControlCommand;
 import org.infinispan.commands.control.StateTransferControlCommand;
 import org.infinispan.commands.read.GetKeyValueCommand;
 import org.infinispan.commands.remote.ClusteredGetCommand;
@@ -149,11 +145,7 @@
       MARSHALLABLES.add(RollbackCommand.class.getName());
       MARSHALLABLES.add(InvalidateL1Command.class.getName());
       MARSHALLABLES.add(LockControlCommand.class.getName());
-      MARSHALLABLES.add(GetConsistentHashCommand.class.getName());
-      MARSHALLABLES.add(PushStateCommand.class.getName());
-      MARSHALLABLES.add(PullStateCommand.class.getName());
-      MARSHALLABLES.add(InstallConsistentHashCommand.class.getName());
-      MARSHALLABLES.add(JoinCompleteCommand.class.getName());
+      MARSHALLABLES.add(RehashControlCommand.class.getName());
 
       MARSHALLABLES.add(ImmortalCacheEntry.class.getName());
       MARSHALLABLES.add(MortalCacheEntry.class.getName());

Modified: trunk/core/src/test/java/org/infinispan/test/fwk/TcpMPingEnvironmentTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/fwk/TcpMPingEnvironmentTest.java	2009-08-17 16:26:39 UTC (rev 703)
+++ trunk/core/src/test/java/org/infinispan/test/fwk/TcpMPingEnvironmentTest.java	2009-08-18 16:19:17 UTC (rev 704)
@@ -114,8 +114,6 @@
    private void initiChannel(JChannel channel) throws Exception {
       openedChannles.add(channel);
       channel.setOpt(org.jgroups.Channel.LOCAL, false);
-      channel.setOpt(org.jgroups.Channel.AUTO_RECONNECT, true);
-      channel.setOpt(org.jgroups.Channel.AUTO_GETSTATE, false);
       channel.setOpt(org.jgroups.Channel.BLOCK, true);
       channel.connect("someChannel");
    }



More information about the infinispan-commits mailing list