[infinispan-commits] Infinispan SVN: r2629 - in branches/4.2.x/core/src: main/java/org/infinispan/config and 12 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Oct 28 15:40:22 EDT 2010
Author: mircea.markus
Date: 2010-10-28 15:40:21 -0400 (Thu, 28 Oct 2010)
New Revision: 2629
Added:
branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java
Modified:
branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java
branches/4.2.x/core/src/main/java/org/infinispan/config/GlobalConfiguration.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/NodeTopologyInfo.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java
branches/4.2.x/core/src/main/java/org/infinispan/marshall/Ids.java
branches/4.2.x/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
branches/4.2.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
branches/4.2.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
branches/4.2.x/core/src/main/resources/config-samples/all.xml
branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java
branches/4.2.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java
branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java
branches/4.2.x/core/src/test/java/org/infinispan/manager/CacheManagerXmlConfigurationTest.java
branches/4.2.x/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java
branches/4.2.x/core/src/test/resources/configs/named-cache-test.xml
Log:
[ISPN-180]-(Colocated nodes should be handled in DIST) - ongoing work
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -13,6 +13,7 @@
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.DistributionManager;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.infinispan.marshall.Ids;
@@ -63,6 +64,7 @@
List<WriteCommand> txLogCommands;
List<PrepareCommand> pendingPrepares;
CommandsFactory commandsFactory;
+ NodeTopologyInfo nodeTopologyInfo;
private static final Log log = LogFactory.getLog(RehashControlCommand.class);
public RehashControlCommand() {
@@ -115,10 +117,9 @@
case JOIN_REQ:
return distributionManager.requestPermissionToJoin(sender);
case JOIN_REHASH_START:
- distributionManager.informRehashOnJoin(sender, true);
- return null;
+ return distributionManager.informRehashOnJoin(sender, true, nodeTopologyInfo);
case JOIN_REHASH_END:
- distributionManager.informRehashOnJoin(sender, false);
+ distributionManager.informRehashOnJoin(sender, false, nodeTopologyInfo);
return null;
case PULL_STATE_JOIN:
return pullStateForJoin();
@@ -230,9 +231,13 @@
}
public Object[] getParameters() {
- return new Object[]{cacheName, (byte) type.ordinal(), sender, state, oldCH, nodesLeft, newCH, txLogCommands, pendingPrepares};
+ return new Object[]{cacheName, (byte) type.ordinal(), sender, state, oldCH, nodesLeft, newCH, txLogCommands, pendingPrepares, nodeTopologyInfo};
}
+ public void setNodeTopologyInfo(NodeTopologyInfo nodeTopologyInfo) {
+ this.nodeTopologyInfo = nodeTopologyInfo;
+ }
+
@SuppressWarnings("unchecked")
public void setParameters(int commandId, Object[] parameters) {
int i = 0;
@@ -245,6 +250,7 @@
newCH = (ConsistentHash) parameters[i++];
txLogCommands = (List<WriteCommand>) parameters[i++];
pendingPrepares = (List<PrepareCommand>) parameters[i++];
+ nodeTopologyInfo = (NodeTopologyInfo) parameters[i++];
}
@Override
Modified: branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -22,6 +22,7 @@
package org.infinispan.config;
import org.infinispan.distribution.ch.DefaultConsistentHash;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.eviction.EvictionThreadPolicy;
import org.infinispan.factories.ComponentRegistry;
@@ -60,6 +61,7 @@
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
* @author Vladimir Blagojevic
* @author Galder Zamarreño
+ * @author Mircea.Markus at jboss.com
* @see <a href="../../../config.html#ce_infinispan_default">Configuration reference</a>
* @since 4.0
*/
@@ -154,10 +156,7 @@
// SETTERS - MAKE SURE ALL SETTERS PERFORM testImmutability()!!!
// ------------------------------------------------------------------------------------------------------------
- /**
- * will be removed, please use {@link org.infinispan.manager.EmbeddedCacheManager#getGlobalConfiguration()}
- */
- @Deprecated
+
public GlobalConfiguration getGlobalConfiguration() {
return globalConfiguration;
}
@@ -395,8 +394,6 @@
/**
* Cache mode. For distribution, set mode to either 'd', 'dist' or 'distribution'. For replication, use either 'r',
* 'repl' or 'replication'. Finally, for invalidation, 'i', 'inv' or 'invalidation'.
- *
- * @param cacheMode
*/
public void setCacheMode(CacheMode cacheModeInt) {
clustering.setMode(cacheModeInt);
@@ -943,6 +940,9 @@
}
public String getConsistentHashClass() {
+ if (clustering.hash.consistentHashClass == null) {
+ clustering.hash.consistentHashClass = globalConfiguration == null || globalConfiguration.hasTopologyInfo() ? TopologyAwareConsistentHash.class.getName() : DefaultConsistentHash.class.getName();
+ }
return clustering.hash.consistentHashClass;
}
@@ -1912,7 +1912,7 @@
private static final long serialVersionUID = 752218766840948822L;
@ConfigurationDocRef(name = "class", bean = Configuration.class, targetElement = "setConsistentHashClass")
- protected String consistentHashClass = DefaultConsistentHash.class.getName();
+ protected String consistentHashClass;
@ConfigurationDocRef(bean = Configuration.class, targetElement = "setNumOwners")
protected Integer numOwners = 2;
Modified: branches/4.2.x/core/src/main/java/org/infinispan/config/GlobalConfiguration.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/config/GlobalConfiguration.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/config/GlobalConfiguration.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -35,6 +35,7 @@
*
* @author Manik Surtani
* @author Vladimir Blagojevic
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*
* @see <a href="../../../config.html#ce_infinispan_global">Configuration reference</a>
@@ -163,6 +164,10 @@
transport.setStrictPeerToPeer(strictPeerToPeer);
}
+ public boolean hasTopologyInfo() {
+ return getSiteId() != null || getRackId() != null || getMachineId() != null;
+ }
+
/**
* Behavior of the JVM shutdown hook registered by the cache
*/
@@ -301,6 +306,52 @@
transport.setClusterName(clusterName);
}
+ /**
+ * The id of the machine where this node runs. Used for <a href="http://community.jboss.org/wiki/DesigningServerHinting">server
+ * hinting</a> .
+ */
+ public void setMachineId(String machineId) {
+ transport.setMachineId(machineId);
+ }
+
+ /**
+ * @see #setMachineId(String)
+ */
+ public String getMachineId() {
+ return transport.getMachineId();
+ }
+
+ /**
+ * The id of the rack where this node runs. Used for <a href="http://community.jboss.org/wiki/DesigningServerHinting">server
+ * hinting</a> .
+ */
+ public void setRackId(String rackId) {
+ transport.setRackId(rackId);
+ }
+
+ /**
+ * @see #setRackId(String)
+ */
+ public String getRackId() {
+ return transport.getRackId();
+ }
+
+ /**
+ * The id of the site where this node runs. Used for <a href="http://community.jboss.org/wiki/DesigningServerHinting">server
+ * hinting</a> .
+ */
+ public void setSiteId(String siteId) {
+ transport.setSiteId(siteId);
+ }
+
+ /**
+ * @see #setSiteId(String)
+ */
+ public String getSiteId() {
+ return transport.getSiteId();
+ }
+
+
public ShutdownHookBehavior getShutdownHookBehavior() {
return shutdown.hookBehavior;
}
@@ -661,7 +712,16 @@
@ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setClusterName")
protected String clusterName = "Infinispan-Cluster";
-
+
+ @ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setMachineId")
+ protected String machineId;
+
+ @ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setRackId")
+ protected String rackId;
+
+ @ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setSiteId")
+ protected String siteId;
+
@ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setStrictPeerToPeer")
protected Boolean strictPeerToPeer = true;
@@ -699,6 +759,36 @@
}
@XmlAttribute
+ public void setMachineId(String machineId) {
+ testImmutability("machineId");
+ this.machineId = machineId;
+ }
+
+ @XmlAttribute
+ public void setRackId(String rackId) {
+ testImmutability("rackId");
+ this.rackId = rackId;
+ }
+
+ @XmlAttribute
+ public void setSiteId(String siteId) {
+ testImmutability("siteId");
+ this.siteId = siteId;
+ }
+
+ public String getMachineId() {
+ return machineId;
+ }
+
+ public String getRackId() {
+ return rackId;
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+
+ @XmlAttribute
public void setDistributedSyncTimeout(Long distributedSyncTimeout) {
testImmutability("distributedSyncTimeout");
this.distributedSyncTimeout = distributedSyncTimeout;
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -4,6 +4,7 @@
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.distribution.ch.ConsistentHash;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.loaders.CacheStore;
@@ -115,9 +116,9 @@
*
* @param joiner address of joiner
* @param starting if true, the joiner is reporting that it is starting the join process. If false, the joiner is
- * reporting that it has completed the join process.
+ * @param nodeTopologyInfo
*/
- void informRehashOnJoin(Address joiner, boolean starting);
+ NodeTopologyInfo informRehashOnJoin(Address joiner, boolean starting, NodeTopologyInfo nodeTopologyInfo);
/**
* Retrieves a cache store if one is available and set up for use in rehashing. May return null!
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -6,6 +6,7 @@
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
@@ -18,6 +19,8 @@
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashHelper;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
+import org.infinispan.distribution.ch.TopologyInfo;
import org.infinispan.distribution.ch.UnionConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
@@ -86,6 +89,8 @@
private final ExecutorService rehashExecutor;
private final TransactionLogger transactionLogger = new TransactionLoggerImpl();
+
+ TopologyInfo topologyInfo = new TopologyInfo();
/**
* Rehash flag set by a rehash task associated with this DistributionManager
@@ -155,6 +160,12 @@
log.trace("Starting distribution manager on " + getMyAddress());
listener = new ViewChangeListener();
notifier.addListener(listener);
+ GlobalConfiguration gc = configuration.getGlobalConfiguration();
+ if (gc.hasTopologyInfo()) {
+ Address address = rpcManager.getTransport().getAddress();
+ NodeTopologyInfo nti = new NodeTopologyInfo(gc.getMachineId(),gc.getRackId(), gc.getSiteId(), address);
+ topologyInfo.addNodeTopologyInfo(address, nti);
+ }
join();
}
@@ -187,7 +198,7 @@
setJoinComplete(false);
Transport t = rpcManager.getTransport();
List<Address> members = t.getMembers();
- consistentHash = createConsistentHash(configuration, members);
+ consistentHash = createConsistentHash(configuration, members, topologyInfo);
self = t.getAddress();
if (members.size() > 1 && !t.getCoordinator().equals(self)) {
JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, dataContainer, this);
@@ -218,13 +229,15 @@
Address leaver = MembershipArithmetic.getMemberLeft(oldMembers, newMembers);
log.info("This is a LEAVE event! Node {0} has just left", leaver);
+ topologyInfo.removeNodeInfo(leaver);
+
try {
if (!(consistentHash instanceof UnionConsistentHash)) {
oldConsistentHash = consistentHash;
} else {
oldConsistentHash = ((UnionConsistentHash) consistentHash).getNewCH();
}
- consistentHash = ConsistentHashHelper.removeAddress(consistentHash, leaver, configuration);
+ consistentHash = ConsistentHashHelper.removeAddress(consistentHash, leaver, configuration, topologyInfo);
} catch (Exception e) {
log.fatal("Unable to process leaver!!", e);
throw new CacheException(e);
@@ -346,7 +359,7 @@
}
}
- public void informRehashOnJoin(Address a, boolean starting) {
+ public NodeTopologyInfo informRehashOnJoin(Address a, boolean starting, NodeTopologyInfo nodeTopologyInfo) {
log.trace("Informed of a JOIN by {0}. Starting? {1}", a, starting);
if (!starting) {
if (consistentHash instanceof UnionConsistentHash) {
@@ -356,18 +369,17 @@
}
joiner = null;
} else {
+ topologyInfo.addNodeTopologyInfo(a, nodeTopologyInfo);
+ log.trace("Node topology info added({0}). Topology info is {1}", nodeTopologyInfo, topologyInfo);
ConsistentHash chOld = consistentHash;
if (chOld instanceof UnionConsistentHash) throw new RuntimeException("Not expecting a union CH!");
oldConsistentHash = chOld;
joiner = a;
- ConsistentHash chNew;
- chNew = (ConsistentHash) Util.getInstance(configuration.getConsistentHashClass());
- List<Address> newAddresses = new LinkedList<Address>(chOld.getCaches());
- newAddresses.add(a);
- chNew.setCaches(newAddresses);
+ ConsistentHash chNew = ConsistentHashHelper.createConsistentHash(configuration, chOld.getCaches(), topologyInfo, a);
consistentHash = new UnionConsistentHash(chOld, chNew);
}
log.trace("New CH is {0}", consistentHash);
+ return topologyInfo.getNodeTopologyInfo(rpcManager.getAddress());
}
public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state) {
@@ -502,4 +514,8 @@
public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
+
+ public TopologyInfo getTopologyInfo() {
+ return topologyInfo;
+ }
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -72,7 +72,7 @@
long start = trace ? System.currentTimeMillis() : 0;
int replCount = configuration.getNumOwners();
- ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
+ ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled, dmi.topologyInfo);
ConsistentHash newCH = dmi.getConsistentHash();
try {
if (log.isDebugEnabled()) {
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -10,6 +10,7 @@
import static org.infinispan.distribution.ch.ConsistentHashHelper.createConsistentHash;
import org.infinispan.distribution.ch.ConsistentHash;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import static org.infinispan.remoting.rpc.ResponseMode.SYNCHRONOUS;
@@ -41,6 +42,7 @@
* <ul>
*
* @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
public class JoinTask extends RehashTask {
@@ -87,9 +89,10 @@
// 2. new CH instance
if (chOld.getCaches().contains(self))
chNew = chOld;
- else
- chNew = createConsistentHash(configuration, chOld.getCaches(), self);
-
+ else {
+ chNew = createConsistentHash(configuration, chOld.getCaches(), dmi.topologyInfo, self);
+ }
+
dmi.setConsistentHash(chNew);
try {
if (configuration.isRehashEnabled()) {
@@ -97,8 +100,11 @@
transactionLogger.enable();
// 4. Broadcast new temp CH
- rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
-
+ RehashControlCommand rehashControlCommand = cf.buildRehashControlCommand(JOIN_REHASH_START, self);
+ rehashControlCommand.setNodeTopologyInfo(dmi.topologyInfo.getNodeTopologyInfo(rpcManager.getAddress()));
+ List<Response> responseList = rpcManager.invokeRemotely(null, rehashControlCommand, true, true);
+ updateTopologyInfo(responseList);
+
// 5. txLogger being enabled will cause ClusteredGetCommands to return uncertain responses.
// 6. pull state from everyone.
@@ -147,7 +153,18 @@
}
}
- private ConsistentHash retrieveOldCH(boolean trace) throws InterruptedException, IllegalAccessException,
+ private void updateTopologyInfo(List<Response> responseList) {
+ for (Response r : responseList) {
+ SuccessfulResponse sr = (SuccessfulResponse) r;
+ NodeTopologyInfo nti = (NodeTopologyInfo) sr.getResponseValue();
+ if (nti != null) {
+ dmi.topologyInfo.addNodeTopologyInfo(nti.getAddress(), nti);
+ }
+ }
+ if (log.isTraceEnabled()) log.trace("Topology after after getting cluster info: " + dmi.topologyInfo);
+ }
+
+ private ConsistentHash retrieveOldCH(boolean trace) throws InterruptedException, IllegalAccessException,
InstantiationException, ClassNotFoundException {
// this happens in a loop to ensure we receive the correct CH and not a "union".
@@ -184,7 +201,7 @@
log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
Thread.sleep(time); // sleep for a while and retry
} else {
- result = createConsistentHash(configuration, addresses);
+ result = createConsistentHash(configuration, addresses, dmi.topologyInfo);
}
} while (result == null && System.currentTimeMillis() < giveupTime);
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -42,4 +42,8 @@
public void setTopologyInfo(TopologyInfo topologyInfo) {
this.topologyInfo = topologyInfo;
}
+
+ public TopologyInfo getTopologyInfo() {
+ return topologyInfo;
+ }
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -14,6 +14,7 @@
* A helper class that handles the construction of consistent hash instances based on configuration.
*
* @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
public class ConsistentHashHelper {
@@ -24,16 +25,18 @@
* @param ch consistent hash to start with
* @param toRemove address to remove
* @param c configuration
+ * @param topologyInfo
* @return a new consistent hash instance of the same type
*/
- public static ConsistentHash removeAddress(ConsistentHash ch, Address toRemove, Configuration c) {
+ public static ConsistentHash removeAddress(ConsistentHash ch, Address toRemove, Configuration c, TopologyInfo topologyInfo) {
if (ch instanceof UnionConsistentHash)
- return removeAddressFromUnionConsistentHash((UnionConsistentHash) ch, toRemove, c);
+ return removeAddressFromUnionConsistentHash((UnionConsistentHash) ch, toRemove, c, topologyInfo);
else {
ConsistentHash newCH = (ConsistentHash) Util.getInstance(c.getConsistentHashClass());
List<Address> caches = new ArrayList<Address>(ch.getCaches());
caches.remove(toRemove);
newCH.setCaches(caches);
+ newCH.setTopologyInfo(topologyInfo);
return newCH;
}
}
@@ -45,11 +48,12 @@
* @param uch union consistent hash instance
* @param toRemove address to remove
* @param c configuration
+ * @param topologyInfo
* @return a new UnionConsistentHash instance
*/
- public static UnionConsistentHash removeAddressFromUnionConsistentHash(UnionConsistentHash uch, Address toRemove, Configuration c) {
- ConsistentHash newFirstCH = removeAddress(uch.getOldConsistentHash(), toRemove, c);
- ConsistentHash newSecondCH = removeAddress(uch.getNewConsistentHash(), toRemove, c);
+ public static UnionConsistentHash removeAddressFromUnionConsistentHash(UnionConsistentHash uch, Address toRemove, Configuration c, TopologyInfo topologyInfo) {
+ ConsistentHash newFirstCH = removeAddress(uch.getOldConsistentHash(), toRemove, c, topologyInfo);
+ ConsistentHash newSecondCH = removeAddress(uch.getNewConsistentHash(), toRemove, c, topologyInfo);
return new UnionConsistentHash(newFirstCH, newSecondCH);
}
@@ -59,11 +63,13 @@
*
* @param c configuration
* @param addresses with which to populate the consistent hash
+ * @param topologyInfo
* @return a new consistent hash instance
*/
- public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses) {
+ public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, TopologyInfo topologyInfo) {
ConsistentHash ch = (ConsistentHash) Util.getInstance(c.getConsistentHashClass());
ch.setCaches(addresses);
+ ch.setTopologyInfo(topologyInfo);
return ch;
}
@@ -73,13 +79,13 @@
*
* @param c configuration
* @param addresses with which to populate the consistent hash
- * @param moreAddresses to add to the list of addresses
- * @return a new consistent hash instance
+ * @param topologyInfo
+ *@param moreAddresses to add to the list of addresses @return a new consistent hash instance
*/
- public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Address... moreAddresses) {
+ public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, TopologyInfo topologyInfo, Address... moreAddresses) {
List<Address> list = new LinkedList<Address>(addresses);
list.addAll(Arrays.asList(moreAddresses));
- return createConsistentHash(c, list);
+ return createConsistentHash(c, list, topologyInfo);
}
/**
@@ -89,12 +95,13 @@
* @param c configuration
* @param addresses with which to populate the consistent hash
* @param moreAddresses to add to the list of addresses
+ * @param topologyInfo
* @return a new consistent hash instance
*/
- public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Collection<Address> moreAddresses) {
+ public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Collection<Address> moreAddresses, TopologyInfo topologyInfo) {
List<Address> list = new LinkedList<Address>(addresses);
list.addAll(moreAddresses);
- return createConsistentHash(c, list);
+ return createConsistentHash(c, list, topologyInfo);
}
/**
@@ -103,12 +110,14 @@
*
* @param clazz type of the consistent hash to create
* @param addresses with which to populate the consistent hash
+ * @param topologyInfo
* @return a new consistent hash instance
*/
- public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses) {
+ public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, TopologyInfo topologyInfo) {
ConsistentHash ch;
ch = Util.getInstance(clazz);
- if (addresses != null && !addresses.isEmpty()) ch.setCaches(addresses);
+ if (addresses != null && !addresses.isEmpty()) ch.setCaches(addresses);
+ ch.setTopologyInfo(topologyInfo);
return ch;
}
@@ -118,13 +127,13 @@
*
* @param clazz type of the consistent hash to create
* @param addresses with which to populate the consistent hash
- * @param moreAddresses to add to the list of addresses
- * @return a new consistent hash instance
+ * @param topologyInfo
+ *@param moreAddresses to add to the list of addresses @return a new consistent hash instance
*/
- public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, Address... moreAddresses) {
+ public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, TopologyInfo topologyInfo, Address... moreAddresses) {
List<Address> list = new LinkedList<Address>(addresses);
list.addAll(Arrays.asList(moreAddresses));
- return createConsistentHash(clazz, list);
+ return createConsistentHash(clazz, list, topologyInfo);
}
/**
@@ -134,11 +143,12 @@
* @param clazz type of the consistent hash to create
* @param addresses with which to populate the consistent hash
* @param moreAddresses to add to the list of addresses
+ * @param topologyInfo
* @return a new consistent hash instance
*/
- public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, Collection<Address> moreAddresses) {
+ public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, Collection<Address> moreAddresses, TopologyInfo topologyInfo) {
List<Address> list = new LinkedList<Address>(addresses);
list.addAll(moreAddresses);
- return createConsistentHash(clazz, list);
+ return createConsistentHash(clazz, list, topologyInfo);
}
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/NodeTopologyInfo.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/NodeTopologyInfo.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/NodeTopologyInfo.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -1,21 +1,33 @@
package org.infinispan.distribution.ch;
+import org.infinispan.marshall.Externalizer;
+import org.infinispan.marshall.Ids;
+import org.infinispan.marshall.Marshallable;
+import org.infinispan.remoting.transport.Address;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
/**
* Holds topology information about a a node.
*
* @author Mircea.Markus at jboss.com
* @since 4.2
*/
+@Marshallable(externalizer = NodeTopologyInfo.NodeTopologyInfoExternalizer.class, id = Ids.NODE_TOPOLOGY_INFO)
public class NodeTopologyInfo {
private final String machineId;
private final String rackId;
private final String siteId;
+ private final Address address;
- public NodeTopologyInfo(String machineId, String rackId, String siteId) {
+ public NodeTopologyInfo(String machineId, String rackId, String siteId, Address address) {
this.machineId = machineId;
this.rackId = rackId;
this.siteId = siteId;
+ this.address = address;
}
public String getMachineId() {
@@ -45,4 +57,63 @@
private boolean equalObjects(Object first, Object second) {
return first == null ? second == null : first.equals(second);
}
+
+ public Address getAddress() {
+ return address;
+ }
+
+ public static class NodeTopologyInfoExternalizer implements Externalizer {
+
+ @Override
+ public void writeObject(ObjectOutput output, Object object) throws IOException {
+ NodeTopologyInfo nti = (NodeTopologyInfo) object;
+ output.writeObject(nti.siteId);
+ output.writeObject(nti.rackId);
+ output.writeObject(nti.machineId);
+ output.writeObject(nti.address);
+ }
+
+ @Override
+ public Object readObject(ObjectInput input) throws IOException, ClassNotFoundException {
+ String siteId = (String) input.readObject();
+ String rackId = (String) input.readObject();
+ String machineId = (String) input.readObject();
+ Address address = (Address) input.readObject();
+ return new NodeTopologyInfo(machineId, rackId, siteId, address);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ NodeTopologyInfo that = (NodeTopologyInfo) o;
+
+ if (address != null ? !address.equals(that.address) : that.address != null) return false;
+ if (machineId != null ? !machineId.equals(that.machineId) : that.machineId != null) return false;
+ if (rackId != null ? !rackId.equals(that.rackId) : that.rackId != null) return false;
+ if (siteId != null ? !siteId.equals(that.siteId) : that.siteId != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = machineId != null ? machineId.hashCode() : 0;
+ result = 31 * result + (rackId != null ? rackId.hashCode() : 0);
+ result = 31 * result + (siteId != null ? siteId.hashCode() : 0);
+ result = 31 * result + (address != null ? address.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "NodeTopologyInfo{" +
+ "machineId='" + machineId + '\'' +
+ ", rackId='" + rackId + '\'' +
+ ", siteId='" + siteId + '\'' +
+ ", address=" + address +
+ '}';
+ }
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -1,13 +1,20 @@
package org.infinispan.distribution.ch;
+import org.infinispan.marshall.Ids;
+import org.infinispan.marshall.Marshallable;
import org.infinispan.remoting.transport.Address;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
import static java.lang.Math.min;
@@ -15,6 +22,7 @@
* Consistent hash that is aware of cluster topology.
* Design described here: http://community.jboss.org/wiki/DesigningServerHinting.
* <p>
+ * <pre>
* Algorithm:
* - place nodes on the hash wheel based address's hash code
* - For selecting owner nodes:
@@ -24,11 +32,12 @@
* - if not enough nodes found repeat walk again and pick nodes that have different site id, rack id and machine id
* - Ultimately cycle back to the first node selected, don't discard any nodes, regardless of machine id/rack
* id/site id match.
-
+ * </pre>
*
* @author Mircea.Markus at jboss.com
* @since 4.2
*/
+@Marshallable(externalizer = TopologyAwareConsistentHash.Externalizer.class, id = Ids.TOPOLOGY_AWARE_CH)
public class TopologyAwareConsistentHash extends AbstractWheelConsistentHash {
public List<Address> locate(Object key, int replCount) {
@@ -119,4 +128,32 @@
Integer ownerHash = positions.tailMap(hash).firstKey();
return positions.get(ownerHash);
}
+
+ public static class Externalizer implements org.infinispan.marshall.Externalizer {
+ @Override
+ public void writeObject(ObjectOutput output, Object subject) throws IOException {
+ TopologyAwareConsistentHash dch = (TopologyAwareConsistentHash) subject;
+ output.writeObject(dch.addresses);
+ output.writeObject(dch.positions);
+ output.writeObject(dch.addressToHashIds);
+ Collection<NodeTopologyInfo> infoCollection = dch.topologyInfo.getAllTopologyInfo();
+ output.writeInt(infoCollection.size());
+ for (NodeTopologyInfo nti : infoCollection) output.writeObject(nti);
+ }
+
+ @Override
+ public Object readObject(ObjectInput unmarshaller) throws IOException, ClassNotFoundException {
+ TopologyAwareConsistentHash ch = new TopologyAwareConsistentHash();
+ ch.addresses = (ArrayList<Address>) unmarshaller.readObject();
+ ch.positions = (SortedMap<Integer, Address>) unmarshaller.readObject();
+ ch.addressToHashIds = (Map<Address, Integer>) unmarshaller.readObject();
+ ch.topologyInfo = new TopologyInfo();
+ int ntiCount = unmarshaller.readInt();
+ for (int i = 0; i < ntiCount; i++) {
+ NodeTopologyInfo nti = (NodeTopologyInfo) unmarshaller.readObject();
+ ch.topologyInfo.addNodeTopologyInfo(nti.getAddress(), nti);
+ }
+ return ch;
+ }
+ }
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -1,7 +1,15 @@
package org.infinispan.distribution.ch;
+import org.infinispan.marshall.Externalizer;
+import org.infinispan.marshall.Ids;
+import org.infinispan.marshall.Marshallable;
import org.infinispan.remoting.transport.Address;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -36,4 +44,41 @@
NodeTopologyInfo info2 = address2TopologyInfo.get(a2);
return info1.sameMachine(info2);
}
+
+ public NodeTopologyInfo getNodeTopologyInfo(Address address) {
+ return address2TopologyInfo.get(address);
+ }
+
+ public void removeNodeInfo(Address leaver) {
+ address2TopologyInfo.remove(leaver);
+ }
+
+ public Collection<NodeTopologyInfo> getAllTopologyInfo() {
+ return Collections.unmodifiableCollection(address2TopologyInfo.values());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TopologyInfo that = (TopologyInfo) o;
+
+ if (address2TopologyInfo != null ? !address2TopologyInfo.equals(that.address2TopologyInfo) : that.address2TopologyInfo != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return address2TopologyInfo != null ? address2TopologyInfo.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "TopologyInfo{" +
+ "address2TopologyInfo=" + address2TopologyInfo +
+ '}';
+ }
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/marshall/Ids.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/marshall/Ids.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/marshall/Ids.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -117,4 +117,6 @@
static final byte BYTE_ARRAY_KEY = 57;
static final byte TOPOLOGY_ADDRESS = 58;
static final byte TOPOLOGY_VIEW = 59;
+ static final byte NODE_TOPOLOGY_INFO = 60;
+ static final byte TOPOLOGY_AWARE_CH = 61;
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -54,6 +54,8 @@
import org.infinispan.container.entries.TransientMortalCacheEntry;
import org.infinispan.container.entries.TransientMortalCacheValue;
import org.infinispan.distribution.ch.DefaultConsistentHash;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
import org.infinispan.distribution.ch.UnionConsistentHash;
import org.infinispan.loaders.bucket.Bucket;
import org.infinispan.marshall.Externalizer;
@@ -170,6 +172,8 @@
MARSHALLABLES.add(ClearOperation.class.getName());
MARSHALLABLES.add(DefaultConsistentHash.class.getName());
MARSHALLABLES.add(UnionConsistentHash.class.getName());
+ MARSHALLABLES.add(NodeTopologyInfo.class.getName());
+ MARSHALLABLES.add(TopologyAwareConsistentHash.class.getName());
MARSHALLABLES.add("org.infinispan.server.core.CacheValue");
MARSHALLABLES.add("org.infinispan.server.memcached.MemcachedValue");
Modified: branches/4.2.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -37,6 +37,7 @@
* the registered {@link Transport}.
*
* @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
public interface RpcManager {
@@ -156,7 +157,7 @@
* @param usePriorityQueue if true, a priority queue is used
* @throws ReplicationException in the event of problems
*/
- void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException;
+ List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException;
/**
* The same as {@link #invokeRemotely(java.util.Collection, org.infinispan.commands.ReplicableCommand, boolean)}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -44,6 +44,7 @@
*
* @author Manik Surtani
* @author Galder Zamarreño
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
@MBean(objectName = "RpcManager", description = "Manages all remote calls to remote cache instances in the cluster.")
@@ -212,23 +213,24 @@
invokeRemotely(recipients, rpc, sync, false);
}
- public final void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
- invokeRemotely(recipients, rpc, sync, usePriorityQueue, configuration.getSyncReplTimeout());
+ public final List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+ return invokeRemotely(recipients, rpc, sync, usePriorityQueue, configuration.getSyncReplTimeout());
}
- public final void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue, long timeout) throws ReplicationException {
+ public final List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue, long timeout) throws ReplicationException {
if (trace) log.trace("{0} broadcasting call {1} to recipient list {2}", t.getAddress(), rpc, recipients);
if (useReplicationQueue(sync)) {
replicationQueue.add(rpc);
+ return null;
} else {
if (!(rpc instanceof CacheRpcCommand)) {
rpc = cf.buildSingleRpcCommand(rpc);
}
- List rsps;
- rsps = invokeRemotely(recipients, rpc, getResponseMode(sync), timeout, usePriorityQueue);
+ List<Response> rsps = invokeRemotely(recipients, rpc, getResponseMode(sync), timeout, usePriorityQueue);
if (trace) log.trace("responses=" + rsps);
if (sync) checkResponses(rsps);
+ return rsps;
}
}
Modified: branches/4.2.x/core/src/main/resources/config-samples/all.xml
===================================================================
--- branches/4.2.x/core/src/main/resources/config-samples/all.xml 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/main/resources/config-samples/all.xml 2010-10-28 19:40:21 UTC (rev 2629)
@@ -44,7 +44,7 @@
There is no added cost to defining a transport but not creating a cache that uses one, since the transport
is created and initialized lazily.
-->
- <transport clusterName="infinispan-cluster" distributedSyncTimeout="50000" nodeName="Jalapeno"/>
+ <transport clusterName="infinispan-cluster" machineId="m1" rackId="r1" siteId="s1" distributedSyncTimeout="50000" nodeName="Jalapeno"/>
<!-- Note that the JGroups transport uses sensible defaults if no configuration property is defined. -->
<!-- See the JGroupsTransport javadocs for more flags -->
Modified: branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -8,6 +8,7 @@
import org.infinispan.config.GlobalConfiguration.ShutdownHookBehavior;
import org.infinispan.config.InfinispanConfiguration;
import org.infinispan.distribution.ch.DefaultConsistentHash;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.eviction.EvictionThreadPolicy;
import org.infinispan.loaders.file.FileCacheStoreConfig;
@@ -234,7 +235,7 @@
assert c.getCacheMode() == Configuration.CacheMode.DIST_SYNC;
assert c.getL1Lifespan() == 600000;
assert c.getRehashWaitTime() == 120000;
- assert c.getConsistentHashClass().equals(DefaultConsistentHash.class.getName());
+ assert c.getConsistentHashClass().equals(TopologyAwareConsistentHash.class.getName());
assert c.getNumOwners() == 3;
assert c.isL1CacheEnabled();
Modified: branches/4.2.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -11,6 +11,7 @@
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashHelper;
import org.infinispan.distribution.ch.DefaultConsistentHash;
+import org.infinispan.distribution.ch.TopologyInfo;
import org.infinispan.distribution.ch.UnionConsistentHash;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
@@ -90,7 +91,7 @@
public static ConsistentHash createNewConsistentHash(List<Address> servers) {
try {
- return ConsistentHashHelper.createConsistentHash(DefaultConsistentHash.class, servers);
+ return ConsistentHashHelper.createConsistentHash(DefaultConsistentHash.class, servers, new TopologyInfo());
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
Modified: branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -509,7 +509,7 @@
private void addNode(TestAddress address, String machineId, String rackId, String siteId) {
addresses.add(address);
- NodeTopologyInfo nti = new NodeTopologyInfo(machineId, rackId, siteId);
+ NodeTopologyInfo nti = new NodeTopologyInfo(machineId, rackId, siteId, null);
ti.addNodeTopologyInfo(address, nti);
}
Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -0,0 +1,83 @@
+package org.infinispan.distribution;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
+import org.infinispan.distribution.ch.TopologyInfo;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "distribution.TopologyInfoBroadcastTest")
+public class TopologyInfoBroadcastTest extends MultipleCacheManagersTest {
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ addClusterEnabledCacheManagers(Configuration.CacheMode.DIST_SYNC, 3);
+ updatedSiteInfo(manager(0), "s0", "r0", "m0");
+ updatedSiteInfo(manager(1), "s1", "r1", "m1");
+ updatedSiteInfo(manager(2), "s2", "r2", "m2");
+ BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2));
+ }
+
+ private void updatedSiteInfo(EmbeddedCacheManager embeddedCacheManager, String s, String r, String m) {
+ GlobalConfiguration gc = embeddedCacheManager.getGlobalConfiguration();
+ gc.setSiteId(s);
+ gc.setRackId(r);
+ gc.setMachineId(m);
+ }
+
+ public void testIsReplicated() {
+ assert advancedCache(0).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
+ assert advancedCache(1).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
+ assert advancedCache(2).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
+
+ DistributionManagerImpl dmi = (DistributionManagerImpl) advancedCache(0).getDistributionManager();
+ assertTopologyInfo3Nodes(dmi.getTopologyInfo());
+ dmi = (DistributionManagerImpl) advancedCache(1).getDistributionManager();
+ assertTopologyInfo3Nodes(dmi.getTopologyInfo());
+ dmi = (DistributionManagerImpl) advancedCache(2).getDistributionManager();
+ assertTopologyInfo3Nodes(dmi.getTopologyInfo());
+
+ TopologyAwareConsistentHash tach = (TopologyAwareConsistentHash) advancedCache(0).getDistributionManager().getConsistentHash();
+ assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+ tach = (TopologyAwareConsistentHash) advancedCache(1).getDistributionManager().getConsistentHash();
+ assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+ tach = (TopologyAwareConsistentHash) advancedCache(2).getDistributionManager().getConsistentHash();
+ assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+ }
+
+ @Test(dependsOnMethods = "testIsReplicated")
+ public void testNodeLeaves() {
+ TestingUtil.killCacheManagers(manager(1));
+ BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(2));
+
+ DistributionManagerImpl dmi = (DistributionManagerImpl) advancedCache(0).getDistributionManager();
+ assertTopologyInfo2Nodes(dmi.getTopologyInfo());
+ dmi = (DistributionManagerImpl) advancedCache(2).getDistributionManager();
+ assertTopologyInfo2Nodes(dmi.getTopologyInfo());
+
+ TopologyAwareConsistentHash tach = (TopologyAwareConsistentHash) advancedCache(0).getDistributionManager().getConsistentHash();
+ assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+ tach = (TopologyAwareConsistentHash) advancedCache(2).getDistributionManager().getConsistentHash();
+ assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+ }
+
+ private void assertTopologyInfo3Nodes(TopologyInfo topologyInfo) {
+ assertTopologyInfo2Nodes(topologyInfo);
+ assertEquals(topologyInfo.getNodeTopologyInfo(address(1)), new NodeTopologyInfo("m1","r1", "s1", address(1)));
+ }
+
+ private void assertTopologyInfo2Nodes(TopologyInfo topologyInfo) {
+ assertEquals(topologyInfo.getNodeTopologyInfo(address(0)), new NodeTopologyInfo("m0","r0", "s0", address(0)));
+ assertEquals(topologyInfo.getNodeTopologyInfo(address(2)), new NodeTopologyInfo("m2","r2", "s2", address(2)));
+ }
+}
Modified: branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -5,6 +5,7 @@
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashHelper;
+import org.infinispan.distribution.ch.TopologyInfo;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.testng.annotations.Test;
@@ -54,7 +55,7 @@
List<MagicKey> keys = init();
ConsistentHash chOld = getConsistentHash(c1);
Address joinerAddress = startNewMember();
- ConsistentHash chNew = ConsistentHashHelper.createConsistentHash(chOld.getClass(), chOld.getCaches(), joinerAddress);
+ ConsistentHash chNew = ConsistentHashHelper.createConsistentHash(chOld.getClass(), chOld.getCaches(), new TopologyInfo(), joinerAddress);
// which key should me mapped to the joiner?
MagicKey keyToTest = null;
for (MagicKey k: keys) {
Modified: branches/4.2.x/core/src/test/java/org/infinispan/manager/CacheManagerXmlConfigurationTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/manager/CacheManagerXmlConfigurationTest.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/test/java/org/infinispan/manager/CacheManagerXmlConfigurationTest.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -15,6 +15,7 @@
import static org.infinispan.test.TestingUtil.INFINISPAN_END_TAG;
import static org.infinispan.test.TestingUtil.INFINISPAN_START_TAG;
+import static org.testng.Assert.assertEquals;
/**
* @author Manik Surtani
@@ -33,6 +34,10 @@
public void testNamedCacheXML() throws IOException {
cm = TestCacheManagerFactory.fromXml("configs/named-cache-test.xml");
+ assertEquals("s1", cm.getGlobalConfiguration().getSiteId());
+ assertEquals("r1", cm.getGlobalConfiguration().getRackId());
+ assertEquals("m1", cm.getGlobalConfiguration().getMachineId());
+
// test default cache
Cache c = cm.getCache();
assert c.getConfiguration().getConcurrencyLevel() == 100;
Modified: branches/4.2.x/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java 2010-10-28 19:40:21 UTC (rev 2629)
@@ -108,9 +108,10 @@
realOne.invokeRemotely(recipients, rpc, sync);
}
- public void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+ public List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
failIfNeeded();
realOne.invokeRemotely(recipients, rpc, sync, usePriorityQueue);
+ return null;
}
public void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc, NotifyingNotifiableFuture<Object> future) {
Modified: branches/4.2.x/core/src/test/resources/configs/named-cache-test.xml
===================================================================
--- branches/4.2.x/core/src/test/resources/configs/named-cache-test.xml 2010-10-28 17:26:50 UTC (rev 2628)
+++ branches/4.2.x/core/src/test/resources/configs/named-cache-test.xml 2010-10-28 19:40:21 UTC (rev 2629)
@@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:infinispan:config:4.1 http://www.infinispan.org/schemas/infinispan-config-4.1.xsd"
- xmlns="urn:infinispan:config:4.1">
+ xsi:schemaLocation="urn:infinispan:config:4.2 http://www.infinispan.org/schemas/infinispan-config-4.2.xsd"
+ xmlns="urn:infinispan:config:4.2">
<global>
@@ -32,7 +32,7 @@
</properties>
</replicationQueueScheduledExecutor>
- <transport clusterName="infinispan-cluster" distributedSyncTimeout="50000" nodeName="Jalapeno">
+ <transport clusterName="infinispan-cluster" distributedSyncTimeout="50000" nodeName="Jalapeno" machineId="m1" rackId="r1" siteId="s1">
<!-- Note that the JGroups transport uses sensible defaults if no configuration property is defined. -->
<properties>
<property name="configurationFile" value="config-samples/jgroups-udp.xml"/>
More information about the infinispan-commits
mailing list