[jbosscache-commits] JBoss Cache SVN: r7767 - in core/trunk/src/main/java/org/jboss/cache: statetransfer and 1 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Feb 24 06:28:25 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-02-24 06:28:25 -0500 (Tue, 24 Feb 2009)
New Revision: 7767

Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
   core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java
Log:
Pretty printing of state transfer duration

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-23 14:35:58 UTC (rev 7766)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-24 11:28:25 UTC (rev 7767)
@@ -21,29 +21,12 @@
  */
 package org.jboss.cache;
 
-import java.net.URL;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-import javax.transaction.TransactionManager;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.RuntimeConfig;
 import org.jboss.cache.config.Configuration.NodeLockingScheme;
+import org.jboss.cache.config.RuntimeConfig;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
@@ -64,6 +47,7 @@
 import org.jboss.cache.statetransfer.DefaultStateTransferManager;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.cache.util.CachePrinter;
 import org.jboss.cache.util.concurrent.ReclosableLatch;
 import org.jboss.cache.util.reflect.ReflectionUtil;
 import org.jgroups.Address;
@@ -82,6 +66,20 @@
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
+import javax.transaction.TransactionManager;
+import java.net.URL;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * Manager that handles all RPC calls between JBoss Cache instances
  *
@@ -92,12 +90,14 @@
 {
    private Channel channel;
    private final Log log = LogFactory.getLog(RPCManagerImpl.class);
+   private final boolean trace = log.isTraceEnabled();
    private volatile List<Address> members;
    private long replicationCount;
    private long replicationFailures;
-   private boolean statisticsEnabled = false;
 
+   private boolean statisticsEnabled = false;
    private final Object coordinatorLock = new Object();
+
    /**
     * True if this Cache is the coordinator.
     */
@@ -112,7 +112,6 @@
     * JGroups RpcDispatcher in use.
     */
    private CommandAwareRpcDispatcher rpcDispatcher = null;
-
    /**
     * JGroups message listener.
     */
@@ -121,7 +120,6 @@
    private Notifier notifier;
    private CacheSPI spi;
    private InvocationContextContainer invocationContextContainer;
-   private final boolean trace = log.isTraceEnabled();
    private Marshaller marshaller;
    private TransactionManager txManager;
    private TransactionTable txTable;
@@ -191,7 +189,7 @@
             {
                if (channel.flushSupported() && !flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
                {
-                  throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + timeout + " millis) ");
+                  throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
                }
                return;
             }
@@ -210,7 +208,7 @@
             {
                if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
                {
-                  throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + timeout + " millis) ");
+                  throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
                }
                return;
             }
@@ -304,7 +302,7 @@
       switch (configuration.getCacheMode())
       {
          case LOCAL:
-            log.debug("cache mode is local, will not create the channel");
+            if (log.isDebugEnabled()) log.debug("cache mode is local, will not create the channel");
             isInLocalMode = true;
             isUsingBuddyReplication = false;
             break;
@@ -314,10 +312,7 @@
          case INVALIDATION_SYNC:
             isInLocalMode = false;
             isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null && configuration.getBuddyReplicationConfig().isEnabled();
-            if (log.isDebugEnabled())
-            {
-               log.debug("Cache mode is " + configuration.getCacheMode());
-            }
+            if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
 
             boolean fetchState = shouldFetchStateOnStartup();
             boolean nonBlocking = configuration.isNonBlockingStateTransfer();
@@ -334,10 +329,7 @@
                      componentRegistry.setBlockInStarting(false);
                   }
                   channel.connect(configuration.getClusterName());
-                  if (log.isInfoEnabled())
-                  {
-                     log.info("Cache local address is " + getLocalAddress());
-                  }
+                  if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
                }
                catch (ChannelException e)
                {
@@ -361,15 +353,9 @@
                try
                {
                   channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
-                  if (log.isInfoEnabled())
-                  {
-                     log.info("Cache local address is " + getLocalAddress());
-                  }
+                  if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
 
-                  if (getMembers().size() > 1)
-                  {
-                     messageListener.waitForState();
-                  }
+                  if (getMembers().size() > 1) messageListener.waitForState();
                }
                catch (ChannelException e)
                {
@@ -384,12 +370,8 @@
                }
             }
 
-            if (log.isInfoEnabled())
-            {
-               log.info("state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
-            }
+            if (log.isInfoEnabled()) log.info("state was retrieved successfully (in " + CachePrinter.prettyPrint((System.currentTimeMillis() - start)) + ")");
       }
-
    }
 
    private void startNonBlockStateTransfer(List<Address> members)
@@ -414,10 +396,7 @@
 
             try
             {
-               if (log.isInfoEnabled())
-               {
-                  log.info("Trying to fetch state from: " + member);
-               }
+               if (log.isInfoEnabled()) log.info("Trying to fetch state from: " + member);
                if (getState(null, member))
                {
                   messageListener.waitForState();
@@ -427,19 +406,13 @@
             }
             catch (Exception e)
             {
-               if (log.isDebugEnabled())
-               {
-                  log.debug("Error while fetching state", e);
-               }
+               if (log.isDebugEnabled()) log.debug("Error while fetching state", e);
             }
          }
 
          if (!success)
          {
-            if (log.isWarnEnabled())
-            {
-               log.warn("Could not find available peer for state, backing off and retrying");
-            }
+            if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying");
 
             try
             {
@@ -466,7 +439,7 @@
    {
       if (channel != null && channel.isOpen())
       {
-         log.info("Disconnecting and closing the Channel");
+         if (log.isInfoEnabled()) log.info("Disconnecting and closing the Channel");
          channel.disconnect();
          channel.close();
       }
@@ -488,7 +461,7 @@
       configuration.getRuntimeConfig().setChannel(null);
       if (rpcDispatcher != null)
       {
-         log.info("Stopping the RpcDispatcher");
+         if (log.isInfoEnabled()) log.info("Stopping the RpcDispatcher");
          rpcDispatcher.stop();
       }
 
@@ -521,10 +494,7 @@
          {
             ReflectionUtil.setValue(configuration, "accessible", true);
             configuration.setUsingMultiplexer(true);
-            if (log.isDebugEnabled())
-            {
-               log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + " using stack " + configuration.getMultiplexerStack());
-            }
+            if (log.isDebugEnabled()) log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + " using stack " + configuration.getMultiplexerStack());
          }
          else
          {
@@ -533,20 +503,17 @@
                if (configuration.getJGroupsConfigFile() != null)
                {
                   URL u = configuration.getJGroupsConfigFile();
-                  if (log.isTraceEnabled()) log.trace("Grabbing cluster properties from " + u);
+                  if (trace) log.trace("Grabbing cluster properties from " + u);
                   channel = new JChannel(u);
                }
                else if (configuration.getClusterConfig() == null)
                {
-                  log.debug("setting cluster properties to default value");
+                  if (log.isDebugEnabled()) log.debug("setting cluster properties to default value");
                   channel = new JChannel(configuration.getDefaultClusterConfig());
                }
                else
                {
-                  if (trace)
-                  {
-                     log.trace("Cache cluster properties: " + configuration.getClusterConfig());
-                  }
+                  if (trace) log.trace("Cache cluster properties: " + configuration.getClusterConfig());
                   channel = new JChannel(configuration.getClusterConfig());
                }
             }
@@ -700,14 +667,8 @@
          if (rpcDispatcher == null) return null;
          int modeToUse = mode;
          int preferredMode;
-         if ((preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
-         {
-            modeToUse = preferredMode;
-         }
-         if (trace)
-         {
-            log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage + " modeToUse: " + modeToUse);
-         }
+         if ((preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1) modeToUse = preferredMode;
+         if (trace) log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage + " modeToUse: " + modeToUse);
 
          flushTracker.lockProcessingLock();
          unlock = true;
@@ -716,10 +677,7 @@
          useOutOfBandMessage = false;
          RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
          if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
-         if (trace)
-         {
-            log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
-         }
+         if (trace) log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
          // short-circuit no-return-value calls.
          if (rsps == null) return Collections.emptyList();
          List<Object> retval = new ArrayList<Object>(rsps.size());
@@ -791,10 +749,7 @@
          // should this really be throwing an exception?  Are there valid use cases where partial state may not be available? - Manik
          // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
          //throw new IllegalArgumentException("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
-         if (log.isWarnEnabled())
-         {
-            log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
-         }
+         if (log.isWarnEnabled()) log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
          return;
       }
 
@@ -811,19 +766,13 @@
          return;
       }
 
-      if (log.isDebugEnabled())
-      {
-         log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
-      }
+      if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
       boolean successfulTransfer = false;
       for (Address target : targets)
       {
          try
          {
-            if (log.isDebugEnabled())
-            {
-               log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
-            }
+            if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
             messageListener.setStateSet(false);
             successfulTransfer = getState(stateId, target);
             if (successfulTransfer)
@@ -834,31 +783,21 @@
                }
                catch (Exception transferFailed)
                {
-                  if (log.isTraceEnabled()) log.trace("Error while fetching state", transferFailed);
+                  if (trace) log.trace("Error while fetching state", transferFailed);
                   successfulTransfer = false;
                }
             }
-            if (log.isDebugEnabled())
-            {
-               log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
-            }
+            if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
             if (successfulTransfer) break;
          }
          catch (IllegalStateException ise)
          {
             // thrown by the JGroups channel if state retrieval fails.
-            if (log.isInfoEnabled())
-            {
-               log.info("Channel problems fetching state.  Continuing on to next provider. ", ise);
-            }
+            if (log.isInfoEnabled()) log.info("Channel problems fetching state.  Continuing on to next provider. ", ise);
          }
       }
 
-      if (!successfulTransfer && log.isDebugEnabled())
-      {
-         log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
-      }
-
+      if (!successfulTransfer && log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
    }
 
    private boolean getState(String stateId, Address target) throws ChannelNotConnectedException, ChannelClosedException
@@ -925,7 +864,7 @@
       {
          while (members == null || members.isEmpty())
          {
-            log.debug("getCoordinator(): waiting on viewAccepted()");
+            if (log.isDebugEnabled()) log.debug("getCoordinator(): waiting on viewAccepted()");
             try
             {
                coordinatorLock.wait();
@@ -1142,7 +1081,7 @@
       {
          ProtocolStack stack = ((JChannel) channel).getProtocolStack();
          TP transport = stack.getTransport();
-         if (transport.isEnableBundling())
+         if (transport.isEnableBundling() && log.isWarnEnabled())
          {
             log.warn("You have enabled jgroups's message bundling, which is not recommended for sync replication. If there is no particular " +
                   "reason for this we strongly recommend to disable message bundling in JGroups config (enable_bundling=\"false\").");
@@ -1153,7 +1092,7 @@
       {
          ProtocolStack stack = ((JChannel) channel).getProtocolStack();
          TP transport = stack.getTransport();
-         if (!transport.isEnableBundling())
+         if (!transport.isEnableBundling() && log.isWarnEnabled())
          {
             log.warn("You have disabled jgroups's message bundling, which is not recommended for async replication. If there is no particular " +
                   "reason for this we strongly recommend to enable message bundling in JGroups config (enable_bundling=\"true\").");

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java	2009-02-23 14:35:58 UTC (rev 7766)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java	2009-02-24 11:28:25 UTC (rev 7767)
@@ -38,6 +38,7 @@
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.NodeData;
 import org.jboss.cache.marshall.NodeDataMarker;
+import org.jboss.cache.util.CachePrinter;
 
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -104,7 +105,7 @@
          generator.generateState(out, subtreeRoot, fetchTransientState, fetchPersistentState, suppressErrors);
 
          if (log.isDebugEnabled())
-            log.debug("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
+            log.debug("Successfully generated state in " + CachePrinter.prettyPrint(System.currentTimeMillis() - startTime));
       }
       else
       {
@@ -195,6 +196,6 @@
       integrator.integrateState(state, targetRoot.getDelegationTarget(), targetRoot.getFqn(), fetchPersistentState);
 
       if (log.isDebugEnabled())
-         log.debug("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
+         log.debug("successfully integrated state in " + CachePrinter.prettyPrint(System.currentTimeMillis() - startTime));
    }
 }

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java	2009-02-23 14:35:58 UTC (rev 7766)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java	2009-02-24 11:28:25 UTC (rev 7767)
@@ -25,6 +25,7 @@
 import org.jboss.cache.Fqn;
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.RegionEmptyException;
+import org.jboss.cache.util.CachePrinter;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
 import org.jboss.cache.lock.LockManager;
@@ -82,7 +83,7 @@
             generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
             if (log.isDebugEnabled())
             {
-               log.debug("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
+               log.debug("Successfully generated state in " + CachePrinter.prettyPrint(System.currentTimeMillis() - startTime));
             }
          }
          finally
@@ -154,7 +155,7 @@
          integrator.integrateState(state, targetRoot, targetRoot.getFqn(), fetchPersistentState);
          if (log.isDebugEnabled())
          {
-            log.debug("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
+            log.debug("successfully integrated state in " + CachePrinter.prettyPrint(System.currentTimeMillis() - startTime));
          }
       }
       finally

Modified: core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java	2009-02-23 14:35:58 UTC (rev 7766)
+++ core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java	2009-02-24 11:28:25 UTC (rev 7767)
@@ -28,6 +28,8 @@
 import org.jboss.cache.interceptors.base.CommandInterceptor;
 import org.jboss.cache.invocation.CacheInvocationDelegate;
 
+import java.text.NumberFormat;
+
 /**
  * Helper that prints the contents of a {@link org.jboss.cache.Cache} to a string.
  *
@@ -109,4 +111,32 @@
       s = s.replaceAll(" ", "&nbsp;");
       return s;
    }
+
+   /**
+    * Formats a duration for display, picking the unit by magnitude: below 1000 ms as whole milliseconds, below 300 s as seconds, below 120 min as minutes, otherwise as hours.
+    * @param millis time in millis
+    * @return the time, represented as millis, seconds, minutes or hours as appropriate, with suffix; values shown in seconds, minutes or hours are formatted with at most two fraction digits
+    */
+   public static String prettyPrint(long millis)
+   {
+      if (millis < 1000) return millis + " milliseconds"; // sub-second (and any negative) values are printed as-is, without NumberFormat
+      NumberFormat nf = NumberFormat.getNumberInstance(); // number format for the default locale, so separators may vary by platform
+      nf.setMaximumFractionDigits(2);
+      double toPrint = ((double) millis) / 1000; // millis -> seconds
+      if (toPrint < 300) // stay in seconds up to (but excluding) 5 minutes
+      {
+         return nf.format(toPrint) + " seconds";
+      }
+
+      toPrint = toPrint / 60; // seconds -> minutes
+
+      if (toPrint < 120) // stay in minutes up to (but excluding) 2 hours
+      {
+         return nf.format(toPrint) + " minutes";
+      }
+
+      toPrint = toPrint / 60; // minutes -> hours
+
+      return nf.format(toPrint) + " hours";
+   }
 }




More information about the jbosscache-commits mailing list