[infinispan-commits] Infinispan SVN: r1215 - in trunk/core/src: test/java/org/infinispan/jmx and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Nov 24 11:17:27 EST 2009
Author: galder.zamarreno at jboss.com
Date: 2009-11-24 11:17:26 -0500 (Tue, 24 Nov 2009)
New Revision: 1215
Modified:
trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java
Log:
[ISPN-290] (RpcManagerImpl stats are confusing) Fixed.
Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2009-11-24 14:16:28 UTC (rev 1214)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2009-11-24 16:17:26 UTC (rev 1215)
@@ -56,7 +56,6 @@
private final AtomicLong replicationCount = new AtomicLong(0);
private final AtomicLong replicationFailures = new AtomicLong(0);
private final AtomicLong totalReplicationTime = new AtomicLong(0);
- private final AtomicLong numReplications = new AtomicLong(0);
@ManagedAttribute(description = "Enables or disables the gathering of statistics by this component", writable = true)
boolean statisticsEnabled = false; // by default, don't gather statistics.
@@ -95,6 +94,8 @@
log.debug("We're the only member in the cluster; Don't invoke remotely.");
return Collections.emptyList();
} else {
+ long startTime = 0;
+ if (statisticsEnabled) startTime = System.currentTimeMillis();
try {
List<Response> result = t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter, stateTransferEnabled);
if (isStatisticsEnabled()) replicationCount.incrementAndGet();
@@ -110,6 +111,11 @@
log.error("unexpected error while replicating", th);
if (isStatisticsEnabled()) replicationFailures.incrementAndGet();
throw new CacheException(th);
+ } finally {
+ if (statisticsEnabled) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ totalReplicationTime.getAndAdd(timeTaken);
+ }
}
}
}
@@ -123,73 +129,62 @@
}
public void retrieveState(String cacheName, long timeout) throws StateTransferException {
- long startTime = 0;
- if (statisticsEnabled) startTime = System.currentTimeMillis();
+ if (t.isSupportStateTransfer()) {
+ long initialWaitTime = configuration.getStateRetrievalInitialRetryWaitTime();
+ int waitTimeIncreaseFactor = configuration.getStateRetrievalRetryWaitTimeIncreaseFactor();
+ int numRetries = configuration.getStateRetrievalNumRetries();
+ List<Address> members = t.getMembers();
+ if (members.size() < 2) {
+ if (log.isDebugEnabled())
+ log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
+ return;
+ }
- try {
- if (t.isSupportStateTransfer()) {
- long initialWaitTime = configuration.getStateRetrievalInitialRetryWaitTime();
- int waitTimeIncreaseFactor = configuration.getStateRetrievalRetryWaitTimeIncreaseFactor();
- int numRetries = configuration.getStateRetrievalNumRetries();
- List<Address> members = t.getMembers();
- if (members.size() < 2) {
- if (log.isDebugEnabled())
- log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
- return;
- }
+ boolean success = false;
- boolean success = false;
-
- try {
- long wait = initialWaitTime;
- outer:
- for (int i = 0; i < numRetries; i++) {
- for (Address member : members) {
- if (!member.equals(t.getAddress())) {
- try {
- if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
- currentStateTransferSource = member;
- if (t.retrieveState(cacheName, member, timeout)) {
- if (log.isInfoEnabled())
- log.info("Successfully retrieved and applied state from {0}", member);
- success = true;
- break outer;
- }
- } catch (StateTransferException e) {
- if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
- } finally {
- currentStateTransferSource = null;
+ try {
+ long wait = initialWaitTime;
+ outer:
+ for (int i = 0; i < numRetries; i++) {
+ for (Address member : members) {
+ if (!member.equals(t.getAddress())) {
+ try {
+ if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
+ currentStateTransferSource = member;
+ if (t.retrieveState(cacheName, member, timeout)) {
+ if (log.isInfoEnabled())
+ log.info("Successfully retrieved and applied state from {0}", member);
+ success = true;
+ break outer;
}
+ } catch (StateTransferException e) {
+ if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
+ } finally {
+ currentStateTransferSource = null;
}
}
+ }
- if (!success) {
- if (log.isWarnEnabled())
- log.warn("Could not find available peer for state, backing off and retrying");
+ if (!success) {
+ if (log.isWarnEnabled())
+ log.warn("Could not find available peer for state, backing off and retrying");
- try {
- Thread.sleep(wait *= waitTimeIncreaseFactor);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ try {
+ Thread.sleep(wait *= waitTimeIncreaseFactor);
}
-
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
- } finally {
- currentStateTransferSource = null;
- }
- if (!success) throw new StateTransferException("Unable to fetch state on startup");
- } else {
- throw new StateTransferException("Transport does not, or is not configured to, support state transfer. Please disable fetching state on startup, or reconfigure your transport.");
+ }
+ } finally {
+ currentStateTransferSource = null;
}
- } finally {
- if (statisticsEnabled) {
- long timeTaken = System.currentTimeMillis() - startTime;
- totalReplicationTime.getAndAdd(timeTaken);
- numReplications.getAndIncrement();
- }
+
+ if (!success) throw new StateTransferException("Unable to fetch state on startup");
+ } else {
+ throw new StateTransferException("Transport does not, or is not configured to, support state transfer. Please disable fetching state on startup, or reconfigure your transport.");
}
}
@@ -294,7 +289,6 @@
replicationCount.set(0);
replicationFailures.set(0);
totalReplicationTime.set(0);
- numReplications.set(0);
}
@ManagedAttribute(description = "Number of successful replications")
@@ -379,10 +373,10 @@
@ManagedAttribute(description = "The average time spent in the transport layer, in milliseconds")
@Metric(displayName = "Average time spent in the transport layer", units = Units.MILLISECONDS, displayType = DisplayType.SUMMARY)
public long getAverageReplicationTime() {
- if (numReplications.get() == 0) {
+ if (replicationCount.get() == 0) {
return 0;
}
- return totalReplicationTime.get() / numReplications.get();
+ return totalReplicationTime.get() / replicationCount.get();
}
// mainly for unit testing
Modified: trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java 2009-11-24 14:16:28 UTC (rev 1214)
+++ trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java 2009-11-24 16:17:26 UTC (rev 1215)
@@ -23,6 +23,12 @@
import java.util.List;
/**
+ * TODO: For some reason, if you add to any of the methods below 'assert false;'
+ * Eclipse 3.5 and org.testng.eclipse_5.9.0.4.jar combination will indicate that
+ * the test passes correctly. Command line mvn execution does show the failure.
+ * Need to show this to Max either in the office or via a screencast to see how
+ * to debug it.
+ *
* @author Mircea.Markus at jboss.com
*/
@Test(groups = "functional", testName = "jmx.RpcManagerMBeanTest")
@@ -92,7 +98,6 @@
mBeanServer.setAttribute(rpcManager1, new Attribute("StatisticsEnabled", Boolean.TRUE));
}
-
@Test(dependsOnMethods = "testEnableJmxStats")
public void testSuccessRatio() throws Exception {
Cache cache1 = manager(0).getCache(cachename);
@@ -109,7 +114,10 @@
cache1.put("a2", "b2");
cache1.put("a3", "b3");
cache1.put("a4", "b4");
+ assert mBeanServer.getAttribute(rpcManager1, "ReplicationCount").equals(new Long(4));
assert mBeanServer.getAttribute(rpcManager1, "SuccessRatio").equals("100%");
+ assert !mBeanServer.getAttribute(rpcManager1, "AverageReplicationTime").equals(new Long(0));
+
RpcManagerImpl rpcManager = (RpcManagerImpl) TestingUtil.extractComponent(cache1, RpcManager.class);
Transport originalTransport = rpcManager.getTransport();
More information about the infinispan-commits
mailing list