[infinispan-commits] Infinispan SVN: r1867 - in trunk/core/src: test/java/org/infinispan and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu May 27 14:03:10 EDT 2010
Author: mircea.markus
Date: 2010-05-27 14:03:09 -0400 (Thu, 27 May 2010)
New Revision: 1867
Added:
trunk/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java
trunk/core/src/test/java/org/infinispan/affinity/
trunk/core/src/test/java/org/infinispan/affinity/BaseFilterKeyAffinityServiceTest.java
trunk/core/src/test/java/org/infinispan/affinity/BaseKeyAffinityServiceTest.java
trunk/core/src/test/java/org/infinispan/affinity/FilteredKeyAffinityService.java
trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceShutdownTest.java
trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceTest.java
trunk/core/src/test/java/org/infinispan/affinity/LocalKeyAffinityServiceTest.java
Removed:
trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java
Modified:
trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java
trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java
trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java
trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java
Log:
added tests and various fixes for affinity service.
Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java 2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -38,6 +38,7 @@
* Returns a key that will be distributed on the cluster node identified by address.
* @param address identifying the cluster node.
* @return a key object
+ * @throws IllegalStateException if the service has not been started or has been shut down
*/
K getKeyForAddress(Address address);
@@ -45,6 +46,12 @@
* Returns a key that will be distributed on the same node as the supplied key.
* @param otherKey the key for which we need a collocation
* @return a key object
+ * @throws IllegalStateException if the service has not been started or has been shut down
*/
K getCollocatedKey(K otherKey);
+
+ /**
+ * Checks whether or not the service is started.
+ */
+ public boolean isStarted();
}
Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java 2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -33,7 +33,8 @@
* generating keys for it.
*
* @param cache the distributed cache for which this service runs
- * @param ex used for running async key generation process.
+ * @param ex used for running async key generation process. On service shutdown, the executor won't be
+ * stopped; i.e. it is the user's responsibility to manage its lifecycle.
* @param keyGenerator allows one to control how the generated keys look like.
* @param keyBufferSize the number of generated keys per {@link org.infinispan.remoting.transport.Address}.
* @param start weather to start the service or not
@@ -75,11 +76,11 @@
public static <K, V> KeyAffinityService newLocalKeyAffinityService(Cache<K, V> cache, KeyGenerator keyGenerator, Executor ex, int keyBufferSize, boolean start) {
Address localAddress = cache.getAdvancedCache().getRpcManager().getTransport().getAddress();
Collection<Address> forAddresses = Collections.singletonList(localAddress);
- return newKeyAffinityService(cache,forAddresses, keyGenerator, ex, keyBufferSize, start);
+ return newKeyAffinityService(cache, forAddresses, keyGenerator, ex, keyBufferSize, start);
}
/**
- * Same as {@link #newLocalKeyAffinityService(org.infinispan.Cache, KeyGenerator, java.util.concurrent.Executor, int)} with start == true.
+ * Same as {@link #newLocalKeyAffinityService(org.infinispan.Cache, KeyGenerator, java.util.concurrent.Executor, int, boolean)} with start == true.
*/
public static <K, V> KeyAffinityService newLocalKeyAffinityService(Cache<K, V> cache, KeyGenerator keyGenerator, Executor ex, int keyBufferSize) {
return newLocalKeyAffinityService(cache, keyGenerator, ex, keyBufferSize, true);
Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java 2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -5,8 +5,11 @@
import org.infinispan.Cache;
import org.infinispan.distribution.ConsistentHash;
import org.infinispan.distribution.DistributionManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.concurrent.ConcurrentHashSet;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -19,14 +22,13 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
- * // TODO: Document this
+ * Implementation of KeyAffinityService.
*
* @author Mircea.Markus at jboss.com
* @since 4.1
@@ -34,13 +36,14 @@
@ThreadSafe
public class KeyAffinityServiceImpl implements KeyAffinityService {
+ private final float THRESHOLD = 0.5f;
+
private static Log log = LogFactory.getLog(KeyAffinityServiceImpl.class);
private final Set<Address> filter;
@GuardedBy("maxNumberInvariant")
private final Map<Address, BlockingQueue> address2key = new ConcurrentHashMap<Address, BlockingQueue>();
-
private final Executor executor;
private final Cache cache;
private final KeyGenerator keyGenerator;
@@ -55,11 +58,12 @@
*/
private final ReadWriteLock maxNumberInvariant = new ReentrantReadWriteLock();
-
/**
* Used for coordinating between the KeyGeneratorWorker and consumers.
*/
private final ReclosableLatch keyProducerStartLatch = new ReclosableLatch();
+ private volatile KeyGeneratorWorker keyGenWorker;
+ private volatile ListenerRegistration listenerRegistration;
public KeyAffinityServiceImpl(Executor executor, Cache cache, KeyGenerator keyGenerator, int bufferSize, Collection<Address> filter, boolean start) {
@@ -68,7 +72,10 @@
this.keyGenerator = keyGenerator;
this.bufferSize = bufferSize;
if (filter != null) {
- this.filter = new CopyOnWriteArraySet<Address>();
+ this.filter = new ConcurrentHashSet<Address>();
+ for (Address address : filter) {
+ this.filter.add(address);
+ }
} else {
this.filter = null;
}
@@ -102,7 +109,7 @@
Thread.currentThread().interrupt();
return null;
} finally {
- if (maxNumberOfKeys.equals(exitingNumberOfKeys)) {
+ if (queue.size() < maxNumberOfKeys.get() * THRESHOLD + 1) {
keyProducerStartLatch.open();
}
}
@@ -118,88 +125,153 @@
maxNumberInvariant.writeLock().lock();
try {
addQueuesForAddresses(existingNodes);
- revisitNumberOfKeys();
+ resetNumberOfKeys();
} finally {
maxNumberInvariant.writeLock().unlock();
}
- executor.execute(new KeyGeneratorWorker());
- cache.addListener(new TopologyChangeListener(this));
+ keyGenWorker = new KeyGeneratorWorker();
+ executor.execute(keyGenWorker);
+ listenerRegistration = new ListenerRegistration(this);
+ ((EmbeddedCacheManager)cache.getCacheManager()).addListener(listenerRegistration);
+ cache.addListener(listenerRegistration);
keyProducerStartLatch.open();
started = true;
}
@Override
public void stop() {
+ if (!started) {
+ log.info("Ignoring call to stop as service is not started.");
+ return;
+ }
+ started = false;
+ EmbeddedCacheManager cacheManager = (EmbeddedCacheManager) cache.getCacheManager();
+ if (cacheManager.getListeners().contains(listenerRegistration)) {
+ cacheManager.removeListener(listenerRegistration);
+ } else {
+ throw new IllegalStateException("Listener must have been registered!");
+ }
+ //most likely the listeners collection is shared between CacheManager and the Cache
+ if (cache.getListeners().contains(listenerRegistration)) {
+ cache.removeListener(listenerRegistration);
+ }
+ keyGenWorker.stop();
}
public void handleViewChange(ViewChangedEvent vce) {
if (log.isTraceEnabled()) {
log.trace("ViewChange received: " + vce);
}
- synchronized (address2key) {
- for (Address address : vce.getOldMembers()) {
- BlockingQueue queue = address2key.remove(address);
- if (queue == null) {
- KeyAffinityServiceImpl.log.warn("Null queue not expected for address: " + address + ". Did we miss a view change?");
- }
- }
+ maxNumberInvariant.writeLock().lock();
+ try {
+ address2key.clear(); //wee need to drop everything as key-mapping data is stale due to view change
addQueuesForAddresses(vce.getNewMembers());
+ resetNumberOfKeys();
+ keyProducerStartLatch.open();
+ } finally {
+ maxNumberInvariant.writeLock().unlock();
}
}
+ public boolean isKeyGeneratorThreadAlive() {
+ return keyGenWorker.isAlive() ;
+ }
+ public void handleCacheStopped(CacheStoppedEvent cse) {
+ if (log.isTraceEnabled()) {
+ log.trace("Cache stopped, stopping the service: " + cse);
+ }
+ stop();
+ }
+
+
public class KeyGeneratorWorker implements Runnable {
+ private volatile boolean isActive;
+ private boolean isAlive;
+ private volatile Thread runner;
+
@Override
public void run() {
+ this.runner = Thread.currentThread();
+ isAlive = true;
while (true) {
- try {
- keyProducerStartLatch.await();
- } catch (InterruptedException e) {
- if (log.isInfoEnabled()) {
- log.info("Shutting down KeyAffinity service for key set: " + filter);
- }
- return;
+ if (waitToBeWakenUp()) break;
+ isActive = true;
+ if (log.isTraceEnabled()) {
+ log.trace("KeyGeneratorWorker marked as ACTIVE");
}
generateKeys();
+
+ isActive = false;
+ if (log.isTraceEnabled()) {
+ log.trace("KeyGeneratorWorker marked as INACTIVE");
+ }
}
+ isAlive = false;
}
private void generateKeys() {
- maxNumberInvariant.writeLock().lock();
+ maxNumberInvariant.readLock().lock();
try {
- for (Address address : address2key.keySet()) {
+ while (maxNumberOfKeys.get() != exitingNumberOfKeys.get()) {
Object key = keyGenerator.getKey();
- tryAddKey(address, key);
- if (maxNumberOfKeys.equals(exitingNumberOfKeys)) {
- keyProducerStartLatch.close();
- return;
+ Address addressForKey = getAddressForKey(key);
+ if (interestedInAddress(addressForKey)) {
+ tryAddKey(addressForKey, key);
}
}
+ keyProducerStartLatch.close();
} finally {
- maxNumberInvariant.writeLock().unlock();
+ maxNumberInvariant.readLock().unlock();
}
}
+ private boolean waitToBeWakenUp() {
+ try {
+ keyProducerStartLatch.await();
+ } catch (InterruptedException e) {
+ if (log.isInfoEnabled()) {
+ log.info("Shutting down KeyAffinity service for key set: " + filter);
+ }
+ return true;
+ }
+ return false;
+ }
+
private void tryAddKey(Address address, Object key) {
BlockingQueue queue = address2key.get(address);
boolean added = queue.offer(key);
+ if (added) {
+ exitingNumberOfKeys.incrementAndGet();
+ }
if (log.isTraceEnabled()) {
- log.trace((added ? "Successfully " : "Not") + "added key(" + key + ") to the address(" + address + ").");
+ log.trace((added ? "Successfully" : "Not") + " added key(" + key + ") to the address(" + address + ").");
+ if (added) log.trace("maxNumberOfKeys==" + maxNumberOfKeys + ", exitingNumberOfKeys==" + exitingNumberOfKeys);
}
}
+
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public boolean isAlive() {
+ return isAlive;
+ }
+
+ public void stop() {
+ runner.interrupt();
+ }
}
/**
* Important: this *MUST* be called with WL on {@link #address2key}.
*/
- private void revisitNumberOfKeys() {
+ private void resetNumberOfKeys() {
maxNumberOfKeys.set(address2key.keySet().size() * bufferSize);
- for (Address address : address2key.keySet()) {
- exitingNumberOfKeys.addAndGet(address2key.get(address).size());
- }
+ exitingNumberOfKeys.set(0);
if (log.isTraceEnabled()) {
- log.trace("revisitNumberOfKeys ends with: maxNumberOfKeys=" + maxNumberOfKeys +
+ log.trace("resetNumberOfKeys ends with: maxNumberOfKeys=" + maxNumberOfKeys +
", exitingNumberOfKeys=" + exitingNumberOfKeys);
}
}
@@ -247,4 +319,20 @@
public Map<Address, BlockingQueue> getAddress2KeysMapping() {
return Collections.unmodifiableMap(address2key);
}
+
+ public int getMaxNumberOfKeys() {
+ return maxNumberOfKeys.intValue();
+ }
+
+ public int getExitingNumberOfKeys() {
+ return exitingNumberOfKeys.intValue();
+ }
+
+ public boolean isKeyGeneratorThreadActive() {
+ return keyGenWorker.isActive();
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
}
Copied: trunk/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java (from rev 1862, trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,32 @@
+package org.infinispan.affinity;
+
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStopped;
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+
+/**
+* Used for registering various cache notifications.
+*
+* @author Mircea.Markus at jboss.com
+* @since 4.1
+*/
+@Listener
+public class ListenerRegistration {
+ private final KeyAffinityServiceImpl keyAffinityService;
+
+ public ListenerRegistration(KeyAffinityServiceImpl keyAffinityService) {
+ this.keyAffinityService = keyAffinityService;
+ }
+
+ @ViewChanged
+ public void handleViewChange(ViewChangedEvent vce) {
+ keyAffinityService.handleViewChange(vce);
+ }
+
+ @CacheStopped
+ public void handleCacheStopped(CacheStoppedEvent cse) {
+ keyAffinityService.handleCacheStopped(cse);
+ }
+}
Deleted: trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java 2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -1,30 +0,0 @@
-package org.infinispan.affinity;
-
-import org.infinispan.notifications.Listener;
-import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
-import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
-* // TODO: Document this
-*
-* @author Mircea.Markus at jboss.com
-* @since 4.1
-*/
-@Listener
-class TopologyChangeListener {
- private final KeyAffinityServiceImpl keyAffinityService;
-
- public TopologyChangeListener(KeyAffinityServiceImpl keyAffinityService) {
- this.keyAffinityService = keyAffinityService;
- }
-
- @ViewChanged
- public void handleViewChange(ViewChangedEvent vce) {
- keyAffinityService.handleViewChange(vce);
- }
-}
Added: trunk/core/src/test/java/org/infinispan/affinity/BaseFilterKeyAffinityServiceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/BaseFilterKeyAffinityServiceTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/BaseFilterKeyAffinityServiceTest.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,87 @@
+package org.infinispan.affinity;
+
+import junit.framework.Assert;
+import org.infinispan.Cache;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class BaseFilterKeyAffinityServiceTest extends BaseKeyAffinityServiceTest {
+
+ private static Log log = LogFactory.getLog(BaseFilterKeyAffinityServiceTest.class);
+
+ protected EmbeddedCacheManager cacheManager;
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ super.createCacheManagers();
+ createService();
+ }
+
+ protected abstract void createService();
+
+ protected abstract List<Address> getAddresses() ;
+
+
+ protected void testSingleKey() throws InterruptedException {
+ Map<Address, BlockingQueue> blockingQueueMap = keyAffinityService.getAddress2KeysMapping();
+ assertEquals(getAddresses().size(), blockingQueueMap.keySet().size());
+ assertEventualFullCapacity(getAddresses());
+ }
+
+ protected void testAddNewServer() throws Exception {
+ EmbeddedCacheManager cm = addClusterEnabledCacheManager();
+ cm.defineConfiguration(cacheName, configuration);
+ Cache cache = cm.getCache(cacheName);
+ caches.add(cache);
+ waitForClusterToResize();
+ assertUnaffected();
+ }
+
+ protected void testRemoveServers() throws InterruptedException {
+ caches.get(4).getCacheManager().stop();
+ caches.get(3).getCacheManager().stop();
+ caches.remove(4);
+ caches.remove(3);
+ Assert.assertEquals(3, caches.size());
+ waitForClusterToResize();
+ assertUnaffected();
+ }
+
+ protected void testShutdownOwnManager() throws InterruptedException {
+ log.info("**** here it starts");
+ caches.get(0).getCacheManager().stop();
+ caches.remove(0);
+ Assert.assertEquals(2, caches.size());
+ TestingUtil.blockUntilViewsReceived(10000, caches);
+ Assert.assertEquals(2, topology().size());
+
+ for (int i = 0; i < 10; i++) {
+ if (!keyAffinityService.isStarted()) break;
+ Thread.sleep(1000);
+ }
+ assert !keyAffinityService.isStarted();
+ }
+
+ private void assertUnaffected() throws InterruptedException {
+ for (int i = 0; i < 10; i++) {
+ assert keyAffinityService.getAddress2KeysMapping().keySet().size() == getAddresses().size();
+ Thread.sleep(200);
+ }
+ assertEventualFullCapacity(getAddresses());
+ assertKeyAffinityCorrectness(getAddresses());
+ }
+}
Added: trunk/core/src/test/java/org/infinispan/affinity/BaseKeyAffinityServiceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/BaseKeyAffinityServiceTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/BaseKeyAffinityServiceTest.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,84 @@
+package org.infinispan.affinity;
+
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.distribution.ConsistentHash;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class BaseKeyAffinityServiceTest extends BaseDistFunctionalTest {
+
+ protected KeyAffinityServiceImpl keyAffinityService;
+
+ protected void assertMapsToAddress(Object o, Address addr) {
+ ConsistentHash hash = manager(0).getCache(cacheName).getAdvancedCache().getDistributionManager().getConsistentHash();
+ List<Address> addresses = hash.locate(o, numOwners);
+ assertEquals("Expected key " + o + " to map to address " + addr + ". List of addresses is" + addresses, true, addresses.contains(addr));
+ }
+
+ protected List<Address> topology() {
+ return topology(caches.get(1).getCacheManager());
+ }
+
+ protected List<Address> topology(CacheManager cm) {
+ return cm.getCache(cacheName).getAdvancedCache().getRpcManager().getTransport().getMembers();
+ }
+
+ protected void assertEventualFullCapacity() throws InterruptedException {
+ List<Address> addresses = topology();
+ assertEventualFullCapacity(addresses);
+ }
+
+ protected void assertEventualFullCapacity(List<Address> addresses) throws InterruptedException {
+ Map<Address, BlockingQueue> blockingQueueMap = keyAffinityService.getAddress2KeysMapping();
+ for (Address addr : addresses) {
+ BlockingQueue queue = blockingQueueMap.get(addr);
+ //the queue will eventually get filled
+ for (int i = 0; i < 10; i++) {
+ if (!(queue.size() == 100)) {
+ Thread.sleep(1000);
+ } else {
+ break;
+ }
+ }
+ assertEquals(100, queue.size());
+ }
+ assertEquals(keyAffinityService.getMaxNumberOfKeys(), keyAffinityService.getExitingNumberOfKeys());
+ assertEquals(addresses.size() * 100, keyAffinityService.getExitingNumberOfKeys());
+ assertEquals(false, keyAffinityService.isKeyGeneratorThreadActive());
+ }
+
+ protected void assertKeyAffinityCorrectness() {
+ List<Address> addressList = topology();
+ assertKeyAffinityCorrectness(addressList);
+ }
+
+ protected void assertKeyAffinityCorrectness(Collection<Address> addressList) {
+ Map<Address, BlockingQueue> blockingQueueMap = keyAffinityService.getAddress2KeysMapping();
+ for (Address addr : addressList) {
+ BlockingQueue queue = blockingQueueMap.get(addr);
+ assertEquals(100, queue.size());
+ for (Object o : queue) {
+ assertMapsToAddress(o, addr);
+ }
+ }
+ }
+
+ protected void waitForClusterToResize() {
+ TestingUtil.blockUntilViewsReceived(10000, caches);
+ RehashWaiter.waitForInitRehashToComplete(new HashSet(caches));
+ assertEquals(caches.size(), topology().size());
+ }
+}
Added: trunk/core/src/test/java/org/infinispan/affinity/FilteredKeyAffinityService.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/FilteredKeyAffinityService.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/FilteredKeyAffinityService.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,65 @@
+package org.infinispan.affinity;
+
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ *
+ * This class just overrides the methods in the base class as TestNG behaves funny with depending methods and inheritance.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test (groups = "functional", testName = "affinity.FilteredKeyAffinityService")
+public class FilteredKeyAffinityService extends BaseFilterKeyAffinityServiceTest {
+ private List<Address> filter;
+
+ @Override
+ protected void createService() {
+ ThreadFactory tf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "KeyGeneratorThread");
+ }
+ };
+ filter = new ArrayList<Address>();
+ filter.add(caches.get(0).getAdvancedCache().getRpcManager().getTransport().getAddress());
+ filter.add(caches.get(1).getAdvancedCache().getRpcManager().getTransport().getAddress());
+ cacheManager = (EmbeddedCacheManager) caches.get(0).getCacheManager();
+ keyAffinityService = (KeyAffinityServiceImpl) KeyAffinityServiceFactory.
+ newKeyAffinityService(cacheManager.getCache(cacheName), filter, new RndKeyGenerator(),
+ Executors.newSingleThreadExecutor(tf), 100);
+ }
+
+ @Override
+ protected List<Address> getAddresses() {
+ return filter;
+ }
+
+ @Override
+ public void testSingleKey() throws InterruptedException {
+ super.testSingleKey();
+ }
+
+ @Test(dependsOnMethods = "testSingleKey")
+ public void testAddNewServer() throws Exception {
+ super.testAddNewServer();
+ }
+
+ @Test(dependsOnMethods = "testAddNewServer")
+ public void testRemoveServers() throws InterruptedException {
+ super.testRemoveServers();
+ }
+
+ @Test (dependsOnMethods = "testRemoveServers")
+ public void testShutdownOwnManager() throws InterruptedException {
+ super.testShutdownOwnManager();
+ }
+}
Added: trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceShutdownTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceShutdownTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceShutdownTest.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,89 @@
+package org.infinispan.affinity;
+
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test (groups = "functional", testName = "affinity.KeyAffinityServiceShutdownTest")
+public class KeyAffinityServiceShutdownTest extends BaseKeyAffinityServiceTest {
+ private ExecutorService executor;
+ private EmbeddedCacheManager cacheManager;
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ super.createCacheManagers();
+
+ ThreadFactory tf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "KeyGeneratorThread");
+ }
+ };
+ executor = Executors.newSingleThreadExecutor(tf);
+ cacheManager = manager(0);
+ keyAffinityService = (KeyAffinityServiceImpl) KeyAffinityServiceFactory.newKeyAffinityService(cacheManager.getCache(cacheName),
+ executor,
+ new RndKeyGenerator(), 100);
+ }
+
+ public void testSimpleShutdown() throws Exception {
+ assertListenerRegistered(true);
+ assertEventualFullCapacity();
+ assert keyAffinityService.isKeyGeneratorThreadAlive();
+ keyAffinityService.stop();
+ for (int i = 0; i < 10; i++) {
+ if (!keyAffinityService.isKeyGeneratorThreadAlive())
+ break;
+ Thread.sleep(1000);
+ }
+ assert !keyAffinityService.isKeyGeneratorThreadAlive();
+ assert !executor.isShutdown();
+ }
+
+ @Test(dependsOnMethods = "testSimpleShutdown")
+ public void testServiceCannotBeUsedAfterShutdown() {
+ try {
+ keyAffinityService.getKeyForAddress(topology().get(0));
+ assert false : "Exception expected!";
+ } catch (IllegalStateException e) {
+ //expected
+ }
+ try {
+ keyAffinityService.getCollocatedKey("a");
+ assert false : "Exception expected!";
+ } catch (IllegalStateException e) {
+ //expected
+ }
+ }
+
+ @Test (dependsOnMethods = "testServiceCannotBeUsedAfterShutdown")
+ public void testViewChaneListenerUnregistered() {
+ assertListenerRegistered(false);
+ }
+
+ @Test (dependsOnMethods = "testViewChaneListenerUnregistered")
+ public void testRestart() throws InterruptedException {
+ keyAffinityService.start();
+ assertEventualFullCapacity();
+ }
+
+ private void assertListenerRegistered(boolean registered) {
+ boolean isRegistered = false;
+ for (Object o : cacheManager.getListeners()) {
+ if (o instanceof ListenerRegistration) {
+ isRegistered = true;
+ break;
+ }
+ }
+ assertEquals(registered, isRegistered);
+ }
+}
Added: trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/KeyAffinityServiceTest.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,169 @@
+package org.infinispan.affinity;
+
+import org.infinispan.Cache;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.distribution.ConsistentHash;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "affinity.KeyAffinityServiceTest")
+public class KeyAffinityServiceTest extends BaseKeyAffinityServiceTest {
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ super.createCacheManagers();
+ assertEquals(4, topology(caches.get(0).getCacheManager()).size());
+ assertEquals(4, topology(caches.get(1).getCacheManager()).size());
+ assertEquals(4, topology(caches.get(2).getCacheManager()).size());
+ assertEquals(4, topology(caches.get(3).getCacheManager()).size());
+
+ cache(0, cacheName).put("k", "v");
+ assertEquals("v", cache(0, cacheName).get("k"));
+ assertEquals("v", cache(1, cacheName).get("k"));
+ assertEquals("v", cache(2, cacheName).get("k"));
+ assertEquals("v", cache(3, cacheName).get("k"));
+
+
+ ThreadFactory tf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "KeyGeneratorThread");
+ }
+ };
+ keyAffinityService = (KeyAffinityServiceImpl) KeyAffinityServiceFactory.newKeyAffinityService(manager(0).getCache(cacheName),
+ Executors.newSingleThreadExecutor(tf),
+ new RndKeyGenerator(), 100);
+ }
+
+ public void testKeysAreCorrectlyCreated() throws Exception {
+ assertEventualFullCapacity();
+ assertKeyAffinityCorrectness();
+ }
+
+ @Test (dependsOnMethods = "testKeysAreCorrectlyCreated")
+ public void testConcurrentConsumptionOfKeys() throws InterruptedException {
+ List<KeyConsumer> consumers = new ArrayList<KeyConsumer>();
+ int keysToConsume = 1000;
+ CountDownLatch consumersStart = new CountDownLatch(1);
+ for (int i = 0; i < 10; i++) {
+ consumers.add(new KeyConsumer(keysToConsume, consumersStart));
+ }
+ consumersStart.countDown();
+
+ for (KeyConsumer kc : consumers) {
+ kc.join();
+ }
+
+ for (KeyConsumer kc : consumers) {
+ assertEquals(null, kc.exception);
+ }
+
+ assertEventualFullCapacity();
+ }
+
+ @Test (dependsOnMethods = "testConcurrentConsumptionOfKeys")
+ public void testServerAdded() throws InterruptedException {
+ EmbeddedCacheManager cm = addClusterEnabledCacheManager();
+ cm.defineConfiguration(cacheName, configuration);
+ Cache cache = cm.getCache(cacheName);
+ caches.add(cache);
+ waitForClusterToResize();
+ for (int i = 0; i < 10; i++) {
+ if (keyAffinityService.getAddress2KeysMapping().keySet().size() == 5) {
+ break;
+ }
+ Thread.sleep(500);
+ }
+ assertEquals(5, keyAffinityService.getAddress2KeysMapping().keySet().size());
+ assertEventualFullCapacity();
+ assertKeyAffinityCorrectness();
+ }
+
+ @Test(dependsOnMethods = "testServerAdded")
+ public void testServersDropped() throws InterruptedException {
+ log.info("*** Here it is");
+ caches.get(4).getCacheManager().stop();
+ caches.remove(4);
+ waitForClusterToResize();
+ for (int i = 0; i < 10; i++) {
+ if (keyAffinityService.getAddress2KeysMapping().keySet().size() == 3) {
+ break;
+ }
+ Thread.sleep(500);
+ }
+ assertEquals(4, keyAffinityService.getAddress2KeysMapping().keySet().size());
+ assertEventualFullCapacity();
+ assertKeyAffinityCorrectness();
+ }
+
+ @Test (dependsOnMethods = "testServersDropped")
+ public void testCollocatedKey() {
+ ConsistentHash hash = manager(0).getCache(cacheName).getAdvancedCache().getDistributionManager().getConsistentHash();
+ for (int i = 0; i < 1000; i++) {
+ List<Address> addresses = hash.locate(i, numOwners);
+ Object collocatedKey = keyAffinityService.getCollocatedKey(i);
+ List<Address> addressList = hash.locate(collocatedKey, numOwners);
+ assertEquals(addresses, addressList);
+ }
+ }
+
+ public class KeyConsumer extends Thread {
+
+ volatile Exception exception;
+
+
+ private final int keysToConsume;
+ private CountDownLatch consumersStart;
+ private final List<Address> topology = topology();
+ private final Random rnd = new Random();
+
+ public KeyConsumer(int keysToConsume, CountDownLatch consumersStart) {
+ super("KeyConsumer");
+ this.keysToConsume = keysToConsume;
+ this.consumersStart = consumersStart;
+ start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ consumersStart.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
+ for (int i = 0; i < keysToConsume; i++) {
+ Address whichAddr = topology.get(rnd.nextInt(topology.size()));
+ try {
+ Object keyForAddress = keyAffinityService.getKeyForAddress(whichAddr);
+ assertMapsToAddress(keyForAddress, whichAddr);
+ } catch (Exception e) {
+ this.exception = e;
+ break;
+ }
+ }
+ }
+ }
+
+}
Added: trunk/core/src/test/java/org/infinispan/affinity/LocalKeyAffinityServiceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/affinity/LocalKeyAffinityServiceTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/affinity/LocalKeyAffinityServiceTest.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -0,0 +1,63 @@
+package org.infinispan.affinity;
+
+import junit.framework.Assert;
+import org.infinispan.Cache;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Runs the filtered key affinity tests against a service created with
+ * {@code KeyAffinityServiceFactory.newLocalKeyAffinityService} for the first cache's manager.
+ * <p>
+ * This class just overrides the methods in the base class as TestNG behaves funny with depending methods and
+ * inheritance; every test method simply delegates to its base-class counterpart.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "affinity.LocalKeyAffinityServiceTest")
+public class LocalKeyAffinityServiceTest extends BaseFilterKeyAffinityServiceTest {
+
+   /**
+    * Creates the service under test: a local key affinity service for the cache named {@code cacheName}
+    * of the first cache's manager, fed by a single-threaded executor whose thread is named
+    * "KeyGeneratorThread".
+    */
+   @Override
+   protected void createService() {
+      ThreadFactory tf = new ThreadFactory() {
+         @Override
+         public Thread newThread(Runnable r) {
+            return new Thread(r, "KeyGeneratorThread");
+         }
+      };
+      cacheManager = (EmbeddedCacheManager) caches.get(0).getCacheManager();
+      keyAffinityService = (KeyAffinityServiceImpl) KeyAffinityServiceFactory.
+            newLocalKeyAffinityService(cacheManager.getCache(cacheName), new RndKeyGenerator(),
+                                       Executors.newSingleThreadExecutor(tf), 100);
+   }
+
+   /**
+    * @return a single-element list holding the address of {@code cacheManager}
+    */
+   @Override
+   protected List<Address> getAddresses() {
+      return Collections.singletonList(cacheManager.getAddress());
+   }
+
+   // No method-level annotation here: the class-level @Test applies to this public method.
+   public void testFilteredSingleKey() throws InterruptedException {
+      super.testSingleKey();
+   }
+
+   @Test(dependsOnMethods = "testFilteredSingleKey")
+   public void testFilteredAddNewServer() throws Exception {
+      super.testAddNewServer();
+   }
+
+   @Test(dependsOnMethods = "testFilteredAddNewServer")
+   public void testFilteredRemoveServers() throws InterruptedException {
+      super.testRemoveServers();
+   }
+
+   @Test (dependsOnMethods = "testFilteredRemoveServers")
+   public void testShutdownOwnManager() throws InterruptedException {
+      super.testShutdownOwnManager();
+   }
+}
Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -20,8 +20,11 @@
import javax.transaction.TransactionManager;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.locks.LockSupport;
@@ -40,13 +43,13 @@
protected boolean l1CacheEnabled = true;
protected boolean l1OnRehash = false;
protected boolean performRehashing = false;
- protected static final int NUM_OWNERS = 2;
+ protected int numOwners = 2;
protected void createCacheManagers() throws Throwable {
cacheName = "dist";
configuration = getDefaultClusteredConfig(sync ? Configuration.CacheMode.DIST_SYNC : Configuration.CacheMode.DIST_ASYNC, tx);
configuration.setRehashEnabled(performRehashing);
- configuration.setNumOwners(NUM_OWNERS);
+ configuration.setNumOwners(numOwners);
if (!testRetVals) {
configuration.setUnsafeUnreliableReturnValues(true);
// we also need to use repeatable read for tests to work when we don't have reliable return values, since the
@@ -102,6 +105,12 @@
}
}
}
+
+ public static void waitForInitRehashToComplete(Collection<Cache> caches) {
+ Set<Cache> cachesSet = new HashSet<Cache>();
+ cachesSet.addAll(caches);
+ waitForInitRehashToComplete(cachesSet.toArray(new Cache[cachesSet.size()]));
+ }
}
// only used if the CH impl does not order the hash ring based on the order of the view.
Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java 2010-05-27 17:22:26 UTC (rev 1866)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java 2010-05-27 18:03:09 UTC (rev 1867)
@@ -4,7 +4,6 @@
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.distribution.ConsistentHash;
import org.infinispan.distribution.ConsistentHashHelper;
-import org.infinispan.manager.CacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.testng.annotations.Test;
@@ -58,7 +57,7 @@
// which key should be mapped to the joiner?
MagicKey keyToTest = null;
for (MagicKey k: keys) {
- if (chNew.isKeyLocalToAddress(joinerAddress, k, NUM_OWNERS)) {
+ if (chNew.isKeyLocalToAddress(joinerAddress, k, numOwners)) {
keyToTest = k;
break;
}
More information about the infinispan-commits
mailing list