[infinispan-commits] Infinispan SVN: r1726 - in trunk/server: hotrod/src/main/scala/org/infinispan/server/hotrod and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Apr 23 10:59:35 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-04-23 10:59:34 -0400 (Fri, 23 Apr 2010)
New Revision: 1726

Modified:
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
Log:
[ISPN-412] (Detect crashed members and update Hot Rod topology) More efficient detection of crashed members: the members that left are now computed with Scala List's `--` method, which returns the difference between the old and new member lists.

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-04-23 13:54:05 UTC (rev 1725)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-04-23 14:59:34 UTC (rev 1726)
@@ -3,10 +3,9 @@
 import java.net.SocketAddress
 import org.jboss.netty.channel.group.DefaultChannelGroup
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import java.util.concurrent.atomic.AtomicInteger
 import org.jboss.netty.channel.ChannelDownstreamHandler
 import org.jboss.netty.bootstrap.ServerBootstrap
-import java.util.concurrent.{Executors, ThreadFactory}
+import java.util.concurrent.Executors
 import org.infinispan.server.core.transport.Transport
 import scala.collection.JavaConversions._
 import org.infinispan.server.core.{ProtocolServer, Logging}

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-04-23 13:54:05 UTC (rev 1725)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-04-23 14:59:34 UTC (rev 1726)
@@ -24,6 +24,7 @@
    import HotRodServer._
    private var isClustered: Boolean = _
    private var address: TopologyAddress = _
+   private var topologyCache: Cache[String, TopologyView] = _
 
    def getAddress: TopologyAddress = address
 
@@ -41,9 +42,9 @@
 
    private def addSelfToTopologyView(host: String, port: Int, cacheManager: CacheManager) {
       defineTopologyCacheConfig(cacheManager)
-      val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)
-      cacheManager.addListener(new CrashedMemberDetectorListener)
+      topologyCache = cacheManager.getCache(TopologyCacheName)
       address = TopologyAddress(host, port, 0, cacheManager.getAddress)
+      cacheManager.addListener(new CrashedMemberDetectorListener)
       val currentView = topologyCache.get("view")
       // TODO: If distribution configured, add hashcode of this address
       if (currentView != null) {
@@ -75,7 +76,6 @@
 
    protected def removeSelfFromTopologyView {
       // Graceful shutdown, remove this node as member and install new view
-      val topologyCache: Cache[String, TopologyView] = getCacheManager.getCache(TopologyCacheName)
       val currentView = topologyCache.get("view")
       // TODO: If distribution configured, add hashcode of this address
       val newMembers = currentView.members.filterNot(_ == address)
@@ -124,25 +124,24 @@
                      val oldMembers = e.getOldMembers
                      // Someone left the cluster, verify whether it did it gracefully or crashed.
                      if (oldMembers.size > newMembers.size) {
-                        val topologyCache: Cache[String, TopologyView] = getCacheManager.getCache(TopologyCacheName)
+                        val newMembersList = asBuffer(newMembers).toList
+                        val oldMembersList = asBuffer(oldMembers).toList
+                        val goneMembers = oldMembersList -- newMembersList
                         val currentView = topologyCache.get("view")
-                        var newTopologyMembers = currentView.members
-                        for (oldMember <- asIterator(oldMembers.iterator)) {
-                           // If old member is not amongst the new ones, check whether it's still in the topology cache
-                           if (!newMembers.contains(oldMember)) {
-                              trace("Old member {0} is not in new view {1}, did it crash?", oldMember, newMembers)
-                              // If old memmber is in topology, it means that it had an abnormal ending
-                              val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(oldMember, currentView)
-                              if (isCrashed) {
-                                 trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
-                                       "{2}, so must have crashed.", oldMember, crashedTopologyMember, currentView)
-                                 newTopologyMembers = newTopologyMembers.filterNot(_ == crashedTopologyMember)
-                                 trace("After removal, new Hot Rod topology is {0}", newTopologyMembers)
-                              }
+                        var tmpMembers = currentView.members
+                        for (goneMember <- goneMembers) {
+                           trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
+                           // If old memmber is in topology, it means that it had an abnormal ending
+                           val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
+                           if (isCrashed) {
+                              trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
+                                    "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
+                              tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
+                              trace("After removal, new Hot Rod topology is {0}", tmpMembers)
                            }
                         }
-                        if (newTopologyMembers.size < currentView.members.size) {
-                           val newView = TopologyView(currentView.topologyId + 1, newTopologyMembers)
+                        if (tmpMembers.size < currentView.members.size) {
+                           val newView = TopologyView(currentView.topologyId + 1, tmpMembers)
                            val replaced = topologyCache.replace("view", currentView, newView)
                            if (!replaced) {
                               // TODO: How to deal with concurrent failures at this point?

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala	2010-04-23 13:54:05 UTC (rev 1725)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala	2010-04-23 14:59:34 UTC (rev 1726)
@@ -10,6 +10,8 @@
  */
 @Marshallable(externalizer = classOf[TopologyView.Externalizer], id = 59)
 case class TopologyView(val topologyId: Int, val members: List[TopologyAddress])
+// TODO: TopologyView could maintain a Map[Address, TopologyAddress] rather than pushing Address into each TopologyAddress.
+// TODO: That would make crash detection more efficient at the expense of some extra space. 
 
 object TopologyView {
    class Externalizer extends org.infinispan.marshall.Externalizer {



More information about the infinispan-commits mailing list