[infinispan-commits] Infinispan SVN: r1872 - in trunk/server: core/src/main/scala/org/infinispan/server/core/transport/netty and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri May 28 07:51:32 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-05-28 07:51:30 -0400 (Fri, 28 May 2010)
New Revision: 1872

Modified:
   trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
Log:
[ISPN-468] (Wrap any log calls in Memcached/HotRod servers around if enabled calls) Done.

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala	2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala	2010-05-28 11:51:30 UTC (rev 1872)
@@ -14,16 +14,20 @@
 
    // params.map(_.asInstanceOf[AnyRef]) => returns a Seq[AnyRef]
    // the ': _*' part tells the compiler to pass it as varargs
-   def info(msg: => String, params: Any*) = if (log.isInfoEnabled) log.info(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+   def info(msg: => String, params: Any*) = log.info(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
 
 //   def debug(msg: => String) = log.debug(msg, null)
 
-   def debug(msg: => String, params: Any*) = if (log.isDebugEnabled) log.debug(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+   def isDebugEnabled = log.isDebugEnabled
 
+   def debug(msg: => String, params: Any*) = log.debug(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
 //   def trace(msg: => String) = log.trace(msg, null)
 
-   def trace(msg: => String, params: Any*) = if (log.isTraceEnabled) log.trace(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+   def isTraceEnabled = log.isTraceEnabled
 
+   def trace(msg: => String, params: Any*) = log.trace(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
    def warn(msg: => String, params: Any*) = log.warn(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
 
    def warn(msg: => String, t: Throwable) = log.warn(msg, t, null)

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala	2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala	2010-05-28 11:51:30 UTC (rev 1872)
@@ -44,7 +44,7 @@
       @ViewChanged
       def calculateRank(e: ViewChangedEvent) {
          val rank = calculateRank(e.getLocalAddress, asIterable(e.getNewMembers), e.getViewId)
-         trace("Calculated rank based on view {0} and result was {1}", e, rank)
+         if (isTraceEnabled) trace("Calculated rank based on view {0} and result was {1}", e, rank)
       }
 
       private[core] def calculateRank(address: Address, members: Iterable[Address], viewId: Long): Long = {

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-05-28 11:51:30 UTC (rev 1872)
@@ -33,21 +33,21 @@
    
    private lazy val masterExecutor = {
       if (masterThreads == 0) {
-         debug("Configured unlimited threads for master thread pool")
+         if (isDebugEnabled) debug("Configured unlimited threads for master thread pool")
          Executors.newCachedThreadPool
       } else {
-         debug("Configured {0} threads for master thread pool", masterThreads)
+         if (isDebugEnabled) debug("Configured {0} threads for master thread pool", masterThreads)
          Executors.newFixedThreadPool(masterThreads)
       }
    }
 
    private lazy val workerExecutor = {
       if (workerThreads == 0) {
-         debug("Configured unlimited threads for worker thread pool")
+         if (isDebugEnabled) debug("Configured unlimited threads for worker thread pool")
          Executors.newCachedThreadPool
       }
       else {
-         debug("Configured {0} threads for worker thread pool", workerThreads)
+         if (isDebugEnabled) debug("Configured {0} threads for worker thread pool", workerThreads)
          Executors.newFixedThreadPool(masterThreads)
       }
    }
@@ -62,8 +62,9 @@
                else if (proposedThreadName contains "client worker") "ClientWorker-"
                else "ClientMaster-"
             val name = threadNamePrefix + typeInFix + proposedThreadName.substring(index + 1, proposedThreadName.length)
-            trace("Thread name will be {0}, with current thread name being {1} and proposed name being '{2}'",
-               name, currentThread, proposedThreadName)
+            if (isTraceEnabled)
+               trace("Thread name will be {0}, with current thread name being {1} and proposed name being '{2}'",
+                  name, currentThread, proposedThreadName)
             name
          }
       })
@@ -97,7 +98,7 @@
          }
       }
       pipeline.stop
-      debug("Channel group completely closed, release external resources");
+      if (isDebugEnabled) debug("Channel group completely closed, release external resources");
       factory.releaseExternalResources();
    }
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-05-28 11:51:30 UTC (rev 1872)
@@ -224,9 +224,9 @@
    def toRequest(streamOp: Short): Option[Enumeration#Value] = {
       val op = requests.get(streamOp)
       if (op == None)
-         trace("Operation code: {0} was unmatched", streamOp)
+         if (isTraceEnabled) trace("Operation code: {0} was unmatched", streamOp)
       else
-         trace("Operation code: {0} has been matched to {1}", streamOp, op)
+         if (isTraceEnabled) trace("Operation code: {0} has been matched to {1}", streamOp, op)
       op
    }
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-05-28 11:51:30 UTC (rev 1872)
@@ -26,6 +26,8 @@
    private var isError = false
    private var joined = false
 
+   private val isTrace = isTraceEnabled
+
    override def readHeader(buffer: ChannelBuffer): HotRodHeader = {
       try {
          val magic = buffer.readUnsignedByte
@@ -33,7 +35,7 @@
             if (!isError) {               
                throw new InvalidMagicIdException("Error reading magic byte or message id: " + magic)
             } else {
-               trace("Error happened previously, ignoring {0} byte until we find the magic number again", magic)
+               if (isTrace) trace("Error happened previously, ignoring {0} byte until we find the magic number again", magic)
                return null // Keep trying to read until we find magic
             }
          }
@@ -53,7 +55,7 @@
             case _ => throw new UnknownVersionException("Unknown version:" + version)
          }
          val header = decoder.readHeader(buffer, messageId)
-         trace("Decoded header {0}", header)
+         if (isTrace) trace("Decoded header {0}", header)
          isError = false
          header
       } catch {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-05-28 11:51:30 UTC (rev 1872)
@@ -20,9 +20,11 @@
    private lazy val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)   
 
    override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
-      trace("Encode msg {0}", msg)
+      val isTrace = isTraceEnabled
+
+      if (isTrace) trace("Encode msg {0}", msg)
       val buffer: ChannelBuffer = msg match { 
-         case r: Response => writeHeader(r)
+         case r: Response => writeHeader(r, isTrace)
       }
       msg match {
          case r: ResponseWithPrevious => {
@@ -51,7 +53,7 @@
       buffer
    }
 
-   private def writeHeader(r: Response): ChannelBuffer = {
+   private def writeHeader(r: Response, isTrace: Boolean): ChannelBuffer = {
       val buffer = dynamicBuffer
       buffer.writeByte(Magic.byteValue)
       buffer.writeUnsignedLong(r.messageId)
@@ -62,11 +64,11 @@
          r.topologyResponse.get match {
             case t: TopologyAwareResponse => {
                if (r.clientIntel == 2)
-                  writeTopologyHeader(t, buffer)
+                  writeTopologyHeader(t, buffer, isTrace)
                else
-                  writeHashTopologyHeader(t, buffer)
+                  writeHashTopologyHeader(t, buffer, isTrace)
             }
-            case h: HashDistAwareResponse => writeHashTopologyHeader(h, buffer, r)
+            case h: HashDistAwareResponse => writeHashTopologyHeader(h, buffer, r, isTrace)
          }
       } else {
          buffer.writeByte(0) // No topology change
@@ -74,8 +76,8 @@
       buffer
    }
 
-   private def writeTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer) {
-      trace("Write topology change response header {0}", t)
+   private def writeTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer, isTrace: Boolean) {
+      if (isTrace) trace("Write topology change response header {0}", t)
       buffer.writeUnsignedInt(t.view.topologyId)
       buffer.writeUnsignedInt(t.view.members.size)
       t.view.members.foreach{address =>
@@ -84,8 +86,8 @@
       }
    }
 
-   private def writeHashTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer) {
-      trace("Return limited hash distribution aware header in spite of having a hash aware client {0}", t)
+   private def writeHashTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer, isTrace: Boolean) {
+      if (isTrace) trace("Return limited hash distribution aware header in spite of having a hash aware client {0}", t)
       buffer.writeUnsignedInt(t.view.topologyId)
       buffer.writeUnsignedShort(0) // Num key owners
       buffer.writeByte(0) // Hash function
@@ -98,8 +100,8 @@
       }
    }
 
-   private def writeHashTopologyHeader(h: HashDistAwareResponse, buffer: ChannelBuffer, r: Response) {
-      trace("Write hash distribution change response header {0}", h)
+   private def writeHashTopologyHeader(h: HashDistAwareResponse, buffer: ChannelBuffer, r: Response, isTrace: Boolean) {
+      if (isTrace) trace("Write hash distribution change response header {0}", h)
       buffer.writeUnsignedInt(h.view.topologyId)
       buffer.writeUnsignedShort(h.numOwners) // Num key owners
       buffer.writeByte(h.hashFunction) // Hash function

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-05-28 11:51:30 UTC (rev 1872)
@@ -50,20 +50,21 @@
       defineTopologyCacheConfig(cacheManager)
       topologyCache = cacheManager.getCache(TopologyCacheName)
       address = TopologyAddress(host, port, Map.empty, cacheManager.getAddress)
-      debug("Local topology address is {0}", address)
+      val isDebug = isDebugEnabled
+      if (isDebug) debug("Local topology address is {0}", address)
       cacheManager.addListener(new CrashedMemberDetectorListener)
       val updated = updateTopologyView(false, System.currentTimeMillis()) { currentView =>
          if (currentView != null) {
             val newMembers = currentView.members ::: List(address)
             val newView = TopologyView(currentView.topologyId + 1, newMembers)
             val updated = topologyCache.replace("view", currentView, newView)
-            if (updated) debug("Added {0} to topology, new view is {1}", address, newView)
+            if (isDebug && updated) debug("Added {0} to topology, new view is {1}", address, newView)
             updated
          } else {
             val newMembers = List(address)
             val newView = TopologyView(1, newMembers)
             val updated = topologyCache.putIfAbsent("view", newView) == null
-            if (updated) debug("First member to start, topology view is {0}", newView)
+            if (isDebug && updated) debug("First member to start, topology view is {0}", newView)
             updated
          }
       }
@@ -85,7 +86,7 @@
          val maxSleepTime = 2000 // sleep time between retries
          var time = rand.nextInt((maxSleepTime - minSleepTime) / 10)
          time = (time * 10) + minSleepTime;
-         trace("Concurrent modification in topology view, sleeping for {0}", Util.prettyPrintTime(time))
+         if (isTraceEnabled) trace("Concurrent modification in topology view, sleeping for {0}", Util.prettyPrintTime(time))
          Thread.sleep(time); // sleep for a while and retry
       }
       updated
@@ -102,15 +103,16 @@
       val currentView = topologyCache.get("view")
       // Comparing cluster address should be enough. Full object comparison could fail if hash id map has changed.
       val newMembers = currentView.members.filterNot(_.clusterAddress == address.clusterAddress)
+      val isDebug = isDebugEnabled
       if (newMembers.length != (currentView.members.length - 1)) {
-         debug("Cluster member {0} was not filtered out of the current view {1}", address, currentView)
+         if (isDebug) debug("Cluster member {0} was not filtered out of the current view {1}", address, currentView)
       } else {
          val newView = TopologyView(currentView.topologyId + 1, newMembers)
          val replaced = topologyCache.replace("view", currentView, newView)
-         if (!replaced) {
+         if (isDebug && !replaced) {
             debug("Attempt to update topology view failed due to a concurrent modification. " +
                   "Ignoring since logic to deal with crashed members will deal with it.")
-         } else {
+         } else if (isDebug) {
             debug("Removed {0} from topology view, new view is {1}", address, newView)
          }
       }
@@ -129,6 +131,7 @@
 
       @ViewChanged
       def handleViewChange(e: ViewChangedEvent) {
+         val isTrace = isTraceEnabled
          val cacheManager = e.getCacheManager
          // Only the coordinator can potentially make modifications related to crashed members.
          // This is to avoid all nodes trying to make the same modification which would be wasteful and lead to deadlocks.
@@ -145,14 +148,14 @@
                      if (currentView != null) {
                         var tmpMembers = currentView.members
                         for (goneMember <- goneMembers) {
-                           trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
+                           if (isTrace) trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
                            // If old memmber is in topology, it means that it had an abnormal ending
                            val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
                            if (isCrashed) {
-                              trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
+                              if (isTrace) trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
                                     "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
                               tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
-                              trace("After removal, new Hot Rod topology is {0}", tmpMembers)
+                              if (isTrace) trace("After removal, new Hot Rod topology is {0}", tmpMembers)
                            }
                         }
                         if (tmpMembers.size < currentView.members.size) {

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-05-28 11:51:30 UTC (rev 1872)
@@ -73,7 +73,7 @@
    override def readParameters(h: RequestHeader, b: ChannelBuffer): Option[MemcachedParameters] = {
       val line = readLine(b)
       if (!line.isEmpty) {
-         trace("Operation parameters: {0}", line)
+         if (isTraceEnabled) trace("Operation parameters: {0}", line)
          val args = line.trim.split(" +")
          var index = 0
          h.op match {
@@ -436,7 +436,7 @@
    )
 
    def toRequest(commandName: String): Option[Enumeration#Value] = {
-      trace("Operation: {0}", commandName)
+      if (isTraceEnabled) trace("Operation: {0}", commandName)
       val op = operations.get(commandName)
       op
    }



More information about the infinispan-commits mailing list