[infinispan-commits] Infinispan SVN: r2477 - in trunk/core/src/main/java/org/infinispan: distribution and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Sun Oct 3 14:57:11 EDT 2010


Author: vblagojevic at jboss.com
Date: 2010-10-03 14:57:11 -0400 (Sun, 03 Oct 2010)
New Revision: 2477

Modified:
   trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
Log:
[ISPN-634] - Merging on distributed caches can lead to rehash failure
[ISPN-668] - Node fails to join a cluster after a shutdown or crash of a node in the cluster. The joining node can be a restarted node or a new one (on a different port).

Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-10-03 18:43:09 UTC (rev 2476)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-10-03 18:57:11 UTC (rev 2477)
@@ -45,7 +45,7 @@
    public static final int COMMAND_ID = 17;
 
    public enum Type {
-      JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE_JOIN, PULL_STATE_LEAVE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
+      JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_ABORT, PULL_STATE_JOIN, PULL_STATE_LEAVE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
    }
 
    Type type;
@@ -120,9 +120,6 @@
          case JOIN_REHASH_END:
             distributionManager.informRehashOnJoin(sender, false);
             return null;
-         case JOIN_COMPLETE:
-            distributionManager.notifyJoinComplete(sender);
-            return null;
          case PULL_STATE_JOIN:
             return pullStateForJoin();             
          case PULL_STATE_LEAVE:

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2010-10-03 18:43:09 UTC (rev 2476)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2010-10-03 18:57:11 UTC (rev 2477)
@@ -110,13 +110,6 @@
    List<Address> requestPermissionToJoin(Address joiner);
 
    /**
-    * Notifies a coordinator when a join completes
-    *
-    * @param joiner joiner who has completed.
-    */
-   void notifyJoinComplete(Address joiner);
-
-   /**
     * This will cause all nodes to add the joiner to their consistent hash instance (usually by creating a {@link org.infinispan.distribution.UnionConsistentHash}
     *
     * @param joiner address of joiner

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-10-03 18:43:09 UTC (rev 2476)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-10-03 18:57:11 UTC (rev 2477)
@@ -69,50 +69,74 @@
 /**
  * The default distribution manager implementation
  *
- * @author Manik Surtani
+ * @author Manik Surtani, Vladimir Blagojevic
  * @since 4.0
  */
 @MBean(objectName = "DistributionManager", description = "Component that handles distribution of content across a cluster")
 public class DistributionManagerImpl implements DistributionManager {
    private final Log log = LogFactory.getLog(DistributionManagerImpl.class);
-   private final boolean trace = log.isTraceEnabled();
-   Configuration configuration;
-   volatile ConsistentHash consistentHash, oldConsistentHash;
-   Address self;
-   CacheLoaderManager cacheLoaderManager;
+   
+   private Configuration configuration;
+   private volatile ConsistentHash consistentHash, oldConsistentHash;
+   private Address self;
+   private CacheLoaderManager cacheLoaderManager;
    RpcManager rpcManager;
-   CacheManagerNotifier notifier;
-   int replCount;
-   ViewChangeListener listener;
-   CommandsFactory cf;
-   LinkedBlockingQueue<Runnable> rehashQueue = new LinkedBlockingQueue<Runnable>();
-   ThreadFactory tf = new ThreadFactory() {
-      public Thread newThread(Runnable r) {
-         Thread t = new Thread(r);
-         t.setDaemon(true);
-         t.setPriority(Thread.MIN_PRIORITY);
-         t.setName("Rehasher-" + rpcManager.getTransport().getAddress());
-         return t;
-      }
-   };
-   ExecutorService rehashExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, rehashQueue, tf);
+   private CacheManagerNotifier notifier;
 
-   TransactionLogger transactionLogger = new TransactionLoggerImpl();
+   private ViewChangeListener listener;
+   private CommandsFactory cf;
+   
+   private final ExecutorService rehashExecutor;
+
+   private final TransactionLogger transactionLogger = new TransactionLoggerImpl();
+   
+   /**
+    * Rehash flag set by a rehash task associated with this DistributionManager
+    */
    volatile boolean rehashInProgress = false;
-   volatile Address joiner;
-   static final AtomicReferenceFieldUpdater<DistributionManagerImpl, Address> JOINER_CAS =
+   
+   
+   /**
+    * Address of a joiner node requesting to join Infinispan cluster. Each node in the cluster is
+    * aware of joiner's identity. After joiner successfully joins (or fails to join), joiner field is nullified
+    */
+   private volatile Address joiner;
+   
+   private static final AtomicReferenceFieldUpdater<DistributionManagerImpl, Address> JOINER_CAS =
            AtomicReferenceFieldUpdater.newUpdater(DistributionManagerImpl.class, Address.class, "joiner");
+   
    private DataContainer dataContainer;
    private InterceptorChain interceptorChain;
    private InvocationContextContainer icc;
+   
    @ManagedAttribute(description = "If true, the node has successfully joined the grid and is considered to hold state.  If false, the join process is still in progress.")
    @Metric(displayName = "Is join completed?", dataType = DataType.TRAIT)
    private volatile boolean joinComplete = false;
-   Future<Void> joinFuture;
-   final List<Address> leavers = new CopyOnWriteArrayList<Address>();
-   volatile Future<Void> leaveTaskFuture;
-   final ReclosableLatch startLatch = new ReclosableLatch(false);
+   
+   private Future<Void> joinFuture;
+   private final List<Address> leavers = new CopyOnWriteArrayList<Address>();
+   private volatile Future<Void> leaveTaskFuture;
+   private final ReclosableLatch startLatch = new ReclosableLatch(false);
+  
 
+   /**
+    * Default constructor
+    */
+   public DistributionManagerImpl() {
+      super();      
+      LinkedBlockingQueue<Runnable> rehashQueue = new LinkedBlockingQueue<Runnable>();
+      ThreadFactory tf = new ThreadFactory() {
+         public Thread newThread(Runnable r) {
+            Thread t = new Thread(r);
+            t.setDaemon(true);
+            t.setPriority(Thread.MIN_PRIORITY);
+            t.setName("Rehasher-" + rpcManager.getTransport().getAddress());
+            return t;
+         }
+      };
+      rehashExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, rehashQueue, tf);
+   }
+
    @Inject
    public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf,
                     DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc,
@@ -131,14 +155,15 @@
 
    @Start(priority = 20)
    public void start() throws Exception {
-      if (log.isTraceEnabled()) {
-         log.trace("Starting distribution manager on " + getMyAddress());
-      }
-      replCount = configuration.getNumOwners();
+      log.trace("Starting distribution manager on " + getMyAddress());            
       listener = new ViewChangeListener();
       notifier.addListener(listener);
       join();
    }
+   
+   private int getReplCount(){
+      return configuration.getNumOwners();
+   }
 
    private Address getMyAddress() {
       return rpcManager != null ? rpcManager.getAddress() : null;
@@ -162,6 +187,7 @@
 
    private void join() throws Exception {
       startLatch.close();
+      setJoinComplete(false);
       Transport t = rpcManager.getTransport();
       List<Address> members = t.getMembers();
       consistentHash = createConsistentHash(configuration, members);
@@ -169,6 +195,7 @@
       if (members.size() > 1 && !t.getCoordinator().equals(self)) {
          JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, dataContainer, this);
          joinFuture = rehashExecutor.submit(joinTask);
+         //task will set joinComplete flag
       } else {
          setJoinComplete(true);
       }
@@ -246,20 +273,20 @@
    boolean willReceiveLeaverState(Address leaver) {
       ConsistentHash ch = consistentHash instanceof UnionConsistentHash ? oldConsistentHash : consistentHash;
       int dist = ch.getDistance(leaver, self);
-      return dist >= 0 && dist <= replCount;
+      return dist >= 0 && dist <= getReplCount();
    }
 
    public boolean isLocal(Object key) {
-      return consistentHash == null || consistentHash.isKeyLocalToAddress(self, key, replCount);
+      return consistentHash == null || consistentHash.isKeyLocalToAddress(self, key, getReplCount());
    }
 
    public List<Address> locate(Object key) {
       if (consistentHash == null) return Collections.singletonList(self);
-      return consistentHash.locate(key, replCount);
+      return consistentHash.locate(key, getReplCount());
    }
 
    public Map<Object, List<Address>> locateAll(Collection<Object> keys) {
-      return locateAll(keys, replCount);
+      return locateAll(keys, getReplCount());
    }
 
    public Map<Object, List<Address>> locateAll(Collection<Object> keys, int numOwners) {
@@ -308,51 +335,41 @@
    @ManagedOperation(description = "Determines whether a given key is affected by an ongoing rehash, if any.")
    @Operation(displayName = "Could key be affected by rehash?")
    public boolean isAffectedByRehash(@Parameter(name = "key", description = "Key to check") Object key) {
-      return transactionLogger.isEnabled() && oldConsistentHash != null && !oldConsistentHash.locate(key, replCount).contains(self);
+      return transactionLogger.isEnabled() && oldConsistentHash != null && !oldConsistentHash.locate(key, getReplCount()).contains(self);
    }
 
    public TransactionLogger getTransactionLogger() {
       return transactionLogger;
    }
 
-   public List<Address> requestPermissionToJoin(Address joiner) {
-      if (JOINER_CAS.compareAndSet(this, null, joiner)) {
-         if (trace) log.trace("Allowing {0} to join", joiner);
+   public List<Address> requestPermissionToJoin(Address a) {
+      if (JOINER_CAS.compareAndSet(this, null, a)) {
+         log.trace("Allowing {0} to join", a);
          return new LinkedList<Address>(consistentHash.getCaches());
       } else {
-         if (trace)
-            log.trace("Not alowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
+         log.trace("Not alowing {0} to join since there is a join already in progress for node {1}", a, joiner);
          return null;
       }
    }
 
-   public void notifyJoinComplete(Address joiner) {
-      log.trace("Received notification that {0} has completed a join.  Current 'joiner' flag is {1}, setting this to null.", joiner, this.joiner);
-      if (this.joiner != null) {
-         if (this.joiner.equals(joiner)) this.joiner = null;
-      }
-   }
-
-   public void informRehashOnJoin(Address joiner, boolean starting) {
-      log.trace("Informed of a JOIN by {0}.  Starting? {1}", joiner, starting);
+   public void informRehashOnJoin(Address a, boolean starting) {
+      log.trace("Informed of a JOIN by {0}.  Starting? {1}", a, starting);
       if (!starting) {
          if (consistentHash instanceof UnionConsistentHash) {
             UnionConsistentHash uch = (UnionConsistentHash) consistentHash;
             consistentHash = uch.getNewConsistentHash();
             oldConsistentHash = null;
          }
-         rehashInProgress = false;
+         joiner = null;            
       } else {
          ConsistentHash chOld = consistentHash;
          if (chOld instanceof UnionConsistentHash) throw new RuntimeException("Not expecting a union CH!");
          oldConsistentHash = chOld;
-         this.joiner = joiner;
-         rehashInProgress = true;
-
+         joiner = a;        
          ConsistentHash chNew;
          chNew = (ConsistentHash) Util.getInstance(configuration.getConsistentHashClass());
          List<Address> newAddresses = new LinkedList<Address>(chOld.getCaches());
-         newAddresses.add(joiner);
+         newAddresses.add(a);
          chNew.setCaches(newAddresses);
          consistentHash = new UnionConsistentHash(chOld, chNew);
       }
@@ -360,7 +377,7 @@
    }
 
    public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state) {
-      if (trace) log.trace("Apply state with " + state);
+      log.trace("Apply state with " + state);
       for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
          if (consistentHash.locate(e.getKey(), configuration.getNumOwners()).contains(self)) {
             InternalCacheValue v = e.getValue();
@@ -376,9 +393,7 @@
    public class ViewChangeListener {
       @ViewChanged
       public void handleViewChange(ViewChangedEvent e) {
-         if (log.isTraceEnabled()) {
-            log.trace("view change received. Needs to re-join? " + e.isNeedsToRejoin());
-         }
+         log.trace("view change received. Needs to re-join? " + e.isNeedsToRejoin());         
          boolean started;
          // how long do we wait for a startup?
          if (e.isNeedsToRejoin()) {
@@ -428,9 +443,7 @@
    }
 
    public void setJoinComplete(boolean joinComplete) {
-      if (log.isTraceEnabled()) {
-         log.trace("Setting joinComplete to " + joinComplete + " for node " + rpcManager.getAddress());
-      }
+      log.debug("Setting joinComplete to " + joinComplete + " for node " + rpcManager.getAddress());      
       this.joinComplete = joinComplete;
    }
 

Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-10-03 18:43:09 UTC (rev 2476)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-10-03 18:57:11 UTC (rev 2477)
@@ -50,8 +50,7 @@
 
    public JoinTask(RpcManager rpcManager, CommandsFactory commandsFactory, Configuration conf,
             DataContainer dataContainer, DistributionManagerImpl dmi) {
-      super(dmi, rpcManager, conf, commandsFactory, dataContainer);
-      this.dataContainer = dataContainer;
+      super(dmi, rpcManager, conf, commandsFactory, dataContainer);      
       this.self = rpcManager.getTransport().getAddress();
    }
 
@@ -91,52 +90,49 @@
             chNew = createConsistentHash(configuration, chOld.getCaches(), self);
          
          dmi.setConsistentHash(chNew);
-
-         if (configuration.isRehashEnabled()) {
-            // 3.  Enable TX logging
-            transactionLogger.enable();
-
-            // 4.  Broadcast new temp CH
-            rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
-
-            // 5.  txLogger being enabled will cause ClusteredGetCommands to return uncertain responses.
-
-            // 6.  pull state from everyone.
-            Address myAddress = rpcManager.getTransport().getAddress();
-            
-            RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE_JOIN, myAddress, null, chOld, chNew,null);
-            // TODO I should be able to process state chunks from different nodes simultaneously!!
-            List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff(chNew, configuration.getNumOwners());
-            List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd, SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
-
-            // 7.  Apply state
-            for (Response r : resps) {
-               if (r instanceof SuccessfulResponse) {
-                  Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
-                  dmi.applyState(chNew, state);
+         try {
+            if (configuration.isRehashEnabled()) {
+               // 3.  Enable TX logging
+               transactionLogger.enable();
+   
+               // 4.  Broadcast new temp CH
+               rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
+   
+               // 5.  txLogger being enabled will cause ClusteredGetCommands to return uncertain responses.
+   
+               // 6.  pull state from everyone.
+               Address myAddress = rpcManager.getTransport().getAddress();
+               
+               RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE_JOIN, myAddress, null, chOld, chNew,null);
+               // TODO I should be able to process state chunks from different nodes simultaneously!!
+               List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff(chNew, configuration.getNumOwners());
+               List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd, SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
+   
+               // 7.  Apply state
+               for (Response r : resps) {
+                  if (r instanceof SuccessfulResponse) {
+                     Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
+                     dmi.applyState(chNew, state);
+                  }
                }
+   
+               // 8.  Drain logs
+               dmi.drainLocalTransactionLog();
+               unlocked = true;
+            } else {
+               rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
+               if (trace) log.trace("Rehash not enabled, so not pulling state.");
+            }                                 
+         } finally {
+            // 10.
+            rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
+            
+            if (configuration.isRehashEnabled()) {
+               // 11.
+               invalidateInvalidHolders(chOld, chNew);
             }
-
-            // 8.  Drain logs
-            dmi.drainLocalTransactionLog();
-         } else {
-            if (trace) log.trace("Rehash not enabled, so not pulling state.");
          }
-         unlocked = true;
 
-         if (!configuration.isRehashEnabled()) {
-            rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
-         }
-         // 10.
-         rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
-         rpcManager.invokeRemotely(coordinator(), cf.buildRehashControlCommand(JOIN_COMPLETE, self), SYNCHRONOUS,
-                                   configuration.getRehashRpcTimeout(), true);
-
-         if (configuration.isRehashEnabled()) {
-            // 11.
-            invalidateInvalidHolders(chOld, chNew);
-         }
-
       } catch (Exception e) {
          log.error("Caught exception!", e);
          throw new CacheException("Unexpected exception", e);



More information about the infinispan-commits mailing list