[infinispan-commits] Infinispan SVN: r1803 - trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue May 18 11:37:47 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-05-18 11:37:46 -0400 (Tue, 18 May 2010)
New Revision: 1803

Modified:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
Log:
[ISPN-426] (Deal with concurrent Hot Rod topology view changes) Done.

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-05-18 12:28:01 UTC (rev 1802)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-05-18 15:37:46 UTC (rev 1803)
@@ -5,14 +5,15 @@
 import org.infinispan.server.core.{Logging, AbstractProtocolServer}
 import org.infinispan.config.Configuration
 import org.infinispan.config.Configuration.CacheMode
-import org.infinispan.Cache
 import org.infinispan.notifications.Listener
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent
 import scala.collection.JavaConversions._
-import org.infinispan.remoting.transport.Address
-import java.util.concurrent.{ThreadFactory, Callable, Executors}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.TimeUnit._
+import java.util.Random
+import org.infinispan.util.Util
+import org.infinispan.{CacheException, Cache}
+import org.infinispan.remoting.transport.Address;
 
 /**
  * // TODO: Document this
@@ -24,6 +25,8 @@
    private var isClustered: Boolean = _
    private var address: TopologyAddress = _
    private var topologyCache: Cache[String, TopologyView] = _
+   private val rand = new Random
+   private val maxWaitTime = SECONDS.toMillis(30) // TODO: Make this configurable?
 
    def getAddress: TopologyAddress = address
 
@@ -49,28 +52,45 @@
       address = TopologyAddress(host, port, Map.empty, cacheManager.getAddress)
       debug("Local topology address is {0}", address)
       cacheManager.addListener(new CrashedMemberDetectorListener)
-      val currentView = topologyCache.get("view")
-      if (currentView != null) {
-         val newMembers = currentView.members ::: List(address)
-         val newView = TopologyView(currentView.topologyId + 1, newMembers)
-         val replaced = topologyCache.replace("view", currentView, newView)
-         if (!replaced) {
-            // TODO: There was a concurrent view modification, get and try to install new view again.
+      val updated = updateTopologyView(false, System.currentTimeMillis()) { currentView =>
+         if (currentView != null) {
+            val newMembers = currentView.members ::: List(address)
+            val newView = TopologyView(currentView.topologyId + 1, newMembers)
+            val updated = topologyCache.replace("view", currentView, newView)
+            if (updated) debug("Added {0} to topology, new view is {1}", address, newView)
+            updated
          } else {
-            debug("Added {0} to topology, new view is {1}", address, newView)
+            val newMembers = List(address)
+            val newView = TopologyView(1, newMembers)
+            val updated = topologyCache.putIfAbsent("view", newView) == null
+            if (updated) debug("First member to start, topology view is {0}", newView)
+            updated
          }
-      } else {
-         val newMembers = List(address)
-         val newView = TopologyView(1, newMembers)
-         val prev = topologyCache.putIfAbsent("view", newView)
-         if (prev != null) {
-            // TODO: There was a concurrent view modification, get and try to install new view again.
-         } else {
-            debug("First member to start, topology view is {0}", newView)
-         }
       }
+      if (!updated)
+         throw new CacheException("Unable to update topology view, so aborting startup")
    }
 
+   private def updateTopologyView(replaced: Boolean, updateStartTime: Long)(f: TopologyView => Boolean): Boolean = { // re-applies f until it reports success or maxWaitTime has passed since updateStartTime
+      val giveupTime = updateStartTime + maxWaitTime // absolute deadline; identical on every retry because updateStartTime never changes
+      if (replaced || System.currentTimeMillis() > giveupTime) replaced // true: f succeeded; false: deadline passed without success
+      else updateTopologyView(isViewUpdated(f), updateStartTime)(f) // pass the start time, not giveupTime, otherwise the deadline slides forward on each retry and is never reached
+   }
+
+   private def isViewUpdated(f: TopologyView => Boolean): Boolean = {
+      // Run one update attempt: hand f the value currently stored under "view" (may be null).
+      val succeeded = f(topologyCache.get("view"))
+      if (!succeeded) {
+         // Random back-off in 10ms steps, from 500ms up to 1990ms inclusive.
+         val lowerBoundMillis = 500
+         val upperBoundMillis = 2000
+         val backoffMillis = rand.nextInt((upperBoundMillis - lowerBoundMillis) / 10) * 10 + lowerBoundMillis
+         trace("Concurrent modification in topology view, sleeping for {0}", Util.prettyPrintTime(backoffMillis))
+         Thread.sleep(backoffMillis) // pause here so a later attempt is less likely to collide again
+      }
+      succeeded
+   }
+
    override def stop {
       super.stop
       if (isClustered)
@@ -88,7 +108,8 @@
          val newView = TopologyView(currentView.topologyId + 1, newMembers)
          val replaced = topologyCache.replace("view", currentView, newView)
          if (!replaced) {
-            // TODO: There was a concurrent view modification. Just give up, logic to deal with crashed/stalled members will deal with this
+            debug("Attempt to update topology view failed due to a concurrent modification. " +
+                  "Ignoring since logic to deal with crashed members will deal with it.")
          } else {
             debug("Removed {0} from topology view, new view is {1}", address, newView)
          }
@@ -120,28 +141,34 @@
                   val newMembersList = asBuffer(newMembers).toList
                   val oldMembersList = asBuffer(oldMembers).toList
                   val goneMembers = oldMembersList.filterNot(newMembersList contains)
-                  val currentView = topologyCache.get("view")
-                  if (currentView != null) {
-                     var tmpMembers = currentView.members
-                     for (goneMember <- goneMembers) {
-                        trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
-                        // If old memmber is in topology, it means that it had an abnormal ending
-                        val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
-                        if (isCrashed) {
-                           trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
-                                 "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
-                           tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
-                           trace("After removal, new Hot Rod topology is {0}", tmpMembers)
+                  val updated = updateTopologyView(false, System.currentTimeMillis()) { currentView =>
+                     if (currentView != null) {
+                        var tmpMembers = currentView.members
+                        for (goneMember <- goneMembers) {
+                           trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
+                           // If old memmber is in topology, it means that it had an abnormal ending
+                           val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
+                           if (isCrashed) {
+                              trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
+                                    "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
+                              tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
+                              trace("After removal, new Hot Rod topology is {0}", tmpMembers)
+                           }
                         }
-                     }
-                     if (tmpMembers.size < currentView.members.size) {
-                        val newView = TopologyView(currentView.topologyId + 1, tmpMembers)
-                        val replaced = topologyCache.replace("view", currentView, newView)
-                        if (!replaced) {
-                           // TODO: How to deal with concurrent failures at this point?
+                        if (tmpMembers.size < currentView.members.size) {
+                           val newView = TopologyView(currentView.topologyId + 1, tmpMembers)
+                           topologyCache.replace("view", currentView, newView)
+                        } else {
+                           true // Mark as topology updated because there was no need to do so
                         }
+                     } else {
+                        warn("While trying to detect a crashed member, current view returned null")
+                        true
                      }
                   }
+                  if (!updated) {
+                     warn("Unable to update topology view after a crashed member left, wait for next view change.")
+                  }
                }
             } catch {
                case t: Throwable => error("Error detecting crashed member", t)



More information about the infinispan-commits mailing list