[infinispan-commits] Infinispan SVN: r1862 - trunk/core/src/main/java/org/infinispan/affinity.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed May 26 13:18:47 EDT 2010


Author: mircea.markus
Date: 2010-05-26 13:18:47 -0400 (Wed, 26 May 2010)
New Revision: 1862

Added:
   trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java
Modified:
   trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java
   trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java
   trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java
Log:
ongoing work on affinity service

Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java	2010-05-26 10:22:04 UTC (rev 1861)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityService.java	2010-05-26 17:18:47 UTC (rev 1862)
@@ -1,5 +1,6 @@
 package org.infinispan.affinity;
 
+import org.infinispan.lifecycle.Lifecycle;
 import org.infinispan.remoting.transport.Address;
 
 import java.util.concurrent.Executor;
@@ -31,7 +32,7 @@
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
-public interface KeyAffinityService<K> {
+public interface KeyAffinityService<K> extends Lifecycle {
 
    /**
     * Returns a key that will be distributed on the cluster node identified by address.

Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java	2010-05-26 10:22:04 UTC (rev 1861)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceFactory.java	2010-05-26 17:18:47 UTC (rev 1862)
@@ -7,6 +7,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -27,56 +28,60 @@
 public class KeyAffinityServiceFactory {
 
    /**
-    * Creates an {@link org.infinispan.affinity.KeyAffinityService} instance.
+    * Creates an {@link org.infinispan.affinity.KeyAffinityService} instance that generates keys mapped to all addresses
+    * in the cluster. Changes in topology would also noticed: by adding a new node, the service will automatically start
+    * generating keys for it.
     *
     * @param cache         the distributed cache for which this service runs
-    * @param ex            used for obtaining a thread that async generates keys.
-    * @param keyGenerator   allows one to control how the generated keys look like.
+    * @param ex            used for running async key generation process.
+    * @param keyGenerator  allows one to control how the generated keys look like.
     * @param keyBufferSize the number of generated keys per {@link org.infinispan.remoting.transport.Address}.
+    * @param start         weather to start the service or not
     * @return an {@link org.infinispan.affinity.KeyAffinityService} implementation.
     * @throws IllegalStateException if the supplied cache is not DIST.
     */
-   public static <K,V> KeyAffinityService<K> newKeyAffinityService(Cache<K,V> cache, ExecutorFactory ex, KeyGenerator keyGenerator, int keyBufferSize) {
-      return null;
+   public static <K, V> KeyAffinityService<K> newKeyAffinityService(Cache<K, V> cache, Executor ex, KeyGenerator keyGenerator, int keyBufferSize, boolean start) {
+      return new KeyAffinityServiceImpl(ex, cache, keyGenerator, keyBufferSize, null, start);
    }
 
    /**
-    * Same as {@link #newKeyAffinityService(org.infinispan.Cache,org.infinispan.executors.ExecutorFactory,
-    * KeyGenerator ,int)} with the an {@link RndKeyGenerator}.
+    * Same as {@link #newKeyAffinityService(org.infinispan.Cache, java.util.concurrent.Executor, KeyGenerator, int,
+    * boolean)} with start == true;
     */
-   public static <K,V> KeyAffinityService newKeyAffinityService(Cache<K,V> cache, ExecutorFactory ex, int keyBufferSize) {
-      return newKeyAffinityService(cache, ex, new RndKeyGenerator(), keyBufferSize);
+   public static <K, V> KeyAffinityService<K> newKeyAffinityService(Cache<K, V> cache, Executor ex, KeyGenerator keyGenerator, int keyBufferSize) {
+      return newKeyAffinityService(cache, ex, keyGenerator, keyBufferSize, true);
    }
-   
+
    /**
-    * Same as {@link #newKeyAffinityService(org.infinispan.Cache,org.infinispan.executors.ExecutorFactory,
-    * KeyGenerator ,int)} with the an {@link RndKeyGenerator}.
+    * Creates a service that would only generate keys for addresses specified in filter.
+    *
+    * @param filter the set of addresses for which to generate keys
     */
-   public static <K,V> KeyAffinityService newKeyAffinityService(Cache<K,V> cache, Collection<Address> forAddresses, ExecutorFactory ex, int keyBufferSize) {
-      return newKeyAffinityService(cache, ex, new RndKeyGenerator(), keyBufferSize);
+   public static <K, V> KeyAffinityService newKeyAffinityService(Cache<K, V> cache, Collection<Address> filter, KeyGenerator keyGenerator, Executor ex, int keyBufferSize, boolean start) {
+      return new KeyAffinityServiceImpl(ex, cache, keyGenerator, keyBufferSize, filter, start);
    }
-   
+
    /**
-    * Same as {@link #newKeyAffinityService(org.infinispan.Cache,org.infinispan.executors.ExecutorFactory,
-    * KeyGenerator ,int)} with the an {@link RndKeyGenerator}.
+    * Same as {@link #newKeyAffinityService(org.infinispan.Cache, java.util.Collection, KeyGenerator,
+    * java.util.concurrent.Executor, int, boolean)} with start == true.
     */
-   public static <K,V> KeyAffinityService newLocalKeyAffinityService(Cache<K,V> cache, ExecutorFactory ex, int keyBufferSize) {
+   public static <K, V> KeyAffinityService newKeyAffinityService(Cache<K, V> cache, Collection<Address> filter, KeyGenerator keyGenerator, Executor ex, int keyBufferSize) {
+      return newKeyAffinityService(cache, filter, keyGenerator, ex, keyBufferSize, true);
+   }
+
+   /**
+    * Created an service that only generates keys for the local address.
+    */
+   public static <K, V> KeyAffinityService newLocalKeyAffinityService(Cache<K, V> cache, KeyGenerator keyGenerator, Executor ex, int keyBufferSize, boolean start) {
       Address localAddress = cache.getAdvancedCache().getRpcManager().getTransport().getAddress();
       Collection<Address> forAddresses = Collections.singletonList(localAddress);
-      return newKeyAffinityService(cache, ex, new RndKeyGenerator(), keyBufferSize);
+      return newKeyAffinityService(cache,forAddresses, keyGenerator, ex, keyBufferSize, start);
    }
 
    /**
-    * Same as {@link #newKeyAffinityService(org.infinispan.Cache,org.infinispan.executors.ExecutorFactory,
-    * KeyGenerator ,int)} with the an {@link RndKeyGenerator} and an
-    * {@link java.util.concurrent.Executors#newSingleThreadExecutor()} executor.
+    * Same as {@link #newLocalKeyAffinityService(org.infinispan.Cache, KeyGenerator, java.util.concurrent.Executor, int)} with start == true.
     */
-   public static <K,V> KeyAffinityService newKeyAffinityService(Cache<K,V> cache, int keyBufferSize) {
-      return newKeyAffinityService(cache, new ExecutorFactory() {
-         @Override
-         public ExecutorService getExecutor(Properties p) {
-            return Executors.newSingleThreadExecutor();
-         }
-      }, new RndKeyGenerator(), keyBufferSize);
+   public static <K, V> KeyAffinityService newLocalKeyAffinityService(Cache<K, V> cache, KeyGenerator keyGenerator, Executor ex, int keyBufferSize) {
+      return newLocalKeyAffinityService(cache, keyGenerator, ex, keyBufferSize, true);
    }
 }

Modified: trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java	2010-05-26 10:22:04 UTC (rev 1861)
+++ trunk/core/src/main/java/org/infinispan/affinity/KeyAffinityServiceImpl.java	2010-05-26 17:18:47 UTC (rev 1862)
@@ -1,22 +1,250 @@
 package org.infinispan.affinity;
 
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
+import org.infinispan.Cache;
+import org.infinispan.distribution.ConsistentHash;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
 import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.concurrent.ReclosableLatch;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * // TODO: Document this
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
+ at ThreadSafe
 public class KeyAffinityServiceImpl implements KeyAffinityService {
-   
+
+   private static Log log = LogFactory.getLog(KeyAffinityServiceImpl.class);
+
+   private final Set<Address> filter;
+
+   @GuardedBy("maxNumberInvariant")
+   private final Map<Address, BlockingQueue> address2key = new ConcurrentHashMap<Address, BlockingQueue>();
+
+   private final Executor executor;
+   private final Cache cache;
+   private final KeyGenerator keyGenerator;
+   private final int bufferSize;
+   private final AtomicInteger maxNumberOfKeys = new AtomicInteger(); //(nr. of addresses) * bufferSize;
+   private final AtomicInteger exitingNumberOfKeys = new AtomicInteger();
+
+   private volatile boolean started;
+
+   /**
+    * Guards and make sure the following invariant stands:  maxNumberOfKeys ==  address2key.keys().size() * bufferSize
+    */
+   private final ReadWriteLock maxNumberInvariant = new ReentrantReadWriteLock();
+
+
+   /**
+    * Used for coordinating between the KeyGeneratorWorker and consumers.
+    */
+   private final ReclosableLatch keyProducerStartLatch = new ReclosableLatch();
+
+
+   public KeyAffinityServiceImpl(Executor executor, Cache cache, KeyGenerator keyGenerator, int bufferSize, Collection<Address> filter, boolean start) {
+      this.executor = executor;
+      this.cache = cache;
+      this.keyGenerator = keyGenerator;
+      this.bufferSize = bufferSize;
+      if (filter != null) {
+         this.filter = new CopyOnWriteArraySet<Address>();
+      } else {
+         this.filter = null;
+      }
+      if (start)
+         start();
+   }
+
    @Override
    public Object getCollocatedKey(Object otherKey) {
-      return null;  // TODO: Customise this generated block
+      Address address = getAddressForKey(otherKey);
+      return getKeyForAddress(address);
    }
 
    @Override
    public Object getKeyForAddress(Address address) {
-      return null;  // TODO: Customise this generated block
+      if (!started) {
+         throw new IllegalStateException("You have to start the service first!");
+      }
+      BlockingQueue queue = address2key.get(address);
+      try {
+         maxNumberInvariant.readLock().lock();
+         Object result;
+         try {
+            result = queue.take();
+         } finally {
+            maxNumberInvariant.readLock().unlock();
+         }
+         exitingNumberOfKeys.decrementAndGet();
+         return result;
+      } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+         return null;
+      } finally {
+         if (maxNumberOfKeys.equals(exitingNumberOfKeys)) {
+            keyProducerStartLatch.open();
+         }
+      }
    }
+
+   @Override
+   public void start() {
+      if (started) {
+         log.info("Service already started, ignoring call to start!");
+         return;
+      }
+      List<Address> existingNodes = getExistingNodes();
+      maxNumberInvariant.writeLock().lock();
+      try {
+         addQueuesForAddresses(existingNodes);
+         revisitNumberOfKeys();
+      } finally {
+         maxNumberInvariant.writeLock().unlock();
+      }
+      executor.execute(new KeyGeneratorWorker());
+      cache.addListener(new TopologyChangeListener(this));
+      keyProducerStartLatch.open();
+      started = true;
+   }
+
+   @Override
+   public void stop() {
+   }
+
+   public void handleViewChange(ViewChangedEvent vce) {
+      if (log.isTraceEnabled()) {
+         log.trace("ViewChange received: " + vce);
+      }
+      synchronized (address2key) {
+         for (Address address : vce.getOldMembers()) {
+            BlockingQueue queue = address2key.remove(address);
+            if (queue == null) {
+               KeyAffinityServiceImpl.log.warn("Null queue not expected for address: " + address + ". Did we miss a view change?");
+            }
+         }
+         addQueuesForAddresses(vce.getNewMembers());
+      }
+   }
+
+
+   public class KeyGeneratorWorker implements Runnable {
+
+      @Override
+      public void run() {
+         while (true) {
+            try {
+               keyProducerStartLatch.await();
+            } catch (InterruptedException e) {
+               if (log.isInfoEnabled()) {
+                  log.info("Shutting down KeyAffinity service for key set: " + filter);
+               }
+               return;
+            }
+            generateKeys();
+         }
+      }
+
+      private void generateKeys() {
+         maxNumberInvariant.writeLock().lock();
+         try {
+            for (Address address : address2key.keySet()) {
+               Object key = keyGenerator.getKey();
+               tryAddKey(address, key);
+               if (maxNumberOfKeys.equals(exitingNumberOfKeys)) {
+                  keyProducerStartLatch.close();
+                  return;
+               }
+            }
+         } finally {
+            maxNumberInvariant.writeLock().unlock();
+         }
+      }
+
+      private void tryAddKey(Address address, Object key) {
+         BlockingQueue queue = address2key.get(address);
+         boolean added = queue.offer(key);
+         if (log.isTraceEnabled()) {
+            log.trace((added ? "Successfully " : "Not") + "added key(" + key + ") to the address(" + address + ").");
+         }
+      }
+   }
+
+   /**
+    * Important: this *MUST* be called with WL on {@link #address2key}.
+    */
+   private void revisitNumberOfKeys() {
+      maxNumberOfKeys.set(address2key.keySet().size() * bufferSize);
+      for (Address address : address2key.keySet()) {
+         exitingNumberOfKeys.addAndGet(address2key.get(address).size());
+      }
+      if (log.isTraceEnabled()) {
+         log.trace("revisitNumberOfKeys ends with: maxNumberOfKeys=" + maxNumberOfKeys +
+               ", exitingNumberOfKeys=" + exitingNumberOfKeys);
+      }
+   }
+
+   /**
+    * Important: this *MUST* be called with WL on {@link #address2key}.
+    */
+   private void addQueuesForAddresses(Collection<Address> addresses) {
+      for (Address address : addresses) {
+         if (interestedInAddress(address)) {
+            address2key.put(address, new ArrayBlockingQueue(bufferSize));
+         } else {
+            if (log.isTraceEnabled())
+               log.trace("Skipping address: " + address);
+         }
+      }
+   }
+
+   private boolean interestedInAddress(Address address) {
+      return filter == null || filter.contains(address);
+   }
+
+   private List<Address> getExistingNodes() {
+      return cache.getAdvancedCache().getRpcManager().getTransport().getMembers();
+   }
+
+   private Address getAddressForKey(Object key) {
+      DistributionManager distributionManager = getDistributionManager();
+      ConsistentHash hash = distributionManager.getConsistentHash();
+      List<Address> addressList = hash.locate(key, 1);
+      if (addressList.size() == 0) {
+         throw new IllegalStateException("Empty address list returned by consistent hash " + hash + " for key " + key);
+      }
+      return addressList.get(0);
+   }
+
+   private DistributionManager getDistributionManager() {
+      DistributionManager distributionManager = cache.getAdvancedCache().getDistributionManager();
+      if (distributionManager == null) {
+         throw new IllegalStateException("Null distribution manager. Is this an distributed(v.s. replicated) cache?");
+      }
+      return distributionManager;
+   }
+
+   public Map<Address, BlockingQueue> getAddress2KeysMapping() {
+      return Collections.unmodifiableMap(address2key);
+   }
 }

Added: trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/affinity/TopologyChangeListener.java	2010-05-26 17:18:47 UTC (rev 1862)
@@ -0,0 +1,30 @@
+package org.infinispan.affinity;
+
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+* // TODO: Document this
+*
+* @author Mircea.Markus at jboss.com
+* @since 4.1
+*/
+ at Listener
+class TopologyChangeListener {
+   private final KeyAffinityServiceImpl keyAffinityService;
+
+   public TopologyChangeListener(KeyAffinityServiceImpl keyAffinityService) {
+      this.keyAffinityService = keyAffinityService;
+   }
+
+   @ViewChanged
+   public void handleViewChange(ViewChangedEvent vce) {
+      keyAffinityService.handleViewChange(vce);
+   }
+}



More information about the infinispan-commits mailing list