[infinispan-commits] Infinispan SVN: r1082 - trunk/core/src/main/java/org/infinispan/remoting/rpc.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Oct 30 10:41:41 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-10-30 10:41:41 -0400 (Fri, 30 Oct 2009)
New Revision: 1082

Modified:
   trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
Log:
[ISPN-142] (More JMX information to be exposed)

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2009-10-30 14:09:00 UTC (rev 1081)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2009-10-30 14:41:41 UTC (rev 1082)
@@ -54,6 +54,9 @@
    private Transport t;
    private final AtomicLong replicationCount = new AtomicLong(0);
    private final AtomicLong replicationFailures = new AtomicLong(0);
+   private final AtomicLong totalReplicationTime = new AtomicLong(0);
+   private final AtomicLong numReplications = new AtomicLong(0);
+
    @ManagedAttribute(description = "Enables or disables the gathering of statistics by this component", writable = true)
    boolean statisticsEnabled = false; // by default, don't gather statistics.
    private volatile Address currentStateTransferSource;
@@ -119,62 +122,73 @@
    }
 
    public void retrieveState(String cacheName, long timeout) throws StateTransferException {
-      if (t.isSupportStateTransfer()) {
-         long initialWaitTime = configuration.getStateRetrievalInitialRetryWaitTime();
-         int waitTimeIncreaseFactor = configuration.getStateRetrievalRetryWaitTimeIncreaseFactor();
-         int numRetries = configuration.getStateRetrievalNumRetries();
-         List<Address> members = t.getMembers();
-         if (members.size() < 2) {
-            if (log.isDebugEnabled())
-               log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
-            return;
-         }
+      final boolean timed = statisticsEnabled; // snapshot once: the flag is writable at runtime and may flip while this call is in flight
+      final long startTime = timed ? System.currentTimeMillis() : 0;
 
-         boolean success = false;
+      try {
+         if (t.isSupportStateTransfer()) {
+            long initialWaitTime = configuration.getStateRetrievalInitialRetryWaitTime();
+            int waitTimeIncreaseFactor = configuration.getStateRetrievalRetryWaitTimeIncreaseFactor();
+            int numRetries = configuration.getStateRetrievalNumRetries();
+            List<Address> members = t.getMembers();
+            if (members.size() < 2) {
+               if (log.isDebugEnabled())
+                  log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
+               return;
+            }
 
-         try {
-            long wait = initialWaitTime;
-            outer:
-            for (int i = 0; i < numRetries; i++) {
-               for (Address member : members) {
-                  if (!member.equals(t.getAddress())) {
-                     try {
-                        if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
-                        currentStateTransferSource = member;
-                        if (t.retrieveState(cacheName, member, timeout)) {
-                           if (log.isInfoEnabled())
-                              log.info("Successfully retrieved and applied state from {0}", member);
-                           success = true;
-                           break outer;
+            boolean success = false;
+
+            try {
+               long wait = initialWaitTime;
+               outer:
+               for (int i = 0; i < numRetries; i++) {
+                  for (Address member : members) {
+                     if (!member.equals(t.getAddress())) {
+                        try {
+                           if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
+                           currentStateTransferSource = member;
+                           if (t.retrieveState(cacheName, member, timeout)) {
+                              if (log.isInfoEnabled())
+                                 log.info("Successfully retrieved and applied state from {0}", member);
+                              success = true;
+                              break outer;
+                           }
+                        } catch (StateTransferException e) {
+                           if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
+                        } finally {
+                           currentStateTransferSource = null;
                         }
-                     } catch (StateTransferException e) {
-                        if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
-                     } finally {
-                        currentStateTransferSource = null;
                      }
                   }
-               }
 
-               if (!success) {
-                  if (log.isWarnEnabled())
-                     log.warn("Could not find available peer for state, backing off and retrying");
+                  if (!success) {
+                     if (log.isWarnEnabled())
+                        log.warn("Could not find available peer for state, backing off and retrying");
 
-                  try {
-                     Thread.sleep(wait *= waitTimeIncreaseFactor);
+                     try {
+                        Thread.sleep(wait *= waitTimeIncreaseFactor);
+                     }
+                     catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                     }
                   }
-                  catch (InterruptedException e) {
-                     Thread.currentThread().interrupt();
-                  }
+
                }
+            } finally {
+               currentStateTransferSource = null;
+            }
 
-            }
-         } finally {
-            currentStateTransferSource = null;
+            if (!success) throw new StateTransferException("Unable to fetch state on startup");
+         } else {
+            throw new StateTransferException("Transport does not, or is not configured to, support state transfer.  Please disable fetching state on startup, or reconfigure your transport.");
          }
-
-         if (!success) throw new StateTransferException("Unable to fetch state on startup");
-      } else {
-         throw new StateTransferException("Transport does not, or is not configured to, support state transfer.  Please disable fetching state on startup, or reconfigure your transport.");
+      } finally {
+         if (timed) { // record only when startTime was actually captured, otherwise the elapsed time would be measured from 0
+            long timeTaken = System.currentTimeMillis() - startTime;
+            totalReplicationTime.getAndAdd(timeTaken);
+            numReplications.getAndIncrement();
+         }
       }
    }
 
@@ -278,6 +292,8 @@
    public void resetStatistics() {
       replicationCount.set(0);
       replicationFailures.set(0);
+      totalReplicationTime.set(0); // timing accumulators read by getAverageReplicationTime()
+      numReplications.set(0);
    }
 
    @ManagedAttribute(description = "Number of successful replications")
@@ -323,7 +339,7 @@
       List<Address> addressList = t.getMembers();
       return addressList.toString();
    }
-   
+
    @ManagedAttribute(description = "Size of the cluster in number of nodes")
    @Metric(displayName = "Cluster size", displayType = DisplayType.SUMMARY)
    public String getClusterSize() {
@@ -341,6 +357,12 @@
       return NumberFormat.getInstance().format(ration) + "%";
    }
 
+   @ManagedAttribute(description = "The average time spent in the transport layer, in milliseconds")
+   @Metric(displayName = "Average time spent in the transport layer", units = Units.MILLISECONDS, displayType = DisplayType.SUMMARY)
+   public long getAverageReplicationTime() {
+      return totalReplicationTime.get() / Math.max(1, numReplications.get()); // clamp divisor: with no samples yet (or right after resetStatistics) report 0 instead of throwing ArithmeticException
+   }
+
    // mainly for unit testing
    public void setTransport(Transport t) {
       this.t = t;



More information about the infinispan-commits mailing list