[infinispan-commits] Infinispan SVN: r1759 - in trunk/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl/consistenthash and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon May 10 05:39:55 EDT 2010
Author: mircea.markus
Date: 2010-05-10 05:39:55 -0400 (Mon, 10 May 2010)
New Revision: 1759
Added:
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
Removed:
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java
Modified:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
Log:
updated test
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-05-07 14:23:44 UTC (rev 1758)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-05-10 09:39:55 UTC (rev 1759)
@@ -363,7 +363,12 @@
log.info("New topology: " + servers2HashCode);
}
transportFactory.updateServers(servers2HashCode.keySet());
- transportFactory.updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
+ if (hashFunctionVersion == 0) {
+ if (log.isTraceEnabled())
+ log.trace("Not using a consistent hash function (hash function version == 0).");
+ } else {
+ transportFactory.updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
+ }
}
private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java 2010-05-07 14:23:44 UTC (rev 1758)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java 2010-05-10 09:39:55 UTC (rev 1759)
@@ -4,12 +4,12 @@
import org.infinispan.util.logging.LogFactory;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import static org.infinispan.util.hash.MurmurHash2.hash;
+
/**
* // TODO: Document this
*
@@ -20,29 +20,40 @@
private static Log log = LogFactory.getLog(ConsitentHashV1.class);
- private volatile Map<InetSocketAddress, Integer> servers2HashCode;
+ private final SortedMap<Integer, InetSocketAddress> positions = new TreeMap<Integer, InetSocketAddress>();
- private volatile int numKeyOwners;
-
private volatile int hashSpace;
- private Random random = new Random();
- List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
-
@Override
public void init(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, int hashSpace) {
- this.servers2HashCode = servers2HashCode;
- this.numKeyOwners = numKeyOwners;
+ for (InetSocketAddress addr :servers2HashCode.keySet()) {
+ positions.put(servers2HashCode.get(addr), addr);
+ }
+ if (log.isTraceEnabled())
+ log.trace("Positions are: " + positions);
this.hashSpace = hashSpace;
- addresses.addAll(servers2HashCode.keySet());
}
@Override
public InetSocketAddress getServer(byte[] key) {
- InetSocketAddress addr = addresses.get(random.nextInt(addresses.size()));
+ int keyHashCode = hash(key);
+ if (keyHashCode == Integer.MIN_VALUE) keyHashCode += 1;
+ int hash = Math.abs(keyHashCode);
+
+ SortedMap<Integer, InetSocketAddress> candidates = positions.tailMap(hash % hashSpace);
if (log.isTraceEnabled()) {
- log.trace("Randomly returning an address: " + addr);
+ log.trace("Found possible candidates: " + candidates);
}
- return addr;
+ if (candidates.isEmpty()) {
+ InetSocketAddress socketAddress = positions.get(positions.firstKey());
+ if (log.isTraceEnabled()) {
+ log.trace("Over the wheel, returning first member: " + socketAddress);
+ }
+ return socketAddress;
+ } else {
+ InetSocketAddress socketAddress = candidates.get(candidates.firstKey());
+ log.trace("Found candidate: " + socketAddress);
+ return socketAddress;
+ }
}
}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java 2010-05-07 14:23:44 UTC (rev 1758)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java 2010-05-10 09:39:55 UTC (rev 1759)
@@ -2,11 +2,14 @@
import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
import org.infinispan.config.Configuration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.manager.CacheManager;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
/**
@@ -20,33 +23,42 @@
private HotRodServer hotRodServer1;
private HotRodServer hotRodServer2;
+ private HotRodServer hotRodServer3;
private RemoteCacheManager remoteCacheManager;
private RemoteCache<Object, Object> remoteCache;
private TcpTransportFactory tcpConnectionFactory;
+ private static final String CACHE_NAME = "a";
+ @AfterMethod
+ @Override
+ protected void clearContent() throws Throwable {
+ }
@Override
protected void createCacheManagers() throws Throwable {
cleanup = CleanupPhase.AFTER_METHOD;
- Configuration config = TestHelper.getMultiNodeConfig();
- CacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
- CacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
- registerCacheManager(cm1);
- registerCacheManager(cm2);
+ Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
+ CacheManager cm1 = addClusterEnabledCacheManager();
+ CacheManager cm2 = addClusterEnabledCacheManager();
+ CacheManager cm3 = addClusterEnabledCacheManager();
+ cm1.defineConfiguration(CACHE_NAME, config);
+ cm2.defineConfiguration(CACHE_NAME, config);
+ cm3.defineConfiguration(CACHE_NAME, config);
+//
+ BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(manager(0).getCache(CACHE_NAME), manager(1).getCache(CACHE_NAME), manager(2).getCache(CACHE_NAME));
+
hotRodServer1 = TestHelper.startHotRodServer(manager(0));
hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+ hotRodServer3 = TestHelper.startHotRodServer(manager(2));
- manager(0).getCache();
- manager(1).getCache();
- TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
- TestingUtil.blockUntilViewReceived(manager(1).getCache(), 2, 10000);
+ manager(0).getCache(CACHE_NAME).put("k","v");
+ manager(0).getCache(CACHE_NAME).get("k").equals("v");
+ manager(1).getCache(CACHE_NAME).get("k").equals("v");
+ manager(2).getCache(CACHE_NAME).get("k").equals("v");
- manager(0).getCache().put("k","v");
- manager(0).getCache().get("k").equals("v");
- manager(1).getCache().get("k").equals("v");
log.info("Local replication test passed!");
@@ -57,11 +69,11 @@
tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
}
- public void testPing() {
- assert tcpConnectionFactory.getServers().size() == 1;
- remoteCache.ping();
- assert tcpConnectionFactory.getServers().size() == 2;
- }
+// public void testPing() {
+// assert tcpConnectionFactory.getServers().size() == 1;
+// remoteCache.ping();
+// assert tcpConnectionFactory.getServers().size() == 3;
+// }
public void testHashInfoRetrieved() {
remoteCache.put("k", "v");
Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java 2010-05-10 09:39:55 UTC (rev 1759)
@@ -0,0 +1,15 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.config.Configuration;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class DistTopologyChange extends ReplTopologyChangeTest {
+ protected Configuration.CacheMode getCacheMode() {
+ return Configuration.CacheMode.DIST_SYNC;
+ }
+}
Copied: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java (from rev 1757, trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java)
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java 2010-05-10 09:39:55 UTC (rev 1759)
@@ -0,0 +1,131 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test (testName = "client.hotrod.MultipleCacheManagersTest", groups = "functional")
+public class ReplTopologyChangeTest extends MultipleCacheManagersTest {
+
+ HotRodServer hotRodServer1;
+ HotRodServer hotRodServer2;
+ HotRodServer hotRodServer3;
+
+ RemoteCache remoteCache;
+ private RemoteCacheManager remoteCacheManager;
+ private TcpTransportFactory tcpConnectionFactory;
+ private Configuration config;
+
+ @Override
+ protected void assertSupportedConfig() {
+ }
+
+ @AfterMethod
+ @Override
+ protected void clearContent() throws Throwable {
+ }
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ config = getDefaultClusteredConfig(getCacheMode());
+ CacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
+ CacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
+ registerCacheManager(cm1);
+ registerCacheManager(cm2);
+
+ hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+ hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+
+ manager(0).getCache();
+ manager(1).getCache();
+
+ TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+
+
+ manager(0).getCache().put("k","v");
+ manager(0).getCache().get("k").equals("v");
+ manager(1).getCache().get("k").equals("v");
+
+ log.info("Local replication test passed!");
+
+ //Important: this only connects to one of the two servers!
+ remoteCacheManager = new RemoteCacheManager("localhost", hotRodServer2.getPort());
+ remoteCache = remoteCacheManager.getCache();
+
+ tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+ }
+
+ protected Configuration.CacheMode getCacheMode() {
+ return Configuration.CacheMode.REPL_SYNC;
+ }
+
+
+ public void testTwoMembers() {
+ InetSocketAddress server1Address = new InetSocketAddress("localhost", hotRodServer1.getPort());
+ expectTopologyChange(server1Address, true);
+ assertEquals(2, tcpConnectionFactory.getServers().size());
+ }
+
+ @Test(dependsOnMethods = "testTwoMembers")
+ public void testAddNewServer() {
+ CacheManager cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
+ registerCacheManager(cm3);
+ hotRodServer3 = TestHelper.startHotRodServer(manager(2));
+ manager(2).getCache();
+
+ TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
+ TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
+
+ expectTopologyChange(new InetSocketAddress("localhost",hotRodServer3.getPort()), true);
+ assertEquals(3, tcpConnectionFactory.getServers().size());
+ }
+
+ @Test(dependsOnMethods = "testAddNewServer")
+ public void testDropServer() {
+ hotRodServer3.stop();
+ manager(2).stop();
+ TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1));
+ TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+
+ InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());
+ expectTopologyChange(server3Address, false);
+ assertEquals(2, tcpConnectionFactory.getServers().size());
+ }
+
+ private void expectTopologyChange(InetSocketAddress server1Address, boolean added) {
+ for (int i = 0; i < 10; i++) {
+ try {
+ remoteCache.put("k" + i, "v" + i);
+ } catch (Exception e) {
+ if (added) {
+ throw new IllegalStateException(e);
+ } //else it is acceptable, as the transport hasn't changed
+ }
+ if (added == tcpConnectionFactory.getServers().contains(server1Address)) break;
+ }
+ assertEquals(server1Address + " not found", added, tcpConnectionFactory.getServers().contains(server1Address));
+ }
+}
Deleted: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java 2010-05-07 14:23:44 UTC (rev 1758)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java 2010-05-10 09:39:55 UTC (rev 1759)
@@ -1,123 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.Cache;
-import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
-import org.infinispan.config.Configuration;
-import org.infinispan.lifecycle.ComponentStatus;
-import org.infinispan.manager.CacheManager;
-import org.infinispan.server.hotrod.HotRodServer;
-import org.infinispan.test.MultipleCacheManagersTest;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.Test;
-
-import java.net.InetSocketAddress;
-
-import static org.testng.AssertJUnit.assertEquals;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-@Test (testName = "client.hotrod.MultipleCacheManagersTest", groups = "functional")
-public class TopologyChangeTest extends MultipleCacheManagersTest {
-
- HotRodServer hotRodServer1;
- HotRodServer hotRodServer2;
- HotRodServer hotRodServer3;
-
- RemoteCache remoteCache;
- private RemoteCacheManager remoteCacheManager;
- private TcpTransportFactory tcpConnectionFactory;
- private Configuration config;
-
- @Override
- protected void assertSupportedConfig() {
- }
-
- @Override
- protected void createCacheManagers() throws Throwable {
- config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
- CacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
- CacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
- registerCacheManager(cm1);
- registerCacheManager(cm2);
-
- hotRodServer1 = TestHelper.startHotRodServer(manager(0));
- hotRodServer2 = TestHelper.startHotRodServer(manager(1));
-
- manager(0).getCache();
- manager(1).getCache();
-
- TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
-
-
- manager(0).getCache().put("k","v");
- manager(0).getCache().get("k").equals("v");
- manager(1).getCache().get("k").equals("v");
-
- log.info("Local replication test passed!");
-
- //Important: this only connects to one of the two servers!
- remoteCacheManager = new RemoteCacheManager("localhost", hotRodServer2.getPort());
- remoteCache = remoteCacheManager.getCache();
-
- tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
- }
-
-
- public void testTwoMembers() {
- InetSocketAddress server1Address = new InetSocketAddress("localhost", hotRodServer1.getPort());
- expectTopologyChange(server1Address, true);
- assertEquals(2, tcpConnectionFactory.getServers().size());
- }
-
- @Test(dependsOnMethods = "testTwoMembers")
- public void testAddNewServer() {
- CacheManager cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
- registerCacheManager(cm3);
- hotRodServer3 = TestHelper.startHotRodServer(manager(2));
- manager(2).getCache();
-
- TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
- TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
-
- expectTopologyChange(new InetSocketAddress("localhost",hotRodServer3.getPort()), true);
- assertEquals(3, tcpConnectionFactory.getServers().size());
- }
-
- @Test(dependsOnMethods = "testAddNewServer")
- public void testDropServer() {
- manager(2).stop();
- TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1));
- TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
- TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
-
- InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());
- hotRodServer3.stop();
- expectTopologyChange(server3Address, false);
- assertEquals(2, tcpConnectionFactory.getServers().size());
- }
-
- private void expectTopologyChange(InetSocketAddress server1Address, boolean added) {
- for (int i = 0; i < 10; i++) {
- try {
- remoteCache.put("k" + i, "v" + i);
-// remoteCache.ping();
- } catch (Exception e) {
- if (added) {
- throw new IllegalStateException(e);
- } //else it is acceptable, as the transport hasn't changed
- }
- if (added == tcpConnectionFactory.getServers().contains(server1Address)) break;
- }
- assertEquals(server1Address + " not found", added, tcpConnectionFactory.getServers().contains(server1Address));
- }
-}
More information about the infinispan-commits
mailing list