[infinispan-commits] Infinispan SVN: r1867 - in trunk/core/src: test/java/org/infinispan and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu May 27 14:03:10 EDT 2010


Author: mircea.markus
Date: 2010-05-27 14:03:09 -0400 (Thu, 27 May 2010)
New Revision: 1867

Added:
   trunk/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java
   trunk/core/src/test/java/org/infinispan/affinity/
   trunk/core/src/test/java/org/infinispan/affinity/BaseFilterKeyAffinityServiceTest.java
   trunk/core/src/test/java/org/infinispan/affinity/BaseKeyAffinityServiceTest.java
   trunk/core/src/test/java/org/infinispan/affinity/FilteredKeyAffinityService.java
   trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceShutdownTest.java
   trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceTest.java
   trunk/core/src/test/java/org/infinispan/affinity/LocalKeyAffinityServiceTest.java
Removed:
   trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java
Modified:
   trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java
   trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java
   trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java
   trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java
Log:
added tests and various fixes for affinity service.

Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java	2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -38,6 +38,7 @@
     * Returns a key that will be distributed on the cluster node identified by address.
     * @param address identifying the cluster node.
     * @return a key object
+    * @throws IllegalStateException if the service has not been started or it is shutdown
     */
    K getKeyForAddress(Address address);
 
@@ -45,6 +46,12 @@
     * Returns a key that will be distributed on the same node as the supplied key.
     * @param otherKey the key for which we need a collocation
     * @return a key object
+    * @throws IllegalStateException if the service has not been started or it is shutdown
     */
    K getCollocatedKey(K otherKey);
+
+   /**
+    * Checks whether or not the service is started.
+    */
+   public boolean isStarted();
 }

Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java	2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -33,7 +33,8 @@
     * generating keys for it.
     *
     * @param cache         the distributed cache for which this service runs
-    * @param ex            used for running async key generation process.
+    * @param ex            used for running async key generation process. On service shutdown, the executor won't be
+    *                      stopped; i.e. it's the user's responsibility to manage its lifecycle.
     * @param keyGenerator  allows one to control how the generated keys look like.
     * @param keyBufferSize the number of generated keys per {@link org.infinispan.remoting.transport.Address}.
     * @param start         weather to start the service or not
@@ -75,11 +76,11 @@
    public static <K, V> KeyAffinityService newLocalKeyAffinityService(Cache<K, V> cache, KeyGenerator keyGenerator, Executor ex, int keyBufferSize, boolean start) {
       Address localAddress = cache.getAdvancedCache().getRpcManager().getTransport().getAddress();
       Collection<Address> forAddresses = Collections.singletonList(localAddress);
-      return newKeyAffinityService(cache,forAddresses, keyGenerator, ex, keyBufferSize, start);
+      return newKeyAffinityService(cache, forAddresses, keyGenerator, ex, keyBufferSize, start);
    }
 
    /**
-    * Same as {@link #newLocalKeyAffinityService(org.infinispan.Cache, KeyGenerator, java.util.concurrent.Executor, int)} with start == true.
+    * Same as {@link #newLocalKeyAffinityService(org.infinispan.Cache, KeyGenerator, java.util.concurrent.Executor, int, boolean)} with start == true.
     */
    public static <K, V> KeyAffinityService newLocalKeyAffinityService(Cache<K, V> cache, KeyGenerator keyGenerator, Executor ex, int keyBufferSize) {
       return newLocalKeyAffinityService(cache, keyGenerator, ex, keyBufferSize, true);

Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java	2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -5,8 +5,11 @@
 import org.infinispan.Cache;
 import org.infinispan.distribution.ConsistentHash;
 import org.infinispan.distribution.DistributionManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
 import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.concurrent.ConcurrentHashSet;
 import org.infinispan.util.concurrent.ReclosableLatch;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -19,14 +22,13 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * // TODO: Document this
+ * Implementation of KeyAffinityService.
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
@@ -34,13 +36,14 @@
 @ThreadSafe
 public class KeyAffinityServiceImpl implements KeyAffinityService {
 
+   private final float THRESHOLD = 0.5f;
+   
    private static Log log = LogFactory.getLog(KeyAffinityServiceImpl.class);
 
    private final Set<Address> filter;
 
    @GuardedBy("maxNumberInvariant")
    private final Map<Address, BlockingQueue> address2key = new ConcurrentHashMap<Address, BlockingQueue>();
-
    private final Executor executor;
    private final Cache cache;
    private final KeyGenerator keyGenerator;
@@ -55,11 +58,12 @@
     */
    private final ReadWriteLock maxNumberInvariant = new ReentrantReadWriteLock();
 
-
    /**
     * Used for coordinating between the KeyGeneratorWorker and consumers.
     */
    private final ReclosableLatch keyProducerStartLatch = new ReclosableLatch();
+   private volatile KeyGeneratorWorker keyGenWorker;
+   private volatile ListenerRegistration listenerRegistration;
 
 
    public KeyAffinityServiceImpl(Executor executor, Cache cache, KeyGenerator keyGenerator, int bufferSize, Collection<Address> filter, boolean start) {
@@ -68,7 +72,10 @@
       this.keyGenerator = keyGenerator;
       this.bufferSize = bufferSize;
       if (filter != null) {
-         this.filter = new CopyOnWriteArraySet<Address>();
+         this.filter = new ConcurrentHashSet<Address>();
+         for (Address address : filter) {
+            this.filter.add(address);
+         }
       } else {
          this.filter = null;
       }
@@ -102,7 +109,7 @@
          Thread.currentThread().interrupt();
          return null;
       } finally {
-         if (maxNumberOfKeys.equals(exitingNumberOfKeys)) {
+         if (queue.size() < maxNumberOfKeys.get() * THRESHOLD + 1) {
             keyProducerStartLatch.open();
          }
       }
@@ -118,88 +125,153 @@
       maxNumberInvariant.writeLock().lock();
       try {
          addQueuesForAddresses(existingNodes);
-         revisitNumberOfKeys();
+         resetNumberOfKeys();
       } finally {
          maxNumberInvariant.writeLock().unlock();
       }
-      executor.execute(new KeyGeneratorWorker());
-      cache.addListener(new TopologyChangeListener(this));
+      keyGenWorker = new KeyGeneratorWorker();
+      executor.execute(keyGenWorker);
+      listenerRegistration = new ListenerRegistration(this);
+      ((EmbeddedCacheManager)cache.getCacheManager()).addListener(listenerRegistration);
+      cache.addListener(listenerRegistration);
       keyProducerStartLatch.open();
       started = true;
    }
 
    @Override
    public void stop() {
+      if (!started) {
+         log.info("Ignoring call to stop as service is not started.");
+         return;
+      }
+      started = false;
+      EmbeddedCacheManager cacheManager = (EmbeddedCacheManager) cache.getCacheManager();
+      if (cacheManager.getListeners().contains(listenerRegistration)) {
+         cacheManager.removeListener(listenerRegistration);
+      } else {
+         throw new IllegalStateException("Listener must have been registered!");
+      }
+      //most likely the listeners collection is shared between CacheManager and the Cache
+      if (cache.getListeners().contains(listenerRegistration)) {
+         cache.removeListener(listenerRegistration);
+      }
+      keyGenWorker.stop();
    }
 
    public void handleViewChange(ViewChangedEvent vce) {
       if (log.isTraceEnabled()) {
          log.trace("ViewChange received: " + vce);
       }
-      synchronized (address2key) {
-         for (Address address : vce.getOldMembers()) {
-            BlockingQueue queue = address2key.remove(address);
-            if (queue == null) {
-               KeyAffinityServiceImpl.log.warn("Null queue not expected for address: " + address + ". Did we miss a view change?");
-            }
-         }
+      maxNumberInvariant.writeLock().lock();
+      try {
+         address2key.clear(); //wee need to drop everything as key-mapping data is stale due to view change
          addQueuesForAddresses(vce.getNewMembers());
+         resetNumberOfKeys();
+         keyProducerStartLatch.open();
+      } finally {
+         maxNumberInvariant.writeLock().unlock();
       }
    }
 
+   public boolean isKeyGeneratorThreadAlive() {
+      return keyGenWorker.isAlive() ;
+   }
 
+   public void handleCacheStopped(CacheStoppedEvent cse) {
+      if (log.isTraceEnabled()) {
+         log.trace("Cache stopped, stopping the service: " + cse);
+      }
+      stop();
+   }
+
+
    public class KeyGeneratorWorker implements Runnable {
 
+      private volatile boolean isActive;
+      private boolean isAlive;
+      private volatile Thread runner;
+
       @Override
       public void run() {
+         this.runner = Thread.currentThread();
+         isAlive = true;
          while (true) {
-            try {
-               keyProducerStartLatch.await();
-            } catch (InterruptedException e) {
-               if (log.isInfoEnabled()) {
-                  log.info("Shutting down KeyAffinity service for key set: " + filter);
-               }
-               return;
+            if (waitToBeWakenUp()) break;
+            isActive = true;
+            if (log.isTraceEnabled()) {
+               log.trace("KeyGeneratorWorker marked as ACTIVE");
             }
             generateKeys();
+            
+            isActive = false;
+            if (log.isTraceEnabled()) {
+               log.trace("KeyGeneratorWorker marked as INACTIVE");
+            }
          }
+         isAlive = false;
       }
 
       private void generateKeys() {
-         maxNumberInvariant.writeLock().lock();
+         maxNumberInvariant.readLock().lock();
          try {
-            for (Address address : address2key.keySet()) {
+            while (maxNumberOfKeys.get() != exitingNumberOfKeys.get()) {
                Object key = keyGenerator.getKey();
-               tryAddKey(address, key);
-               if (maxNumberOfKeys.equals(exitingNumberOfKeys)) {
-                  keyProducerStartLatch.close();
-                  return;
+               Address addressForKey = getAddressForKey(key);
+               if (interestedInAddress(addressForKey)) {
+                  tryAddKey(addressForKey, key);
                }
             }
+            keyProducerStartLatch.close();
          } finally {
-            maxNumberInvariant.writeLock().unlock();
+            maxNumberInvariant.readLock().unlock();
          }
       }
 
+      private boolean waitToBeWakenUp() {
+         try {
+            keyProducerStartLatch.await();
+         } catch (InterruptedException e) {
+            if (log.isInfoEnabled()) {
+               log.info("Shutting down KeyAffinity service for key set: " + filter);
+            }
+            return true;
+         }
+         return false;
+      }
+
       private void tryAddKey(Address address, Object key) {
          BlockingQueue queue = address2key.get(address);
          boolean added = queue.offer(key);
+         if (added) {
+            exitingNumberOfKeys.incrementAndGet();
+         }
          if (log.isTraceEnabled()) {
-            log.trace((added ? "Successfully " : "Not") + "added key(" + key + ") to the address(" + address + ").");
+            log.trace((added ? "Successfully" : "Not") + " added key(" + key + ") to the address(" + address + ").");
+            if (added) log.trace("maxNumberOfKeys==" + maxNumberOfKeys + ", exitingNumberOfKeys==" + exitingNumberOfKeys);
          }
       }
+
+      public boolean isActive() {
+         return isActive;
+      }
+
+      public boolean isAlive() {
+         return isAlive;
+      }
+
+      public void stop() {
+         runner.interrupt();
+      }
    }
 
    /**
     * Important: this *MUST* be called with WL on {@link #address2key}.
     */
-   private void revisitNumberOfKeys() {
+   private void resetNumberOfKeys() {
       maxNumberOfKeys.set(address2key.keySet().size() * bufferSize);
-      for (Address address : address2key.keySet()) {
-         exitingNumberOfKeys.addAndGet(address2key.get(address).size());
-      }
+      exitingNumberOfKeys.set(0);
       if (log.isTraceEnabled()) {
-         log.trace("revisitNumberOfKeys ends with: maxNumberOfKeys=" + maxNumberOfKeys +
+         log.trace("resetNumberOfKeys ends with: maxNumberOfKeys=" + maxNumberOfKeys +
                ", exitingNumberOfKeys=" + exitingNumberOfKeys);
       }
    }
@@ -247,4 +319,20 @@
    public Map<Address, BlockingQueue> getAddress2KeysMapping() {
       return Collections.unmodifiableMap(address2key);
    }
+
+   public int getMaxNumberOfKeys() {
+      return maxNumberOfKeys.intValue();
+   }
+
+   public int getExitingNumberOfKeys() {
+      return exitingNumberOfKeys.intValue();
+   }
+
+   public boolean isKeyGeneratorThreadActive() {
+      return keyGenWorker.isActive();
+   }
+
+   public boolean isStarted() {
+      return started;
+   }
 }

Copied: trunk/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java (from rev 1862, trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,32 @@
+package org.infinispan.affinity;
+
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStopped;
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+
+/**
+* Used for registering various cache notifications.
+*
+* @author Mircea.Markus at jboss.com
+* @since 4.1
+*/
+@Listener
+public class ListenerRegistration {
+   private final KeyAffinityServiceImpl keyAffinityService;
+
+   public ListenerRegistration(KeyAffinityServiceImpl keyAffinityService) {
+      this.keyAffinityService = keyAffinityService;
+   }
+
+   @ViewChanged
+   public void handleViewChange(ViewChangedEvent vce) {
+      keyAffinityService.handleViewChange(vce);
+   }
+
+   @CacheStopped
+   public void handleCacheStopped(CacheStoppedEvent cse) {
+      keyAffinityService.handleCacheStopped(cse);
+   }
+}

Deleted: trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java	2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -1,30 +0,0 @@
-package org.infinispan.affinity;
-
-import org.infinispan.notifications.Listener;
-import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
-import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
-* // TODO: Document this
-*
-* @author Mircea.Markus at jboss.com
-* @since 4.1
-*/
-@Listener
-class TopologyChangeListener {
-   private final KeyAffinityServiceImpl keyAffinityService;
-
-   public TopologyChangeListener(KeyAffinityServiceImpl keyAffinityService) {
-      this.keyAffinityService = keyAffinityService;
-   }
-
-   @ViewChanged
-   public void handleViewChange(ViewChangedEvent vce) {
-      keyAffinityService.handleViewChange(vce);
-   }
-}

Added: trunk/core/src/test/java/org/infinispan/affinity/BaseFilterKeyAffinityServiceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/BaseFilterKeyAffinityServiceTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/BaseFilterKeyAffinityServiceTest.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,87 @@
+package org.infinispan.affinity;
+
+import junit.framework.Assert;
+import org.infinispan.Cache;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class BaseFilterKeyAffinityServiceTest extends BaseKeyAffinityServiceTest {
+
+   private static Log log = LogFactory.getLog(BaseFilterKeyAffinityServiceTest.class);
+
+   protected EmbeddedCacheManager cacheManager;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      super.createCacheManagers();
+      createService();
+   }
+
+   protected abstract void createService();
+
+   protected abstract List<Address> getAddresses() ;
+
+
+   protected void testSingleKey() throws InterruptedException {
+      Map<Address, BlockingQueue> blockingQueueMap = keyAffinityService.getAddress2KeysMapping();
+      assertEquals(getAddresses().size(), blockingQueueMap.keySet().size());
+      assertEventualFullCapacity(getAddresses());
+   }
+
+   protected void testAddNewServer() throws Exception {
+      EmbeddedCacheManager cm = addClusterEnabledCacheManager();
+      cm.defineConfiguration(cacheName, configuration);
+      Cache cache = cm.getCache(cacheName);
+      caches.add(cache);
+      waitForClusterToResize();
+      assertUnaffected();
+   }
+
+   protected void testRemoveServers() throws InterruptedException {
+      caches.get(4).getCacheManager().stop();
+      caches.get(3).getCacheManager().stop();
+      caches.remove(4);
+      caches.remove(3);
+      Assert.assertEquals(3, caches.size());
+      waitForClusterToResize();
+      assertUnaffected();
+   }
+
+   protected void testShutdownOwnManager() throws InterruptedException {
+      log.info("**** here it starts");
+      caches.get(0).getCacheManager().stop();
+      caches.remove(0);
+      Assert.assertEquals(2, caches.size());
+      TestingUtil.blockUntilViewsReceived(10000, caches);
+      Assert.assertEquals(2, topology().size());
+
+      for (int i = 0; i < 10; i++) {
+         if (!keyAffinityService.isStarted()) break;
+         Thread.sleep(1000);
+      }
+      assert !keyAffinityService.isStarted();
+   }
+
+   private void assertUnaffected() throws InterruptedException {
+      for (int i = 0; i < 10; i++) {
+         assert keyAffinityService.getAddress2KeysMapping().keySet().size() == getAddresses().size();
+         Thread.sleep(200);
+      }
+      assertEventualFullCapacity(getAddresses());
+      assertKeyAffinityCorrectness(getAddresses());
+   }
+}

Added: trunk/core/src/test/java/org/infinispan/affinity/BaseKeyAffinityServiceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/BaseKeyAffinityServiceTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/BaseKeyAffinityServiceTest.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,84 @@
+package org.infinispan.affinity;
+
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.distribution.ConsistentHash;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class BaseKeyAffinityServiceTest extends BaseDistFunctionalTest {
+   
+   protected KeyAffinityServiceImpl keyAffinityService;
+
+   protected void assertMapsToAddress(Object o, Address addr) {
+      ConsistentHash hash = manager(0).getCache(cacheName).getAdvancedCache().getDistributionManager().getConsistentHash();
+      List<Address> addresses = hash.locate(o, numOwners);
+      assertEquals("Expected key " + o + " to map to address " + addr + ". List of addresses is" + addresses, true, addresses.contains(addr));
+   }
+
+   protected List<Address> topology() {
+      return topology(caches.get(1).getCacheManager());
+   }
+
+   protected List<Address> topology(CacheManager cm) {
+      return cm.getCache(cacheName).getAdvancedCache().getRpcManager().getTransport().getMembers();
+   }
+
+   protected void assertEventualFullCapacity() throws InterruptedException {
+      List<Address> addresses = topology();
+      assertEventualFullCapacity(addresses);
+   }
+
+   protected void assertEventualFullCapacity(List<Address> addresses) throws InterruptedException {
+      Map<Address, BlockingQueue> blockingQueueMap = keyAffinityService.getAddress2KeysMapping();
+      for (Address addr : addresses) {
+         BlockingQueue queue = blockingQueueMap.get(addr);
+         //the queue will eventually get filled
+         for (int i = 0; i < 10; i++) {
+            if (!(queue.size() == 100)) {
+               Thread.sleep(1000);
+            } else {
+               break;
+            }
+         }
+         assertEquals(100, queue.size());
+      }
+      assertEquals(keyAffinityService.getMaxNumberOfKeys(), keyAffinityService.getExitingNumberOfKeys());
+      assertEquals(addresses.size() * 100, keyAffinityService.getExitingNumberOfKeys());
+      assertEquals(false, keyAffinityService.isKeyGeneratorThreadActive());
+   }
+
+   protected void assertKeyAffinityCorrectness() {
+      List<Address> addressList = topology();
+      assertKeyAffinityCorrectness(addressList);
+   }
+
+   protected void assertKeyAffinityCorrectness(Collection<Address> addressList) {
+      Map<Address, BlockingQueue> blockingQueueMap = keyAffinityService.getAddress2KeysMapping();
+      for (Address addr : addressList) {
+         BlockingQueue queue = blockingQueueMap.get(addr);
+         assertEquals(100, queue.size());
+         for (Object o : queue) {
+            assertMapsToAddress(o, addr);
+         }
+      }
+   }
+
+   protected void waitForClusterToResize() {
+      TestingUtil.blockUntilViewsReceived(10000, caches);
+      RehashWaiter.waitForInitRehashToComplete(new HashSet(caches));
+      assertEquals(caches.size(), topology().size());
+   }
+}

Added: trunk/core/src/test/java/org/infinispan/affinity/FilteredKeyAffinityService.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/FilteredKeyAffinityService.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/FilteredKeyAffinityService.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,65 @@
+package org.infinispan.affinity;
+
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ *
+ *  This class just overrides the methods in the base class as TestNG behaves funny with depending methods and inheritance.
+ * 
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test (groups = "functional", testName = "affinity.FilteredKeyAffinityService")
+public class FilteredKeyAffinityService extends BaseFilterKeyAffinityServiceTest {
+   private List<Address> filter;
+
+   @Override
+   protected void createService() {
+      ThreadFactory tf = new ThreadFactory() {
+         @Override
+         public Thread newThread(Runnable r) {
+            return new Thread(r, "KeyGeneratorThread");
+         }
+      };
+      filter = new ArrayList<Address>();
+      filter.add(caches.get(0).getAdvancedCache().getRpcManager().getTransport().getAddress());
+      filter.add(caches.get(1).getAdvancedCache().getRpcManager().getTransport().getAddress());
+      cacheManager = (EmbeddedCacheManager) caches.get(0).getCacheManager();
+      keyAffinityService = (KeyAffinityServiceImpl) KeyAffinityServiceFactory.
+            newKeyAffinityService(cacheManager.getCache(cacheName), filter, new RndKeyGenerator(),
+                                       Executors.newSingleThreadExecutor(tf), 100);
+   }
+
+   @Override
+   protected List<Address> getAddresses() {
+      return filter;
+   }
+
+   @Override
+   public void testSingleKey() throws InterruptedException {
+      super.testSingleKey();  
+   }
+
+   @Test(dependsOnMethods = "testSingleKey")
+   public void testAddNewServer() throws Exception {
+      super.testAddNewServer();
+   }
+
+   @Test(dependsOnMethods = "testAddNewServer")
+   public void testRemoveServers() throws InterruptedException {
+      super.testRemoveServers();
+   }
+
+   @Test (dependsOnMethods = "testRemoveServers")
+   public void testShutdownOwnManager() throws InterruptedException {
+      super.testShutdownOwnManager();
+   }   
+}

Added: trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceShutdownTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceShutdownTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceShutdownTest.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,89 @@
+package org.infinispan.affinity;
+
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test (groups = "functional", testName = "affinity.KeyAffinityServiceShutdownTest")
+public class KeyAffinityServiceShutdownTest extends BaseKeyAffinityServiceTest {
+   private ExecutorService executor;
+   private EmbeddedCacheManager cacheManager;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      super.createCacheManagers();
+
+      ThreadFactory tf = new ThreadFactory() {
+         @Override
+         public Thread newThread(Runnable r) {
+            return new Thread(r, "KeyGeneratorThread");
+         }
+      };
+      executor = Executors.newSingleThreadExecutor(tf);
+      cacheManager = manager(0);
+      keyAffinityService = (KeyAffinityServiceImpl) KeyAffinityServiceFactory.newKeyAffinityService(cacheManager.getCache(cacheName),
+                                                                                                    executor,
+                                                                                                    new RndKeyGenerator(), 100);
+   }
+
+   public void testSimpleShutdown() throws Exception {
+      assertListenerRegistered(true);
+      assertEventualFullCapacity();
+      assert keyAffinityService.isKeyGeneratorThreadAlive();
+      keyAffinityService.stop();
+      for (int i = 0; i < 10; i++) {
+         if (!keyAffinityService.isKeyGeneratorThreadAlive())
+            break;
+         Thread.sleep(1000);
+      }
+      assert !keyAffinityService.isKeyGeneratorThreadAlive();
+      assert !executor.isShutdown();
+   }
+
+   @Test(dependsOnMethods = "testSimpleShutdown")
+   public void testServiceCannotBeUsedAfterShutdown() {
+      try {
+         keyAffinityService.getKeyForAddress(topology().get(0));
+         assert false : "Exception expected!";
+      } catch (IllegalStateException e) {
+         //expected
+      }
+      try {
+         keyAffinityService.getCollocatedKey("a");
+         assert false : "Exception expected!";
+      } catch (IllegalStateException e) {
+         //expected
+      }
+   }
+
+   @Test (dependsOnMethods = "testServiceCannotBeUsedAfterShutdown")
+   public void testViewChaneListenerUnregistered() {
+      assertListenerRegistered(false);
+   }
+
+   @Test (dependsOnMethods = "testViewChaneListenerUnregistered")
+   public void testRestart() throws InterruptedException {
+      keyAffinityService.start();
+      assertEventualFullCapacity();
+   }
+
+   private void assertListenerRegistered(boolean registered) {
+      boolean isRegistered = false;
+      for (Object o : cacheManager.getListeners()) {
+         if (o instanceof ListenerRegistration) {
+            isRegistered = true;
+            break;
+         }
+      }
+      assertEquals(registered, isRegistered);
+   }   
+}

Added: trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceTest.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,169 @@
+package org.infinispan.affinity;
+
+import org.infinispan.Cache;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.distribution.ConsistentHash;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ at Test(groups = "functional", testName = "affinity.KeyAffinityServiceTest")
+public class KeyAffinityServiceTest extends BaseKeyAffinityServiceTest {
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      super.createCacheManagers();
+      assertEquals(4, topology(caches.get(0).getCacheManager()).size());
+      assertEquals(4, topology(caches.get(1).getCacheManager()).size());
+      assertEquals(4, topology(caches.get(2).getCacheManager()).size());
+      assertEquals(4, topology(caches.get(3).getCacheManager()).size());
+
+      cache(0, cacheName).put("k", "v");
+      assertEquals("v", cache(0, cacheName).get("k"));
+      assertEquals("v", cache(1, cacheName).get("k"));
+      assertEquals("v", cache(2, cacheName).get("k"));
+      assertEquals("v", cache(3, cacheName).get("k"));
+
+
+      ThreadFactory tf = new ThreadFactory() {
+         @Override
+         public Thread newThread(Runnable r) {
+            return new Thread(r, "KeyGeneratorThread");
+         }
+      };
+      keyAffinityService = (KeyAffinityServiceImpl) KeyAffinityServiceFactory.newKeyAffinityService(manager(0).getCache(cacheName),
+                                                                                                    Executors.newSingleThreadExecutor(tf),
+                                                                                                    new RndKeyGenerator(), 100);
+   }
+
+   public void testKeysAreCorrectlyCreated() throws Exception {
+      assertEventualFullCapacity();
+      assertKeyAffinityCorrectness();
+   }
+
+   @Test (dependsOnMethods = "testKeysAreCorrectlyCreated")
+   public void testConcurrentConsumptionOfKeys() throws InterruptedException {
+      List<KeyConsumer> consumers = new ArrayList<KeyConsumer>();
+      int keysToConsume = 1000;
+      CountDownLatch consumersStart = new CountDownLatch(1);
+      for (int i = 0; i < 10; i++) {
+         consumers.add(new KeyConsumer(keysToConsume, consumersStart));
+      }
+      consumersStart.countDown();
+
+      for (KeyConsumer kc : consumers) {
+         kc.join();
+      }
+
+      for (KeyConsumer kc : consumers) {
+         assertEquals(null, kc.exception);
+      }
+
+      assertEventualFullCapacity();
+   }
+
+   @Test (dependsOnMethods = "testConcurrentConsumptionOfKeys")
+   public void testServerAdded() throws InterruptedException {
+      EmbeddedCacheManager cm = addClusterEnabledCacheManager();
+      cm.defineConfiguration(cacheName, configuration);
+      Cache cache = cm.getCache(cacheName);
+      caches.add(cache);
+      waitForClusterToResize();
+      for (int i = 0; i < 10; i++) {
+         if (keyAffinityService.getAddress2KeysMapping().keySet().size() == 5) {
+            break;
+         }
+         Thread.sleep(500);
+      }
+      assertEquals(5, keyAffinityService.getAddress2KeysMapping().keySet().size());
+      assertEventualFullCapacity();
+      assertKeyAffinityCorrectness();
+   }
+
+   @Test(dependsOnMethods = "testServerAdded")
+   public void testServersDropped() throws InterruptedException {
+      log.info("*** Here it is");
+      caches.get(4).getCacheManager().stop();
+      caches.remove(4);
+      waitForClusterToResize();
+      for (int i = 0; i < 10; i++) {
+         if (keyAffinityService.getAddress2KeysMapping().keySet().size() == 3) {
+            break;
+         }
+         Thread.sleep(500);
+      }
+      assertEquals(4, keyAffinityService.getAddress2KeysMapping().keySet().size());
+      assertEventualFullCapacity();
+      assertKeyAffinityCorrectness();
+   }
+
+   @Test (dependsOnMethods = "testServersDropped")
+   public void testCollocatedKey() {
+      ConsistentHash hash = manager(0).getCache(cacheName).getAdvancedCache().getDistributionManager().getConsistentHash();
+      for (int i = 0; i < 1000; i++) {
+         List<Address> addresses = hash.locate(i, numOwners);
+         Object collocatedKey = keyAffinityService.getCollocatedKey(i);
+         List<Address> addressList = hash.locate(collocatedKey, numOwners);
+         assertEquals(addresses, addressList);
+      }
+   }
+
+   public class KeyConsumer extends Thread {
+
+      volatile Exception exception;
+
+
+      private final int keysToConsume;
+      private CountDownLatch consumersStart;
+      private final List<Address> topology = topology();
+      private final Random rnd = new Random();
+
+      public KeyConsumer(int keysToConsume, CountDownLatch consumersStart) {
+         super("KeyConsumer");
+         this.keysToConsume = keysToConsume;
+         this.consumersStart = consumersStart;
+         start();
+      }
+
+      @Override
+      public void run() {
+         try {
+            consumersStart.await();
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+            return;
+         }
+         for (int i = 0; i < keysToConsume; i++) {
+            Address whichAddr = topology.get(rnd.nextInt(topology.size()));
+            try {
+               Object keyForAddress = keyAffinityService.getKeyForAddress(whichAddr);
+               assertMapsToAddress(keyForAddress, whichAddr);
+            } catch (Exception e) {
+               this.exception = e;
+               break;
+            }
+         }
+      }
+   }
+
+}

Added: trunk/core/src/test/java/org/infinispan/affinity/LocalKeyAffinityServiceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/LocalKeyAffinityServiceTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/LocalKeyAffinityServiceTest.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,63 @@
+package org.infinispan.affinity;
+
+import junit.framework.Assert;
+import org.infinispan.Cache;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Filtered key affinity test for a service obtained from KeyAffinityServiceFactory.newLocalKeyAffinityService(),
+ * for which the only expected address is the one of the local cache manager.
+ * <p/>
+ * This class just re-declares the scenarios of the base class as thin wrappers, as TestNG behaves funny with
+ * depending methods and inheritance.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "affinity.LocalKeyAffinityServiceTest")
+public class LocalKeyAffinityServiceTest extends BaseFilterKeyAffinityServiceTest {
+
+   /**
+    * Builds the local key affinity service on top of the cache manager of the first cache, with key generation
+    * running on a single thread named "KeyGeneratorThread".
+    */
+   @Override
+   protected void createService() {
+      ThreadFactory tf = new ThreadFactory() {
+         @Override
+         public Thread newThread(Runnable r) {
+            return new Thread(r, "KeyGeneratorThread");
+         }
+      };
+      cacheManager = (EmbeddedCacheManager) caches.get(0).getCacheManager();
+      keyAffinityService = (KeyAffinityServiceImpl) KeyAffinityServiceFactory.
+            newLocalKeyAffinityService(cacheManager.getCache(cacheName), new RndKeyGenerator(),
+                                       Executors.newSingleThreadExecutor(tf), 100);
+   }
+
+   /**
+    * @return a single-element list holding the address of the local cache manager
+    */
+   @Override
+   protected List<Address> getAddresses() {
+      return Collections.singletonList(cacheManager.getAddress());
+   }
+
+   /** First scenario of the chain; delegates to the base class testSingleKey(). */
+   public void testFilteredSingleKey() throws InterruptedException {
+      super.testSingleKey();
+   }
+
+   /** Delegates to the base class testAddNewServer(), ordered after testFilteredSingleKey. */
+   @Test(dependsOnMethods = "testFilteredSingleKey")
+   public void testFilteredAddNewServer() throws Exception {
+      super.testAddNewServer();
+   }
+
+   /** Delegates to the base class testRemoveServers(), ordered after testFilteredAddNewServer. */
+   @Test(dependsOnMethods = "testFilteredAddNewServer")
+   public void testFilteredRemoveServers() throws InterruptedException {
+      super.testRemoveServers();
+   }
+
+   /** Re-declares the base class test so that it is ordered after testFilteredRemoveServers. */
+   @Override
+   @Test (dependsOnMethods = "testFilteredRemoveServers")
+   public void testShutdownOwnManager() throws InterruptedException {
+      super.testShutdownOwnManager();
+   }
+}

Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -20,8 +20,11 @@
 import javax.transaction.TransactionManager;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import java.util.concurrent.locks.LockSupport;
@@ -40,13 +43,13 @@
    protected boolean l1CacheEnabled = true;
    protected boolean l1OnRehash = false;
    protected boolean performRehashing = false;
-   protected static final int NUM_OWNERS = 2;
+   protected int numOwners = 2;
 
    protected void createCacheManagers() throws Throwable {
       cacheName = "dist";
       configuration = getDefaultClusteredConfig(sync ? Configuration.CacheMode.DIST_SYNC : Configuration.CacheMode.DIST_ASYNC, tx);
       configuration.setRehashEnabled(performRehashing);
-      configuration.setNumOwners(NUM_OWNERS);
+      configuration.setNumOwners(numOwners);
       if (!testRetVals) {
          configuration.setUnsafeUnreliableReturnValues(true);
          // we also need to use repeatable read for tests to work when we dont have reliable return values, since the
@@ -102,6 +105,12 @@
             }
          }
       }
+
+      public static void waitForInitRehashToComplete(Collection<Cache> caches) {
+         Set<Cache> cachesSet = new HashSet<Cache>();
+         cachesSet.addAll(caches);
+         waitForInitRehashToComplete(cachesSet.toArray(new Cache[cachesSet.size()]));
+      }
    }
 
    // only used if the CH impl does not order the hash ring based on the order of the view.

Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java	2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java	2010-05-27 18:03:09 UTC (rev 1867)
@@ -4,7 +4,6 @@
 import org.infinispan.distribution.BaseDistFunctionalTest;
 import org.infinispan.distribution.ConsistentHash;
 import org.infinispan.distribution.ConsistentHashHelper;
-import org.infinispan.manager.CacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.remoting.transport.Address;
 import org.testng.annotations.Test;
@@ -58,7 +57,7 @@
       // which key should me mapped to the joiner?
       MagicKey keyToTest = null;
       for (MagicKey k: keys) {
-         if (chNew.isKeyLocalToAddress(joinerAddress, k, NUM_OWNERS)) {
+         if (chNew.isKeyLocalToAddress(joinerAddress, k, numOwners)) {
             keyToTest = k;
             break;
          }



More information about the infinispan-commits mailing list