[infinispan-commits] Infinispan SVN: r1759 - in trunk/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl/consistenthash and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon May 10 05:39:55 EDT 2010


Author: mircea.markus
Date: 2010-05-10 05:39:55 -0400 (Mon, 10 May 2010)
New Revision: 1759

Added:
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
Removed:
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java
Modified:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
Log:
updated test


Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-05-07 14:23:44 UTC (rev 1758)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-05-10 09:39:55 UTC (rev 1759)
@@ -363,7 +363,12 @@
          log.info("New topology: " + servers2HashCode);
       }
       transportFactory.updateServers(servers2HashCode.keySet());
-      transportFactory.updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
+      if (hashFunctionVersion == 0) {
+         if (log.isTraceEnabled())
+            log.trace("Not using a consistent hash function (hash function version == 0).");
+      } else {
+         transportFactory.updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
+      }
    }
 
    private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java	2010-05-07 14:23:44 UTC (rev 1758)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java	2010-05-10 09:39:55 UTC (rev 1759)
@@ -4,12 +4,12 @@
 import org.infinispan.util.logging.LogFactory;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
+import static org.infinispan.util.hash.MurmurHash2.hash;
+
 /**
  * // TODO: Document this
  *
@@ -20,29 +20,40 @@
 
    private static Log log = LogFactory.getLog(ConsitentHashV1.class);
    
-   private volatile Map<InetSocketAddress, Integer> servers2HashCode;
+   private final SortedMap<Integer, InetSocketAddress> positions = new TreeMap<Integer, InetSocketAddress>();
 
-   private volatile int numKeyOwners;
-
    private volatile int hashSpace;
 
-   private Random random = new Random();
-   List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
-
    @Override
    public void init(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, int hashSpace) {
-      this.servers2HashCode = servers2HashCode;
-      this.numKeyOwners = numKeyOwners;
+      for (InetSocketAddress addr :servers2HashCode.keySet()) {
+         positions.put(servers2HashCode.get(addr), addr);
+      }
+      if (log.isTraceEnabled())
+         log.trace("Positions are: " + positions);
       this.hashSpace = hashSpace;
-      addresses.addAll(servers2HashCode.keySet());
    }
 
    @Override
    public InetSocketAddress getServer(byte[] key) {
-      InetSocketAddress addr = addresses.get(random.nextInt(addresses.size()));
+      int keyHashCode = hash(key);
+      if (keyHashCode == Integer.MIN_VALUE) keyHashCode += 1;
+      int hash = Math.abs(keyHashCode);
+
+      SortedMap<Integer, InetSocketAddress> candidates = positions.tailMap(hash % hashSpace);
       if (log.isTraceEnabled()) {
-         log.trace("Randomly returning an address: " + addr);
+         log.trace("Found possible candidates: " + candidates);
       }
-      return addr;
+      if (candidates.isEmpty()) {
+         InetSocketAddress socketAddress = positions.get(positions.firstKey());
+         if (log.isTraceEnabled()) {
+            log.trace("Over the wheel, returning first member: " + socketAddress);
+         }
+         return socketAddress;
+      } else {
+         InetSocketAddress socketAddress = candidates.get(candidates.firstKey());
+         log.trace("Found candidate: " + socketAddress);
+         return socketAddress;
+      }
    }
 }

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java	2010-05-07 14:23:44 UTC (rev 1758)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java	2010-05-10 09:39:55 UTC (rev 1759)
@@ -2,11 +2,14 @@
 
 import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
 import org.infinispan.config.Configuration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.lifecycle.ComponentStatus;
 import org.infinispan.manager.CacheManager;
 import org.infinispan.server.hotrod.HotRodServer;
 import org.infinispan.test.MultipleCacheManagersTest;
 import org.infinispan.test.TestingUtil;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
 /**
@@ -20,33 +23,42 @@
 
    private HotRodServer hotRodServer1;
    private HotRodServer hotRodServer2;
+   private HotRodServer hotRodServer3;
    private RemoteCacheManager remoteCacheManager;
    private RemoteCache<Object, Object> remoteCache;
    private TcpTransportFactory tcpConnectionFactory;
+   private static final String CACHE_NAME = "a";
 
+   @AfterMethod
+   @Override
+   protected void clearContent() throws Throwable {
+   }
 
    @Override
    protected void createCacheManagers() throws Throwable {
       cleanup = CleanupPhase.AFTER_METHOD;
-      Configuration config = TestHelper.getMultiNodeConfig();
-      CacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
-      CacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
-      registerCacheManager(cm1);
-      registerCacheManager(cm2);
+      Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
+      CacheManager cm1 = addClusterEnabledCacheManager();
+      CacheManager cm2 = addClusterEnabledCacheManager();
+      CacheManager cm3 = addClusterEnabledCacheManager();
+      cm1.defineConfiguration(CACHE_NAME, config);
+      cm2.defineConfiguration(CACHE_NAME, config);
+      cm3.defineConfiguration(CACHE_NAME, config);
 
+//
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(manager(0).getCache(CACHE_NAME), manager(1).getCache(CACHE_NAME), manager(2).getCache(CACHE_NAME));
+
       hotRodServer1 = TestHelper.startHotRodServer(manager(0));
       hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+      hotRodServer3 = TestHelper.startHotRodServer(manager(2));
 
-      manager(0).getCache();
-      manager(1).getCache();
 
-      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
-      TestingUtil.blockUntilViewReceived(manager(1).getCache(), 2, 10000);
+      manager(0).getCache(CACHE_NAME).put("k","v");
+      manager(0).getCache(CACHE_NAME).get("k").equals("v");
+      manager(1).getCache(CACHE_NAME).get("k").equals("v");
+      manager(2).getCache(CACHE_NAME).get("k").equals("v");
 
 
-      manager(0).getCache().put("k","v");
-      manager(0).getCache().get("k").equals("v");
-      manager(1).getCache().get("k").equals("v");
 
       log.info("Local replication test passed!");
 
@@ -57,11 +69,11 @@
       tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
    }
 
-   public void testPing() {
-      assert tcpConnectionFactory.getServers().size() == 1;
-      remoteCache.ping();
-      assert tcpConnectionFactory.getServers().size() == 2;
-   }
+//   public void testPing() {
+//      assert tcpConnectionFactory.getServers().size() == 1;
+//      remoteCache.ping();
+//      assert tcpConnectionFactory.getServers().size() == 3;
+//   }
 
    public void testHashInfoRetrieved() {
       remoteCache.put("k", "v");

Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java	2010-05-10 09:39:55 UTC (rev 1759)
@@ -0,0 +1,15 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.config.Configuration;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class DistTopologyChange extends ReplTopologyChangeTest {
+   protected Configuration.CacheMode getCacheMode() {
+      return Configuration.CacheMode.DIST_SYNC;
+   }
+}

Copied: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java (from rev 1757, trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java)
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java	2010-05-10 09:39:55 UTC (rev 1759)
@@ -0,0 +1,131 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test (testName = "client.hotrod.MultipleCacheManagersTest", groups = "functional")
+public class ReplTopologyChangeTest extends MultipleCacheManagersTest {
+
+   HotRodServer hotRodServer1;
+   HotRodServer hotRodServer2;
+   HotRodServer hotRodServer3;
+
+   RemoteCache remoteCache;
+   private RemoteCacheManager remoteCacheManager;
+   private TcpTransportFactory tcpConnectionFactory;
+   private Configuration config;
+
+   @Override
+   protected void assertSupportedConfig() {      
+   }
+
+   @AfterMethod
+   @Override
+   protected void clearContent() throws Throwable {      
+   }
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      config = getDefaultClusteredConfig(getCacheMode());
+      CacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      CacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      registerCacheManager(cm1);
+      registerCacheManager(cm2);
+
+      hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+      hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+
+      manager(0).getCache();
+      manager(1).getCache();
+
+      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+
+
+      manager(0).getCache().put("k","v");
+      manager(0).getCache().get("k").equals("v");
+      manager(1).getCache().get("k").equals("v");
+
+      log.info("Local replication test passed!");
+
+      //Important: this only connects to one of the two servers!
+      remoteCacheManager = new RemoteCacheManager("localhost", hotRodServer2.getPort());
+      remoteCache = remoteCacheManager.getCache();
+
+      tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+   }
+
+   protected Configuration.CacheMode getCacheMode() {
+      return Configuration.CacheMode.REPL_SYNC;
+   }
+
+
+   public void testTwoMembers() {
+      InetSocketAddress server1Address = new InetSocketAddress("localhost", hotRodServer1.getPort());
+      expectTopologyChange(server1Address, true);
+      assertEquals(2, tcpConnectionFactory.getServers().size());
+   }
+
+   @Test(dependsOnMethods = "testTwoMembers")
+   public void testAddNewServer() {
+      CacheManager cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      registerCacheManager(cm3);
+      hotRodServer3 = TestHelper.startHotRodServer(manager(2));
+      manager(2).getCache();
+
+      TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
+
+      expectTopologyChange(new InetSocketAddress("localhost",hotRodServer3.getPort()), true);
+      assertEquals(3, tcpConnectionFactory.getServers().size());
+   }
+
+   @Test(dependsOnMethods = "testAddNewServer")
+   public void testDropServer() {
+      hotRodServer3.stop();
+      manager(2).stop();
+      TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1));
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+
+      InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());
+      expectTopologyChange(server3Address, false);
+      assertEquals(2, tcpConnectionFactory.getServers().size());
+   }
+
+   private void expectTopologyChange(InetSocketAddress server1Address, boolean added) {
+      for (int i = 0; i < 10; i++) {
+         try {
+            remoteCache.put("k" + i, "v" + i);
+         } catch (Exception e) {
+            if (added) {
+               throw new IllegalStateException(e);
+            } //else it is acceptable, as the transport hasn't changed
+         }
+         if (added == tcpConnectionFactory.getServers().contains(server1Address)) break;
+      }
+      assertEquals(server1Address + " not found", added, tcpConnectionFactory.getServers().contains(server1Address));
+   }
+}

Deleted: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java	2010-05-07 14:23:44 UTC (rev 1758)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java	2010-05-10 09:39:55 UTC (rev 1759)
@@ -1,123 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.Cache;
-import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
-import org.infinispan.config.Configuration;
-import org.infinispan.lifecycle.ComponentStatus;
-import org.infinispan.manager.CacheManager;
-import org.infinispan.server.hotrod.HotRodServer;
-import org.infinispan.test.MultipleCacheManagersTest;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.Test;
-
-import java.net.InetSocketAddress;
-
-import static org.testng.AssertJUnit.assertEquals;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-@Test (testName = "client.hotrod.MultipleCacheManagersTest", groups = "functional")
-public class TopologyChangeTest extends MultipleCacheManagersTest {
-
-   HotRodServer hotRodServer1;
-   HotRodServer hotRodServer2;
-   HotRodServer hotRodServer3;
-
-   RemoteCache remoteCache;
-   private RemoteCacheManager remoteCacheManager;
-   private TcpTransportFactory tcpConnectionFactory;
-   private Configuration config;
-
-   @Override
-   protected void assertSupportedConfig() {      
-   }
-
-   @Override
-   protected void createCacheManagers() throws Throwable {
-      config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
-      CacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
-      CacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
-      registerCacheManager(cm1);
-      registerCacheManager(cm2);
-
-      hotRodServer1 = TestHelper.startHotRodServer(manager(0));
-      hotRodServer2 = TestHelper.startHotRodServer(manager(1));
-
-      manager(0).getCache();
-      manager(1).getCache();
-
-      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
-
-
-      manager(0).getCache().put("k","v");
-      manager(0).getCache().get("k").equals("v");
-      manager(1).getCache().get("k").equals("v");
-
-      log.info("Local replication test passed!");
-
-      //Important: this only connects to one of the two servers!
-      remoteCacheManager = new RemoteCacheManager("localhost", hotRodServer2.getPort());
-      remoteCache = remoteCacheManager.getCache();
-
-      tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
-   }
-
-
-   public void testTwoMembers() {
-      InetSocketAddress server1Address = new InetSocketAddress("localhost", hotRodServer1.getPort());
-      expectTopologyChange(server1Address, true);
-      assertEquals(2, tcpConnectionFactory.getServers().size());
-   }
-
-   @Test(dependsOnMethods = "testTwoMembers")
-   public void testAddNewServer() {
-      CacheManager cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
-      registerCacheManager(cm3);
-      hotRodServer3 = TestHelper.startHotRodServer(manager(2));
-      manager(2).getCache();
-
-      TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
-      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
-
-      expectTopologyChange(new InetSocketAddress("localhost",hotRodServer3.getPort()), true);
-      assertEquals(3, tcpConnectionFactory.getServers().size());
-   }
-
-   @Test(dependsOnMethods = "testAddNewServer")
-   public void testDropServer() {
-      manager(2).stop();
-      TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1));
-      TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
-      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
-      
-      InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());
-      hotRodServer3.stop();
-      expectTopologyChange(server3Address, false);
-      assertEquals(2, tcpConnectionFactory.getServers().size());
-   }
-
-   private void expectTopologyChange(InetSocketAddress server1Address, boolean added) {
-      for (int i = 0; i < 10; i++) {
-         try {
-            remoteCache.put("k" + i, "v" + i);
-//            remoteCache.ping();
-         } catch (Exception e) {
-            if (added) {
-               throw new IllegalStateException(e);
-            } //else it is acceptable, as the transport hasn't changed
-         }
-         if (added == tcpConnectionFactory.getServers().contains(server1Address)) break;
-      }
-      assertEquals(server1Address + " not found", added, tcpConnectionFactory.getServers().contains(server1Address));
-   }
-}



More information about the infinispan-commits mailing list