[infinispan-commits] Infinispan SVN: r1803 - trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue May 18 11:37:47 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-05-18 11:37:46 -0400 (Tue, 18 May 2010)
New Revision: 1803
Modified:
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
Log:
[ISPN-426] (Deal with concurrent Hot Rod topology view changes) Done.
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-05-18 12:28:01 UTC (rev 1802)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-05-18 15:37:46 UTC (rev 1803)
@@ -5,14 +5,15 @@
import org.infinispan.server.core.{Logging, AbstractProtocolServer}
import org.infinispan.config.Configuration
import org.infinispan.config.Configuration.CacheMode
-import org.infinispan.Cache
import org.infinispan.notifications.Listener
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent
import scala.collection.JavaConversions._
-import org.infinispan.remoting.transport.Address
-import java.util.concurrent.{ThreadFactory, Callable, Executors}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.TimeUnit._
+import java.util.Random
+import org.infinispan.util.Util
+import org.infinispan.{CacheException, Cache}
+import org.infinispan.remoting.transport.Address;
/**
* // TODO: Document this
@@ -24,6 +25,8 @@
private var isClustered: Boolean = _
private var address: TopologyAddress = _
private var topologyCache: Cache[String, TopologyView] = _
+ private val rand = new Random
+ private val maxWaitTime = SECONDS.toMillis(30) // TODO: Make this configurable?
def getAddress: TopologyAddress = address
@@ -49,28 +52,45 @@
address = TopologyAddress(host, port, Map.empty, cacheManager.getAddress)
debug("Local topology address is {0}", address)
cacheManager.addListener(new CrashedMemberDetectorListener)
- val currentView = topologyCache.get("view")
- if (currentView != null) {
- val newMembers = currentView.members ::: List(address)
- val newView = TopologyView(currentView.topologyId + 1, newMembers)
- val replaced = topologyCache.replace("view", currentView, newView)
- if (!replaced) {
- // TODO: There was a concurrent view modification, get and try to install new view again.
+ val updated = updateTopologyView(false, System.currentTimeMillis()) { currentView =>
+ if (currentView != null) {
+ val newMembers = currentView.members ::: List(address)
+ val newView = TopologyView(currentView.topologyId + 1, newMembers)
+ val updated = topologyCache.replace("view", currentView, newView)
+ if (updated) debug("Added {0} to topology, new view is {1}", address, newView)
+ updated
} else {
- debug("Added {0} to topology, new view is {1}", address, newView)
+ val newMembers = List(address)
+ val newView = TopologyView(1, newMembers)
+ val updated = topologyCache.putIfAbsent("view", newView) == null
+ if (updated) debug("First member to start, topology view is {0}", newView)
+ updated
}
- } else {
- val newMembers = List(address)
- val newView = TopologyView(1, newMembers)
- val prev = topologyCache.putIfAbsent("view", newView)
- if (prev != null) {
- // TODO: There was a concurrent view modification, get and try to install new view again.
- } else {
- debug("First member to start, topology view is {0}", newView)
- }
}
+ if (!updated)
+ throw new CacheException("Unable to update topology view, so aborting startup")
}
+  /**
+   * Repeatedly applies `f` (via isViewUpdated) to the current topology view until
+   * `f` reports a successful update, or until more than `maxWaitTime` milliseconds
+   * have elapsed since `updateStartTime`.
+   *
+   * @param replaced whether the previous attempt succeeded; callers pass false to start
+   * @param updateStartTime wall-clock time (ms) at which the first attempt began. The
+   *        give-up deadline is always measured from this original start time.
+   * @param f attempts to install a new view derived from the current one (which may be
+   *        null), returning true on success
+   * @return true if the view was updated before the deadline, false otherwise
+   */
+  @scala.annotation.tailrec
+  private def updateTopologyView(replaced: Boolean, updateStartTime: Long)(f: TopologyView => Boolean): Boolean = {
+    val giveupTime = updateStartTime + maxWaitTime
+    // Forward the ORIGINAL start time: passing giveupTime instead would push the
+    // deadline out by maxWaitTime on each retry, so the loop would never give up.
+    if (replaced || System.currentTimeMillis() > giveupTime) replaced
+    else updateTopologyView(isViewUpdated(f), updateStartTime)(f)
+  }
+
+  /**
+   * Makes a single attempt to update the topology view. It applies `f` to the value
+   * currently stored under the "view" key, which may be null if no view exists yet.
+   * If the attempt fails, sleeps for a random back-off of 500 to 1990 ms (in 10 ms
+   * steps) before returning. The randomisation is presumably there so that concurrent
+   * updaters are less likely to retry in lockstep.
+   *
+   * @param f attempts to install a new view based on the current one, returning true on success
+   * @return the result of `f`
+   */
+  private def isViewUpdated(f: TopologyView => Boolean): Boolean = {
+    val currentView = topologyCache.get("view")
+    val updated = f(currentView)
+    if (!updated) {
+      val minSleepTime = 500
+      val maxSleepTime = 2000 // sleep time between retries
+      val sleepSteps = rand.nextInt((maxSleepTime - minSleepTime) / 10)
+      val time = sleepSteps * 10 + minSleepTime
+      trace("Concurrent modification in topology view, sleeping for {0}", Util.prettyPrintTime(time))
+      Thread.sleep(time) // back off for a while before the caller retries
+    }
+    updated
+  }
+
override def stop {
super.stop
if (isClustered)
@@ -88,7 +108,8 @@
val newView = TopologyView(currentView.topologyId + 1, newMembers)
val replaced = topologyCache.replace("view", currentView, newView)
if (!replaced) {
- // TODO: There was a concurrent view modification. Just give up, logic to deal with crashed/stalled members will deal with this
+ debug("Attempt to update topology view failed due to a concurrent modification. " +
+ "Ignoring since logic to deal with crashed members will deal with it.")
} else {
debug("Removed {0} from topology view, new view is {1}", address, newView)
}
@@ -120,28 +141,34 @@
val newMembersList = asBuffer(newMembers).toList
val oldMembersList = asBuffer(oldMembers).toList
val goneMembers = oldMembersList.filterNot(newMembersList contains)
- val currentView = topologyCache.get("view")
- if (currentView != null) {
- var tmpMembers = currentView.members
- for (goneMember <- goneMembers) {
- trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
- // If old memmber is in topology, it means that it had an abnormal ending
- val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
- if (isCrashed) {
- trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
- "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
- tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
- trace("After removal, new Hot Rod topology is {0}", tmpMembers)
+ val updated = updateTopologyView(false, System.currentTimeMillis()) { currentView =>
+ if (currentView != null) {
+ var tmpMembers = currentView.members
+ for (goneMember <- goneMembers) {
+ trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
+ // If old memmber is in topology, it means that it had an abnormal ending
+ val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
+ if (isCrashed) {
+ trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
+ "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
+ tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
+ trace("After removal, new Hot Rod topology is {0}", tmpMembers)
+ }
}
- }
- if (tmpMembers.size < currentView.members.size) {
- val newView = TopologyView(currentView.topologyId + 1, tmpMembers)
- val replaced = topologyCache.replace("view", currentView, newView)
- if (!replaced) {
- // TODO: How to deal with concurrent failures at this point?
+ if (tmpMembers.size < currentView.members.size) {
+ val newView = TopologyView(currentView.topologyId + 1, tmpMembers)
+ topologyCache.replace("view", currentView, newView)
+ } else {
+ true // Mark as topology updated because there was no need to do so
}
+ } else {
+ warn("While trying to detect a crashed member, current view returned null")
+ true
}
}
+ if (!updated) {
+ warn("Unable to update topology view after a crashed member left, wait for next view change.")
+ }
}
} catch {
case t: Throwable => error("Error detecting crashed member", t)
More information about the infinispan-commits
mailing list