[infinispan-commits] Infinispan SVN: r1872 - in trunk/server: core/src/main/scala/org/infinispan/server/core/transport/netty and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri May 28 07:51:32 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-05-28 07:51:30 -0400 (Fri, 28 May 2010)
New Revision: 1872
Modified:
trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
Log:
[ISPN-468] (Wrap any log calls in Memcached/HotRod servers around if enabled calls) Done.
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala 2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala 2010-05-28 11:51:30 UTC (rev 1872)
@@ -14,16 +14,20 @@
// params.map(_.asInstanceOf[AnyRef]) => returns a Seq[AnyRef]
// the ': _*' part tells the compiler to pass it as varargs
- def info(msg: => String, params: Any*) = if (log.isInfoEnabled) log.info(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+ def info(msg: => String, params: Any*) = log.info(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
// def debug(msg: => String) = log.debug(msg, null)
- def debug(msg: => String, params: Any*) = if (log.isDebugEnabled) log.debug(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+ def isDebugEnabled = log.isDebugEnabled
+ def debug(msg: => String, params: Any*) = log.debug(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
// def trace(msg: => String) = log.trace(msg, null)
- def trace(msg: => String, params: Any*) = if (log.isTraceEnabled) log.trace(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+ def isTraceEnabled = log.isTraceEnabled
+ def trace(msg: => String, params: Any*) = log.trace(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
def warn(msg: => String, params: Any*) = log.warn(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
def warn(msg: => String, t: Throwable) = log.warn(msg, t, null)
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala 2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala 2010-05-28 11:51:30 UTC (rev 1872)
@@ -44,7 +44,7 @@
@ViewChanged
def calculateRank(e: ViewChangedEvent) {
val rank = calculateRank(e.getLocalAddress, asIterable(e.getNewMembers), e.getViewId)
- trace("Calculated rank based on view {0} and result was {1}", e, rank)
+ if (isTraceEnabled) trace("Calculated rank based on view {0} and result was {1}", e, rank)
}
private[core] def calculateRank(address: Address, members: Iterable[Address], viewId: Long): Long = {
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-05-28 11:51:30 UTC (rev 1872)
@@ -33,21 +33,21 @@
private lazy val masterExecutor = {
if (masterThreads == 0) {
- debug("Configured unlimited threads for master thread pool")
+ if (isDebugEnabled) debug("Configured unlimited threads for master thread pool")
Executors.newCachedThreadPool
} else {
- debug("Configured {0} threads for master thread pool", masterThreads)
+ if (isDebugEnabled) debug("Configured {0} threads for master thread pool", masterThreads)
Executors.newFixedThreadPool(masterThreads)
}
}
private lazy val workerExecutor = {
if (workerThreads == 0) {
- debug("Configured unlimited threads for worker thread pool")
+ if (isDebugEnabled) debug("Configured unlimited threads for worker thread pool")
Executors.newCachedThreadPool
}
else {
- debug("Configured {0} threads for worker thread pool", workerThreads)
+ if (isDebugEnabled) debug("Configured {0} threads for worker thread pool", workerThreads)
Executors.newFixedThreadPool(masterThreads)
}
}
@@ -62,8 +62,9 @@
else if (proposedThreadName contains "client worker") "ClientWorker-"
else "ClientMaster-"
val name = threadNamePrefix + typeInFix + proposedThreadName.substring(index + 1, proposedThreadName.length)
- trace("Thread name will be {0}, with current thread name being {1} and proposed name being '{2}'",
- name, currentThread, proposedThreadName)
+ if (isTraceEnabled)
+ trace("Thread name will be {0}, with current thread name being {1} and proposed name being '{2}'",
+ name, currentThread, proposedThreadName)
name
}
})
@@ -97,7 +98,7 @@
}
}
pipeline.stop
- debug("Channel group completely closed, release external resources");
+ if (isDebugEnabled) debug("Channel group completely closed, release external resources");
factory.releaseExternalResources();
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-05-28 11:51:30 UTC (rev 1872)
@@ -224,9 +224,9 @@
def toRequest(streamOp: Short): Option[Enumeration#Value] = {
val op = requests.get(streamOp)
if (op == None)
- trace("Operation code: {0} was unmatched", streamOp)
+ if (isTraceEnabled) trace("Operation code: {0} was unmatched", streamOp)
else
- trace("Operation code: {0} has been matched to {1}", streamOp, op)
+ if (isTraceEnabled) trace("Operation code: {0} has been matched to {1}", streamOp, op)
op
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-05-28 11:51:30 UTC (rev 1872)
@@ -26,6 +26,8 @@
private var isError = false
private var joined = false
+ private val isTrace = isTraceEnabled
+
override def readHeader(buffer: ChannelBuffer): HotRodHeader = {
try {
val magic = buffer.readUnsignedByte
@@ -33,7 +35,7 @@
if (!isError) {
throw new InvalidMagicIdException("Error reading magic byte or message id: " + magic)
} else {
- trace("Error happened previously, ignoring {0} byte until we find the magic number again", magic)
+ if (isTrace) trace("Error happened previously, ignoring {0} byte until we find the magic number again", magic)
return null // Keep trying to read until we find magic
}
}
@@ -53,7 +55,7 @@
case _ => throw new UnknownVersionException("Unknown version:" + version)
}
val header = decoder.readHeader(buffer, messageId)
- trace("Decoded header {0}", header)
+ if (isTrace) trace("Decoded header {0}", header)
isError = false
header
} catch {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-05-28 11:51:30 UTC (rev 1872)
@@ -20,9 +20,11 @@
private lazy val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)
override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
- trace("Encode msg {0}", msg)
+ val isTrace = isTraceEnabled
+
+ if (isTrace) trace("Encode msg {0}", msg)
val buffer: ChannelBuffer = msg match {
- case r: Response => writeHeader(r)
+ case r: Response => writeHeader(r, isTrace)
}
msg match {
case r: ResponseWithPrevious => {
@@ -51,7 +53,7 @@
buffer
}
- private def writeHeader(r: Response): ChannelBuffer = {
+ private def writeHeader(r: Response, isTrace: Boolean): ChannelBuffer = {
val buffer = dynamicBuffer
buffer.writeByte(Magic.byteValue)
buffer.writeUnsignedLong(r.messageId)
@@ -62,11 +64,11 @@
r.topologyResponse.get match {
case t: TopologyAwareResponse => {
if (r.clientIntel == 2)
- writeTopologyHeader(t, buffer)
+ writeTopologyHeader(t, buffer, isTrace)
else
- writeHashTopologyHeader(t, buffer)
+ writeHashTopologyHeader(t, buffer, isTrace)
}
- case h: HashDistAwareResponse => writeHashTopologyHeader(h, buffer, r)
+ case h: HashDistAwareResponse => writeHashTopologyHeader(h, buffer, r, isTrace)
}
} else {
buffer.writeByte(0) // No topology change
@@ -74,8 +76,8 @@
buffer
}
- private def writeTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer) {
- trace("Write topology change response header {0}", t)
+ private def writeTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer, isTrace: Boolean) {
+ if (isTrace) trace("Write topology change response header {0}", t)
buffer.writeUnsignedInt(t.view.topologyId)
buffer.writeUnsignedInt(t.view.members.size)
t.view.members.foreach{address =>
@@ -84,8 +86,8 @@
}
}
- private def writeHashTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer) {
- trace("Return limited hash distribution aware header in spite of having a hash aware client {0}", t)
+ private def writeHashTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer, isTrace: Boolean) {
+ if (isTrace) trace("Return limited hash distribution aware header in spite of having a hash aware client {0}", t)
buffer.writeUnsignedInt(t.view.topologyId)
buffer.writeUnsignedShort(0) // Num key owners
buffer.writeByte(0) // Hash function
@@ -98,8 +100,8 @@
}
}
- private def writeHashTopologyHeader(h: HashDistAwareResponse, buffer: ChannelBuffer, r: Response) {
- trace("Write hash distribution change response header {0}", h)
+ private def writeHashTopologyHeader(h: HashDistAwareResponse, buffer: ChannelBuffer, r: Response, isTrace: Boolean) {
+ if (isTrace) trace("Write hash distribution change response header {0}", h)
buffer.writeUnsignedInt(h.view.topologyId)
buffer.writeUnsignedShort(h.numOwners) // Num key owners
buffer.writeByte(h.hashFunction) // Hash function
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-05-28 11:51:30 UTC (rev 1872)
@@ -50,20 +50,21 @@
defineTopologyCacheConfig(cacheManager)
topologyCache = cacheManager.getCache(TopologyCacheName)
address = TopologyAddress(host, port, Map.empty, cacheManager.getAddress)
- debug("Local topology address is {0}", address)
+ val isDebug = isDebugEnabled
+ if (isDebug) debug("Local topology address is {0}", address)
cacheManager.addListener(new CrashedMemberDetectorListener)
val updated = updateTopologyView(false, System.currentTimeMillis()) { currentView =>
if (currentView != null) {
val newMembers = currentView.members ::: List(address)
val newView = TopologyView(currentView.topologyId + 1, newMembers)
val updated = topologyCache.replace("view", currentView, newView)
- if (updated) debug("Added {0} to topology, new view is {1}", address, newView)
+ if (isDebug && updated) debug("Added {0} to topology, new view is {1}", address, newView)
updated
} else {
val newMembers = List(address)
val newView = TopologyView(1, newMembers)
val updated = topologyCache.putIfAbsent("view", newView) == null
- if (updated) debug("First member to start, topology view is {0}", newView)
+ if (isDebug && updated) debug("First member to start, topology view is {0}", newView)
updated
}
}
@@ -85,7 +86,7 @@
val maxSleepTime = 2000 // sleep time between retries
var time = rand.nextInt((maxSleepTime - minSleepTime) / 10)
time = (time * 10) + minSleepTime;
- trace("Concurrent modification in topology view, sleeping for {0}", Util.prettyPrintTime(time))
+ if (isTraceEnabled) trace("Concurrent modification in topology view, sleeping for {0}", Util.prettyPrintTime(time))
Thread.sleep(time); // sleep for a while and retry
}
updated
@@ -102,15 +103,16 @@
val currentView = topologyCache.get("view")
// Comparing cluster address should be enough. Full object comparison could fail if hash id map has changed.
val newMembers = currentView.members.filterNot(_.clusterAddress == address.clusterAddress)
+ val isDebug = isDebugEnabled
if (newMembers.length != (currentView.members.length - 1)) {
- debug("Cluster member {0} was not filtered out of the current view {1}", address, currentView)
+ if (isDebug) debug("Cluster member {0} was not filtered out of the current view {1}", address, currentView)
} else {
val newView = TopologyView(currentView.topologyId + 1, newMembers)
val replaced = topologyCache.replace("view", currentView, newView)
- if (!replaced) {
+ if (isDebug && !replaced) {
debug("Attempt to update topology view failed due to a concurrent modification. " +
"Ignoring since logic to deal with crashed members will deal with it.")
- } else {
+ } else if (isDebug) {
debug("Removed {0} from topology view, new view is {1}", address, newView)
}
}
@@ -129,6 +131,7 @@
@ViewChanged
def handleViewChange(e: ViewChangedEvent) {
+ val isTrace = isTraceEnabled
val cacheManager = e.getCacheManager
// Only the coordinator can potentially make modifications related to crashed members.
// This is to avoid all nodes trying to make the same modification which would be wasteful and lead to deadlocks.
@@ -145,14 +148,14 @@
if (currentView != null) {
var tmpMembers = currentView.members
for (goneMember <- goneMembers) {
- trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
+ if (isTrace) trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
// If old memmber is in topology, it means that it had an abnormal ending
val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
if (isCrashed) {
- trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
+ if (isTrace) trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
"{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
- trace("After removal, new Hot Rod topology is {0}", tmpMembers)
+ if (isTrace) trace("After removal, new Hot Rod topology is {0}", tmpMembers)
}
}
if (tmpMembers.size < currentView.members.size) {
Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-05-27 20:20:38 UTC (rev 1871)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-05-28 11:51:30 UTC (rev 1872)
@@ -73,7 +73,7 @@
override def readParameters(h: RequestHeader, b: ChannelBuffer): Option[MemcachedParameters] = {
val line = readLine(b)
if (!line.isEmpty) {
- trace("Operation parameters: {0}", line)
+ if (isTraceEnabled) trace("Operation parameters: {0}", line)
val args = line.trim.split(" +")
var index = 0
h.op match {
@@ -436,7 +436,7 @@
)
def toRequest(commandName: String): Option[Enumeration#Value] = {
- trace("Operation: {0}", commandName)
+ if (isTraceEnabled) trace("Operation: {0}", commandName)
val op = operations.get(commandName)
op
}
More information about the infinispan-commits
mailing list