Author: manik.surtani(a)jboss.com
Date: 2009-02-24 06:28:25 -0500 (Tue, 24 Feb 2009)
New Revision: 7767
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java
Log:
Pretty printing of state transfer duration
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-23 14:35:58 UTC (rev 7766)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-24 11:28:25 UTC (rev 7767)
@@ -21,29 +21,12 @@
*/
package org.jboss.cache;
-import java.net.URL;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-import javax.transaction.TransactionManager;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.RuntimeConfig;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
+import org.jboss.cache.config.RuntimeConfig;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
@@ -64,6 +47,7 @@
import org.jboss.cache.statetransfer.DefaultStateTransferManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.cache.util.CachePrinter;
import org.jboss.cache.util.concurrent.ReclosableLatch;
import org.jboss.cache.util.reflect.ReflectionUtil;
import org.jgroups.Address;
@@ -82,6 +66,20 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
+import javax.transaction.TransactionManager;
+import java.net.URL;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
/**
* Manager that handles all RPC calls between JBoss Cache instances
*
@@ -92,12 +90,14 @@
{
private Channel channel;
private final Log log = LogFactory.getLog(RPCManagerImpl.class);
+ private final boolean trace = log.isTraceEnabled();
private volatile List<Address> members;
private long replicationCount;
private long replicationFailures;
- private boolean statisticsEnabled = false;
+ private boolean statisticsEnabled = false;
private final Object coordinatorLock = new Object();
+
/**
* True if this Cache is the coordinator.
*/
@@ -112,7 +112,6 @@
* JGroups RpcDispatcher in use.
*/
private CommandAwareRpcDispatcher rpcDispatcher = null;
-
/**
* JGroups message listener.
*/
@@ -121,7 +120,6 @@
private Notifier notifier;
private CacheSPI spi;
private InvocationContextContainer invocationContextContainer;
- private final boolean trace = log.isTraceEnabled();
private Marshaller marshaller;
private TransactionManager txManager;
private TransactionTable txTable;
@@ -191,7 +189,7 @@
{
if (channel.flushSupported() && !flushBlockGate.await(timeout,
TimeUnit.MILLISECONDS))
{
- throw new TimeoutException("State retrieval timed out waiting for
flush to unblock. (timeout = " + timeout + " millis) ");
+ throw new TimeoutException("State retrieval timed out waiting for
flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
}
return;
}
@@ -210,7 +208,7 @@
{
if (channel.flushSupported() && !flushWaitGate.await(timeout,
TimeUnit.MILLISECONDS))
{
- throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + timeout + " millis) ");
+ throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
}
return;
}
@@ -304,7 +302,7 @@
switch (configuration.getCacheMode())
{
case LOCAL:
- log.debug("cache mode is local, will not create the channel");
+ if (log.isDebugEnabled()) log.debug("cache mode is local, will not
create the channel");
isInLocalMode = true;
isUsingBuddyReplication = false;
break;
@@ -314,10 +312,7 @@
case INVALIDATION_SYNC:
isInLocalMode = false;
isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null
&& configuration.getBuddyReplicationConfig().isEnabled();
- if (log.isDebugEnabled())
- {
- log.debug("Cache mode is " + configuration.getCacheMode());
- }
+ if (log.isDebugEnabled()) log.debug("Cache mode is " +
configuration.getCacheMode());
boolean fetchState = shouldFetchStateOnStartup();
boolean nonBlocking = configuration.isNonBlockingStateTransfer();
@@ -334,10 +329,7 @@
componentRegistry.setBlockInStarting(false);
}
channel.connect(configuration.getClusterName());
- if (log.isInfoEnabled())
- {
- log.info("Cache local address is " + getLocalAddress());
- }
+ if (log.isInfoEnabled()) log.info("Cache local address is " +
getLocalAddress());
}
catch (ChannelException e)
{
@@ -361,15 +353,9 @@
try
{
channel.connect(configuration.getClusterName(), null, null,
configuration.getStateRetrievalTimeout());
- if (log.isInfoEnabled())
- {
- log.info("Cache local address is " + getLocalAddress());
- }
+ if (log.isInfoEnabled()) log.info("Cache local address is " +
getLocalAddress());
- if (getMembers().size() > 1)
- {
- messageListener.waitForState();
- }
+ if (getMembers().size() > 1) messageListener.waitForState();
}
catch (ChannelException e)
{
@@ -384,12 +370,8 @@
}
}
- if (log.isInfoEnabled())
- {
- log.info("state was retrieved successfully (in " +
(System.currentTimeMillis() - start) + " milliseconds)");
- }
+ if (log.isInfoEnabled()) log.info("state was retrieved successfully (in
" + CachePrinter.prettyPrint((System.currentTimeMillis() - start)) + ")");
}
-
}
private void startNonBlockStateTransfer(List<Address> members)
@@ -414,10 +396,7 @@
try
{
- if (log.isInfoEnabled())
- {
- log.info("Trying to fetch state from: " + member);
- }
+ if (log.isInfoEnabled()) log.info("Trying to fetch state from: "
+ member);
if (getState(null, member))
{
messageListener.waitForState();
@@ -427,19 +406,13 @@
}
catch (Exception e)
{
- if (log.isDebugEnabled())
- {
- log.debug("Error while fetching state", e);
- }
+ if (log.isDebugEnabled()) log.debug("Error while fetching
state", e);
}
}
if (!success)
{
- if (log.isWarnEnabled())
- {
- log.warn("Could not find available peer for state, backing off and
retrying");
- }
+ if (log.isWarnEnabled()) log.warn("Could not find available peer for
state, backing off and retrying");
try
{
@@ -466,7 +439,7 @@
{
if (channel != null && channel.isOpen())
{
- log.info("Disconnecting and closing the Channel");
+ if (log.isInfoEnabled()) log.info("Disconnecting and closing the
Channel");
channel.disconnect();
channel.close();
}
@@ -488,7 +461,7 @@
configuration.getRuntimeConfig().setChannel(null);
if (rpcDispatcher != null)
{
- log.info("Stopping the RpcDispatcher");
+ if (log.isInfoEnabled()) log.info("Stopping the RpcDispatcher");
rpcDispatcher.stop();
}
@@ -521,10 +494,7 @@
{
ReflectionUtil.setValue(configuration, "accessible", true);
configuration.setUsingMultiplexer(true);
- if (log.isDebugEnabled())
- {
- log.debug("Created Multiplexer Channel for cache cluster " +
configuration.getClusterName() + " using stack " +
configuration.getMultiplexerStack());
- }
+ if (log.isDebugEnabled()) log.debug("Created Multiplexer Channel for
cache cluster " + configuration.getClusterName() + " using stack " +
configuration.getMultiplexerStack());
}
else
{
@@ -533,20 +503,17 @@
if (configuration.getJGroupsConfigFile() != null)
{
URL u = configuration.getJGroupsConfigFile();
- if (log.isTraceEnabled()) log.trace("Grabbing cluster properties
from " + u);
+ if (trace) log.trace("Grabbing cluster properties from " +
u);
channel = new JChannel(u);
}
else if (configuration.getClusterConfig() == null)
{
- log.debug("setting cluster properties to default value");
+ if (log.isDebugEnabled()) log.debug("setting cluster properties to
default value");
channel = new JChannel(configuration.getDefaultClusterConfig());
}
else
{
- if (trace)
- {
- log.trace("Cache cluster properties: " +
configuration.getClusterConfig());
- }
+ if (trace) log.trace("Cache cluster properties: " +
configuration.getClusterConfig());
channel = new JChannel(configuration.getClusterConfig());
}
}
@@ -700,14 +667,8 @@
if (rpcDispatcher == null) return null;
int modeToUse = mode;
int preferredMode;
- if ((preferredMode =
spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
- {
- modeToUse = preferredMode;
- }
- if (trace)
- {
- log.trace("callRemoteMethods(): valid members are " + recipients +
" methods: " + command + " Using OOB? " + useOutOfBandMessage + "
modeToUse: " + modeToUse);
- }
+ if ((preferredMode =
spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1) modeToUse
= preferredMode;
+ if (trace) log.trace("callRemoteMethods(): valid members are " +
recipients + " methods: " + command + " Using OOB? " +
useOutOfBandMessage + " modeToUse: " + modeToUse);
flushTracker.lockProcessingLock();
unlock = true;
@@ -716,10 +677,7 @@
useOutOfBandMessage = false;
RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command,
modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
- if (trace)
- {
- log.trace("(" + getLocalAddress() + "): responses for method
" + command.getClass().getSimpleName() + ":\n" + rsps);
- }
+ if (trace) log.trace("(" + getLocalAddress() + "): responses for
method " + command.getClass().getSimpleName() + ":\n" + rsps);
// short-circuit no-return-value calls.
if (rsps == null) return Collections.emptyList();
List<Object> retval = new ArrayList<Object>(rsps.size());
@@ -791,10 +749,7 @@
// should this really be throwing an exception? Are there valid use cases where
partial state may not be available? - Manik
// Yes -- cache is configured LOCAL but app doesn't know it -- Brian
//throw new IllegalArgumentException("Cannot fetch partial state, targets
are " + sources + " and stateId is " + stateId);
- if (log.isWarnEnabled())
- {
- log.warn("Cannot fetch partial state, targets are " + sources +
" and stateId is " + stateId);
- }
+ if (log.isWarnEnabled()) log.warn("Cannot fetch partial state, targets are
" + sources + " and stateId is " + stateId);
return;
}
@@ -811,19 +766,13 @@
return;
}
- if (log.isDebugEnabled())
- {
- log.debug("Node " + getLocalAddress() + " fetching partial state
" + stateId + " from members " + targets);
- }
+ if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + "
fetching partial state " + stateId + " from members " + targets);
boolean successfulTransfer = false;
for (Address target : targets)
{
try
{
- if (log.isDebugEnabled())
- {
- log.debug("Node " + getLocalAddress() + " fetching partial
state " + stateId + " from member " + target);
- }
+ if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() +
" fetching partial state " + stateId + " from member " + target);
messageListener.setStateSet(false);
successfulTransfer = getState(stateId, target);
if (successfulTransfer)
@@ -834,31 +783,21 @@
}
catch (Exception transferFailed)
{
- if (log.isTraceEnabled()) log.trace("Error while fetching
state", transferFailed);
+ if (trace) log.trace("Error while fetching state",
transferFailed);
successfulTransfer = false;
}
}
- if (log.isDebugEnabled())
- {
- log.debug("Node " + getLocalAddress() + " fetching partial
state " + stateId + " from member " + target + (successfulTransfer ? "
successful" : " failed"));
- }
+ if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() +
" fetching partial state " + stateId + " from member " + target +
(successfulTransfer ? " successful" : " failed"));
if (successfulTransfer) break;
}
catch (IllegalStateException ise)
{
// thrown by the JGroups channel if state retrieval fails.
- if (log.isInfoEnabled())
- {
- log.info("Channel problems fetching state. Continuing on to next
provider. ", ise);
- }
+ if (log.isInfoEnabled()) log.info("Channel problems fetching state.
Continuing on to next provider. ", ise);
}
}
- if (!successfulTransfer && log.isDebugEnabled())
- {
- log.debug("Node " + getLocalAddress() + " could not fetch partial
state " + stateId + " from any member " + targets);
- }
-
+ if (!successfulTransfer && log.isDebugEnabled()) log.debug("Node
" + getLocalAddress() + " could not fetch partial state " + stateId +
" from any member " + targets);
}
private boolean getState(String stateId, Address target) throws
ChannelNotConnectedException, ChannelClosedException
@@ -925,7 +864,7 @@
{
while (members == null || members.isEmpty())
{
- log.debug("getCoordinator(): waiting on viewAccepted()");
+ if (log.isDebugEnabled()) log.debug("getCoordinator(): waiting on
viewAccepted()");
try
{
coordinatorLock.wait();
@@ -1142,7 +1081,7 @@
{
ProtocolStack stack = ((JChannel) channel).getProtocolStack();
TP transport = stack.getTransport();
- if (transport.isEnableBundling())
+ if (transport.isEnableBundling() && log.isWarnEnabled())
{
log.warn("You have enabled jgroups's message bundling, which is not
recommended for sync replication. If there is no particular " +
"reason for this we strongly recommend to disable message bundling
in JGroups config (enable_bundling=\"false\").");
@@ -1153,7 +1092,7 @@
{
ProtocolStack stack = ((JChannel) channel).getProtocolStack();
TP transport = stack.getTransport();
- if (!transport.isEnableBundling())
+ if (!transport.isEnableBundling() && log.isWarnEnabled())
{
log.warn("You have disabled jgroups's message bundling, which is not
recommended for async replication. If there is no particular " +
"reason for this we strongly recommend to enable message bundling
in JGroups config (enable_bundling=\"true\").");
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2009-02-23 14:35:58 UTC (rev 7766)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2009-02-24 11:28:25 UTC (rev 7767)
@@ -38,6 +38,7 @@
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.marshall.NodeDataMarker;
+import org.jboss.cache.util.CachePrinter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -104,7 +105,7 @@
generator.generateState(out, subtreeRoot, fetchTransientState,
fetchPersistentState, suppressErrors);
if (log.isDebugEnabled())
- log.debug("Successfully generated state in " +
(System.currentTimeMillis() - startTime) + " msec");
+ log.debug("Successfully generated state in " +
CachePrinter.prettyPrint(System.currentTimeMillis() - startTime));
}
else
{
@@ -195,6 +196,6 @@
integrator.integrateState(state, targetRoot.getDelegationTarget(),
targetRoot.getFqn(), fetchPersistentState);
if (log.isDebugEnabled())
- log.debug("successfully integrated state in " +
(System.currentTimeMillis() - startTime) + " msec");
+ log.debug("successfully integrated state in " +
CachePrinter.prettyPrint(System.currentTimeMillis() - startTime));
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java 2009-02-23 14:35:58 UTC (rev 7766)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java 2009-02-24 11:28:25 UTC (rev 7767)
@@ -25,6 +25,7 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.RegionEmptyException;
+import org.jboss.cache.util.CachePrinter;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.lock.LockManager;
@@ -82,7 +83,7 @@
generator.generateState(out, rootNode, fetchTransientState,
fetchPersistentState, suppressErrors);
if (log.isDebugEnabled())
{
- log.debug("Successfully generated state in " +
(System.currentTimeMillis() - startTime) + " msec");
+ log.debug("Successfully generated state in " +
CachePrinter.prettyPrint(System.currentTimeMillis() - startTime));
}
}
finally
@@ -154,7 +155,7 @@
integrator.integrateState(state, targetRoot, targetRoot.getFqn(),
fetchPersistentState);
if (log.isDebugEnabled())
{
- log.debug("successfully integrated state in " +
(System.currentTimeMillis() - startTime) + " msec");
+ log.debug("successfully integrated state in " +
CachePrinter.prettyPrint(System.currentTimeMillis() - startTime));
}
}
finally
Modified: core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java 2009-02-23 14:35:58 UTC (rev 7766)
+++ core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java 2009-02-24 11:28:25 UTC (rev 7767)
@@ -28,6 +28,8 @@
import org.jboss.cache.interceptors.base.CommandInterceptor;
import org.jboss.cache.invocation.CacheInvocationDelegate;
+import java.text.NumberFormat;
+
/**
* Helper that prints the contents of a {@link org.jboss.cache.Cache} to a string.
*
@@ -109,4 +111,32 @@
s = s.replaceAll(" ", " ");
return s;
}
+
+ /**
+ * Formats a duration given in milliseconds as a human-readable string with a unit suffix.
+ * @param millis time in millis
+ * @return the time, represented as milliseconds (below 1000 ms), seconds (below 300 s), minutes (below 120 min) or hours otherwise, with suffix
+ */
+ public static String prettyPrint(long millis)
+ {
+ if (millis < 1000) return millis + " milliseconds"; // sub-second (and negative) values are printed as a whole number
+ NumberFormat nf = NumberFormat.getNumberInstance(); // number formatter for the default locale
+ nf.setMaximumFractionDigits(2); // at most two decimal places for seconds, minutes and hours
+ double toPrint = ((double) millis) / 1000; // milliseconds -> seconds
+ if (toPrint < 300) // under five minutes: report in seconds
+ {
+ return nf.format(toPrint) + " seconds";
+ }
+
+ toPrint = toPrint / 60; // seconds -> minutes
+
+ if (toPrint < 120) // under two hours: report in minutes
+ {
+ return nf.format(toPrint) + " minutes";
+ }
+
+ toPrint = toPrint / 60; // minutes -> hours
+
+ return nf.format(toPrint) + " hours";
+ }
}