[infinispan-commits] Infinispan SVN: r1862 - trunk/core/src/main/java/org/infinispan/affinity.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed May 26 13:18:47 EDT 2010
Author: mircea.markus
Date: 2010-05-26 13:18:47 -0400 (Wed, 26 May 2010)
New Revision: 1862
Added:
trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java
Modified:
trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java
trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java
trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java
Log:
ongoing work on affinity service
Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java 2010-05-26 10:22:04 UTC (rev 1861)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java 2010-05-26 17:18:47 UTC (rev 1862)
@@ -1,5 +1,6 @@
package org.infinispan.affinity;
+import org.infinispan.lifecycle.Lifecycle;
import org.infinispan.remoting.transport.Address;
import java.util.concurrent.Executor;
@@ -31,7 +32,7 @@
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
-public interface KeyAffinityService<K> {
+public interface KeyAffinityService<K> extends Lifecycle {
/**
* Returns a key that will be distributed on the cluster node identified by address.
Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java 2010-05-26 10:22:04 UTC (rev 1861)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java 2010-05-26 17:18:47 UTC (rev 1862)
@@ -7,6 +7,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -27,56 +28,60 @@
public class KeyAffinityServiceFactory {
/**
- * Creates an {@link org.infinispan.affinity.KeyAffinityService} instance.
+ * Creates an {@link org.infinispan.affinity.KeyAffinityService} instance that generates keys mapped to all addresses
+ * in the cluster. Changes in topology are also noticed: when a new node is added, the service will automatically
+ * start generating keys for it.
*
* @param cache the distributed cache for which this service runs
- * @param ex used for obtaining a thread that async generates keys.
- * @param keyGenerator allows one to control how the generated keys look like.
+ * @param ex used for running async key generation process.
+ * @param keyGenerator allows one to control how the generated keys look like.
* @param keyBufferSize the number of generated keys per {@link org.infinispan.remoting.transport.Address}.
+ * @param start whether to start the service or not
* @return an {@link org.infinispan.affinity.KeyAffinityService} implementation.
* @throws IllegalStateException if the supplied cache is not DIST.
*/
- public static <K,V> KeyAffinityService<K> newKeyAffinityService(Cache<K,V> cache, ExecutorFactory ex, KeyGenerator keyGenerator, int keyBufferSize) {
- return null;
+ public static <K, V> KeyAffinityService<K> newKeyAffinityService(Cache<K, V> cache, Executor ex, KeyGenerator keyGenerator, int keyBufferSize, boolean start) {
+ return new KeyAffinityServiceImpl(ex, cache, keyGenerator, keyBufferSize, null, start);
}
/**
- * Same as {@link #newKeyAffinityService(org.infinispan.Cache,org.infinispan.executors.ExecutorFactory,
- * KeyGenerator ,int)} with the an {@link RndKeyGenerator}.
+ * Same as {@link #newKeyAffinityService(org.infinispan.Cache, java.util.concurrent.Executor, KeyGenerator, int,
+ * boolean)} with start == true;
*/
- public static <K,V> KeyAffinityService newKeyAffinityService(Cache<K,V> cache, ExecutorFactory ex, int keyBufferSize) {
- return newKeyAffinityService(cache, ex, new RndKeyGenerator(), keyBufferSize);
+ public static <K, V> KeyAffinityService<K> newKeyAffinityService(Cache<K, V> cache, Executor ex, KeyGenerator keyGenerator, int keyBufferSize) {
+ return newKeyAffinityService(cache, ex, keyGenerator, keyBufferSize, true);
}
-
+
/**
- * Same as {@link #newKeyAffinityService(org.infinispan.Cache,org.infinispan.executors.ExecutorFactory,
- * KeyGenerator ,int)} with the an {@link RndKeyGenerator}.
+ * Creates a service that would only generate keys for addresses specified in filter.
+ *
+ * @param filter the set of addresses for which to generate keys
*/
- public static <K,V> KeyAffinityService newKeyAffinityService(Cache<K,V> cache, Collection<Address> forAddresses, ExecutorFactory ex, int keyBufferSize) {
- return newKeyAffinityService(cache, ex, new RndKeyGenerator(), keyBufferSize);
+ public static <K, V> KeyAffinityService newKeyAffinityService(Cache<K, V> cache, Collection<Address> filter, KeyGenerator keyGenerator, Executor ex, int keyBufferSize, boolean start) {
+ return new KeyAffinityServiceImpl(ex, cache, keyGenerator, keyBufferSize, filter, start);
}
-
+
/**
- * Same as {@link #newKeyAffinityService(org.infinispan.Cache,org.infinispan.executors.ExecutorFactory,
- * KeyGenerator ,int)} with the an {@link RndKeyGenerator}.
+ * Same as {@link #newKeyAffinityService(org.infinispan.Cache, java.util.Collection, KeyGenerator,
+ * java.util.concurrent.Executor, int, boolean)} with start == true.
*/
- public static <K,V> KeyAffinityService newLocalKeyAffinityService(Cache<K,V> cache, ExecutorFactory ex, int keyBufferSize) {
+ public static <K, V> KeyAffinityService newKeyAffinityService(Cache<K, V> cache, Collection<Address> filter, KeyGenerator keyGenerator, Executor ex, int keyBufferSize) {
+ return newKeyAffinityService(cache, filter, keyGenerator, ex, keyBufferSize, true);
+ }
+
+ /**
+ * Creates a service that only generates keys for the local address.
+ */
+ public static <K, V> KeyAffinityService newLocalKeyAffinityService(Cache<K, V> cache, KeyGenerator keyGenerator, Executor ex, int keyBufferSize, boolean start) {
Address localAddress = cache.getAdvancedCache().getRpcManager().getTransport().getAddress();
Collection<Address> forAddresses = Collections.singletonList(localAddress);
- return newKeyAffinityService(cache, ex, new RndKeyGenerator(), keyBufferSize);
+ return newKeyAffinityService(cache,forAddresses, keyGenerator, ex, keyBufferSize, start);
}
/**
- * Same as {@link #newKeyAffinityService(org.infinispan.Cache,org.infinispan.executors.ExecutorFactory,
- * KeyGenerator ,int)} with the an {@link RndKeyGenerator} and an
- * {@link java.util.concurrent.Executors#newSingleThreadExecutor()} executor.
+ * Same as {@link #newLocalKeyAffinityService(org.infinispan.Cache, KeyGenerator, java.util.concurrent.Executor, int, boolean)} with start == true.
*/
- public static <K,V> KeyAffinityService newKeyAffinityService(Cache<K,V> cache, int keyBufferSize) {
- return newKeyAffinityService(cache, new ExecutorFactory() {
- @Override
- public ExecutorService getExecutor(Properties p) {
- return Executors.newSingleThreadExecutor();
- }
- }, new RndKeyGenerator(), keyBufferSize);
+ public static <K, V> KeyAffinityService newLocalKeyAffinityService(Cache<K, V> cache, KeyGenerator keyGenerator, Executor ex, int keyBufferSize) {
+ return newLocalKeyAffinityService(cache, keyGenerator, ex, keyBufferSize, true);
}
}
Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java 2010-05-26 10:22:04 UTC (rev 1861)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java 2010-05-26 17:18:47 UTC (rev 1862)
@@ -1,22 +1,250 @@
package org.infinispan.affinity;
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
+import org.infinispan.Cache;
+import org.infinispan.distribution.ConsistentHash;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.concurrent.ReclosableLatch;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
/**
* // TODO: Document this
*
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
+@ThreadSafe
public class KeyAffinityServiceImpl implements KeyAffinityService {
-
+
+ private static final Log log = LogFactory.getLog(KeyAffinityServiceImpl.class);
+
+ /**
+ * Addresses for which keys are generated, or null when keys are generated for every known address.
+ */
+ private final Set<Address> filter;
+
+ /**
+ * One bounded queue (capacity {@link #bufferSize}) of pre-generated keys per address of interest.
+ */
+ @GuardedBy("maxNumberInvariant")
+ private final Map<Address, BlockingQueue> address2key = new ConcurrentHashMap<Address, BlockingQueue>();
+
+ private final Executor executor;
+ private final Cache cache;
+ private final KeyGenerator keyGenerator;
+ private final int bufferSize;
+ private final AtomicInteger maxNumberOfKeys = new AtomicInteger(); //(nr. of addresses) * bufferSize;
+ private final AtomicInteger exitingNumberOfKeys = new AtomicInteger(); // keys currently buffered in address2key
+
+ private volatile boolean started;
+
+ /**
+ * Guards and makes sure the following invariant holds: maxNumberOfKeys == address2key.keySet().size() * bufferSize.
+ * The write lock is taken by whoever changes the set of queues or fills them; consumers only take the read lock
+ * for short, non-blocking sections.
+ */
+ private final ReadWriteLock maxNumberInvariant = new ReentrantReadWriteLock();
+
+
+ /**
+ * Used for coordinating between the KeyGeneratorWorker and consumers: open while there is room in the queues,
+ * closed once all of them are full.
+ */
+ private final ReclosableLatch keyProducerStartLatch = new ReclosableLatch();
+
+
+ /**
+ * Creates the service and optionally starts it.
+ *
+ * @param executor runs the background {@link KeyGeneratorWorker}
+ * @param cache cache whose transport and consistent hash are consulted
+ * @param keyGenerator source of candidate keys
+ * @param bufferSize capacity of the key queue kept for each address
+ * @param filter addresses to generate keys for; null means every known address
+ * @param start when true, {@link #start()} is invoked before the constructor returns
+ */
+ public KeyAffinityServiceImpl(Executor executor, Cache cache, KeyGenerator keyGenerator, int bufferSize, Collection<Address> filter, boolean start) {
+ this.executor = executor;
+ this.cache = cache;
+ this.keyGenerator = keyGenerator;
+ this.bufferSize = bufferSize;
+ if (filter != null) {
+ // Copy the supplied addresses: an empty set would make interestedInAddress() reject every node.
+ this.filter = new CopyOnWriteArraySet<Address>(filter);
+ } else {
+ this.filter = null;
+ }
+ if (start) {
+ start();
+ }
+ }
+
@Override
public Object getCollocatedKey(Object otherKey) {
- return null; // TODO: Customise this generated block
+ Address address = getAddressForKey(otherKey);
+ return getKeyForAddress(address);
}
@Override
public Object getKeyForAddress(Address address) {
- return null; // TODO: Customise this generated block
+ if (!started) {
+ throw new IllegalStateException("You have to start the service first!");
+ }
+ BlockingQueue queue;
+ maxNumberInvariant.readLock().lock();
+ try {
+ queue = address2key.get(address);
+ } finally {
+ maxNumberInvariant.readLock().unlock();
+ }
+ if (queue == null) {
+ throw new IllegalStateException("No keys are generated for address " + address
+ + "; it is either not part of the cluster or excluded by the filter.");
+ }
+ Object result;
+ try {
+ // Block WITHOUT holding the lock: the producer needs the write lock to refill an empty queue.
+ result = queue.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ // Decrement-and-open is done under the read lock so it cannot interleave with the producer's
+ // check-and-close, which runs under the write lock; otherwise the latch could stay closed with room left.
+ maxNumberInvariant.readLock().lock();
+ try {
+ // Only count the key if the queue is still registered; a view change may have replaced it meanwhile.
+ if (address2key.get(address) == queue) {
+ exitingNumberOfKeys.decrementAndGet();
+ }
+ keyProducerStartLatch.open();
+ } finally {
+ maxNumberInvariant.readLock().unlock();
+ }
+ return result;
}
+
+ /**
+ * Creates the queues for the current members, schedules the key generator and registers for view changes.
+ * Calling it on an already started service only logs a message.
+ */
+ @Override
+ public void start() {
+ if (started) {
+ log.info("Service already started, ignoring call to start!");
+ return;
+ }
+ List<Address> existingNodes = getExistingNodes();
+ maxNumberInvariant.writeLock().lock();
+ try {
+ addQueuesForAddresses(existingNodes);
+ revisitNumberOfKeys();
+ } finally {
+ maxNumberInvariant.writeLock().unlock();
+ }
+ executor.execute(new KeyGeneratorWorker());
+ // NOTE(review): ViewChanged lives in the cachemanagerlistener package; confirm that a listener registered on
+ // the cache (rather than on the cache manager) actually receives view changes.
+ cache.addListener(new TopologyChangeListener(this));
+ keyProducerStartLatch.open();
+ started = true;
+ }
+
+ /**
+ * Not implemented yet: the worker keeps running and the listener stays registered.
+ */
+ @Override
+ public void stop() {
+ }
+
+ /**
+ * Drops the queues of the previous members, creates fresh ones for the new members and wakes up the producer.
+ * NOTE(review): a consumer already blocked on a dropped queue is not woken up here.
+ *
+ * @param vce the view change, providing the old and the new member lists
+ */
+ public void handleViewChange(ViewChangedEvent vce) {
+ if (log.isTraceEnabled()) {
+ log.trace("ViewChange received: " + vce);
+ }
+ // Use the same lock as every other writer of address2key so the counters stay consistent with the map.
+ maxNumberInvariant.writeLock().lock();
+ try {
+ for (Address address : vce.getOldMembers()) {
+ BlockingQueue queue = address2key.remove(address);
+ // Addresses rejected by the filter never had a queue, so only warn for the ones we track.
+ if (queue == null && interestedInAddress(address)) {
+ log.warn("Null queue not expected for address: " + address + ". Did we miss a view change?");
+ }
+ }
+ addQueuesForAddresses(vce.getNewMembers());
+ revisitNumberOfKeys();
+ keyProducerStartLatch.open();
+ } finally {
+ maxNumberInvariant.writeLock().unlock();
+ }
+ }
+
+
+ /**
+ * Background task that keeps the per-address queues filled while {@link #keyProducerStartLatch} is open.
+ */
+ public class KeyGeneratorWorker implements Runnable {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ keyProducerStartLatch.await();
+ } catch (InterruptedException e) {
+ if (log.isInfoEnabled()) {
+ log.info("Shutting down KeyAffinity service for key set: " + filter);
+ }
+ Thread.currentThread().interrupt();
+ return;
+ }
+ generateKeys();
+ }
+ }
+
+ /**
+ * Runs one bounded generation pass (one candidate key per registered address) so the write lock is released
+ * regularly, and closes the latch once every queue is full.
+ */
+ private void generateKeys() {
+ maxNumberInvariant.writeLock().lock();
+ try {
+ int candidates = address2key.size();
+ for (int i = 0; i < candidates && !allBuffersFull(); i++) {
+ Object key = keyGenerator.getKey();
+ // A key is only useful for the node that owns it, so queue it for that node and nobody else.
+ tryAddKey(getAddressForKey(key), key);
+ }
+ if (allBuffersFull()) {
+ keyProducerStartLatch.close();
+ }
+ } finally {
+ maxNumberInvariant.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Offers the key to the queue of the given address, if there is one with spare capacity, and keeps the
+ * buffered-key counter in sync.
+ */
+ private void tryAddKey(Address address, Object key) {
+ BlockingQueue queue = address2key.get(address);
+ boolean added = queue != null && queue.offer(key);
+ if (added) {
+ exitingNumberOfKeys.incrementAndGet();
+ }
+ if (log.isTraceEnabled()) {
+ log.trace((added ? "Successfully " : "Not ") + "added key(" + key + ") to the address(" + address + ").");
+ }
+ }
+ }
+
+ /**
+ * Tells whether the queues hold as many keys as they can. AtomicInteger does not override equals(), so the
+ * values are compared rather than the counter objects.
+ */
+ private boolean allBuffersFull() {
+ return exitingNumberOfKeys.get() >= maxNumberOfKeys.get();
+ }
+
+ /**
+ * Recomputes both counters from the current content of {@link #address2key}.
+ * Important: this *MUST* be called with the write lock of {@link #maxNumberInvariant} held.
+ */
+ private void revisitNumberOfKeys() {
+ maxNumberOfKeys.set(address2key.size() * bufferSize);
+ int buffered = 0;
+ for (BlockingQueue queue : address2key.values()) {
+ buffered += queue.size();
+ }
+ // Overwrite rather than accumulate, otherwise every recount would add to the previous total.
+ exitingNumberOfKeys.set(buffered);
+ if (log.isTraceEnabled()) {
+ log.trace("revisitNumberOfKeys ends with: maxNumberOfKeys=" + maxNumberOfKeys +
+ ", exitingNumberOfKeys=" + exitingNumberOfKeys);
+ }
+ }
+
+ /**
+ * Registers an empty queue for every address accepted by the filter, replacing any queue already present.
+ * Important: this *MUST* be called with the write lock of {@link #maxNumberInvariant} held.
+ */
+ private void addQueuesForAddresses(Collection<Address> addresses) {
+ for (Address address : addresses) {
+ if (interestedInAddress(address)) {
+ address2key.put(address, new ArrayBlockingQueue(bufferSize));
+ } else {
+ if (log.isTraceEnabled()) {
+ log.trace("Skipping address: " + address);
+ }
+ }
+ }
+ }
+
+ /**
+ * An address is of interest when no filter was supplied or when the filter contains it.
+ */
+ private boolean interestedInAddress(Address address) {
+ return filter == null || filter.contains(address);
+ }
+
+ private List<Address> getExistingNodes() {
+ return cache.getAdvancedCache().getRpcManager().getTransport().getMembers();
+ }
+
+ /**
+ * Returns the first address the consistent hash locates for the key.
+ *
+ * @throws IllegalStateException if the cache has no distribution manager or the hash returns no address
+ */
+ private Address getAddressForKey(Object key) {
+ DistributionManager distributionManager = getDistributionManager();
+ ConsistentHash hash = distributionManager.getConsistentHash();
+ List<Address> addressList = hash.locate(key, 1);
+ if (addressList.isEmpty()) {
+ throw new IllegalStateException("Empty address list returned by consistent hash " + hash + " for key " + key);
+ }
+ return addressList.get(0);
+ }
+
+ private DistributionManager getDistributionManager() {
+ DistributionManager distributionManager = cache.getAdvancedCache().getDistributionManager();
+ if (distributionManager == null) {
+ throw new IllegalStateException("Null distribution manager. Is this an distributed(v.s. replicated) cache?");
+ }
+ return distributionManager;
+ }
+
+ /**
+ * Read-only view of the address-to-queue mapping; the queues themselves are the live ones.
+ */
+ public Map<Address, BlockingQueue> getAddress2KeysMapping() {
+ return Collections.unmodifiableMap(address2key);
+ }
}
Added: trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java 2010-05-26 17:18:47 UTC (rev 1862)
@@ -0,0 +1,30 @@
+package org.infinispan.affinity;
+
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+* Listener that forwards view change events to the {@link KeyAffinityServiceImpl} it was created with.
+*
+* @author Mircea.Markus at jboss.com
+* @since 4.1
+*/
+@Listener
+class TopologyChangeListener {
+ private final KeyAffinityServiceImpl keyAffinityService;
+
+ /**
+ * @param keyAffinityService the service to notify on every view change
+ */
+ public TopologyChangeListener(KeyAffinityServiceImpl keyAffinityService) {
+ this.keyAffinityService = keyAffinityService;
+ }
+
+ /**
+ * Delegates the event to {@link KeyAffinityServiceImpl#handleViewChange(ViewChangedEvent)}.
+ */
+ @ViewChanged
+ public void handleViewChange(ViewChangedEvent vce) {
+ keyAffinityService.handleViewChange(vce);
+ }
+}
More information about the infinispan-commits
mailing list