[infinispan-commits] Infinispan SVN: r1082 - trunk/core/src/main/java/org/infinispan/remoting/rpc.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Oct 30 10:41:41 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-10-30 10:41:41 -0400 (Fri, 30 Oct 2009)
New Revision: 1082
Modified:
trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
Log:
[ISPN-142] (More JMX information to be exposed)
Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2009-10-30 14:09:00 UTC (rev 1081)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2009-10-30 14:41:41 UTC (rev 1082)
@@ -54,6 +54,9 @@
private Transport t;
private final AtomicLong replicationCount = new AtomicLong(0);
private final AtomicLong replicationFailures = new AtomicLong(0);
+ private final AtomicLong totalReplicationTime = new AtomicLong(0);
+ private final AtomicLong numReplications = new AtomicLong(0);
+
@ManagedAttribute(description = "Enables or disables the gathering of statistics by this component", writable = true)
boolean statisticsEnabled = false; // by default, don't gather statistics.
private volatile Address currentStateTransferSource;
@@ -119,62 +122,73 @@
}
public void retrieveState(String cacheName, long timeout) throws StateTransferException {
- if (t.isSupportStateTransfer()) {
- long initialWaitTime = configuration.getStateRetrievalInitialRetryWaitTime();
- int waitTimeIncreaseFactor = configuration.getStateRetrievalRetryWaitTimeIncreaseFactor();
- int numRetries = configuration.getStateRetrievalNumRetries();
- List<Address> members = t.getMembers();
- if (members.size() < 2) {
- if (log.isDebugEnabled())
- log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
- return;
- }
+ long startTime = 0;
+ if (statisticsEnabled) startTime = System.currentTimeMillis();
- boolean success = false;
+ try {
+ if (t.isSupportStateTransfer()) {
+ long initialWaitTime = configuration.getStateRetrievalInitialRetryWaitTime();
+ int waitTimeIncreaseFactor = configuration.getStateRetrievalRetryWaitTimeIncreaseFactor();
+ int numRetries = configuration.getStateRetrievalNumRetries();
+ List<Address> members = t.getMembers();
+ if (members.size() < 2) {
+ if (log.isDebugEnabled())
+ log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
+ return;
+ }
- try {
- long wait = initialWaitTime;
- outer:
- for (int i = 0; i < numRetries; i++) {
- for (Address member : members) {
- if (!member.equals(t.getAddress())) {
- try {
- if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
- currentStateTransferSource = member;
- if (t.retrieveState(cacheName, member, timeout)) {
- if (log.isInfoEnabled())
- log.info("Successfully retrieved and applied state from {0}", member);
- success = true;
- break outer;
+ boolean success = false;
+
+ try {
+ long wait = initialWaitTime;
+ outer:
+ for (int i = 0; i < numRetries; i++) {
+ for (Address member : members) {
+ if (!member.equals(t.getAddress())) {
+ try {
+ if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
+ currentStateTransferSource = member;
+ if (t.retrieveState(cacheName, member, timeout)) {
+ if (log.isInfoEnabled())
+ log.info("Successfully retrieved and applied state from {0}", member);
+ success = true;
+ break outer;
+ }
+ } catch (StateTransferException e) {
+ if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
+ } finally {
+ currentStateTransferSource = null;
}
- } catch (StateTransferException e) {
- if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
- } finally {
- currentStateTransferSource = null;
}
}
- }
- if (!success) {
- if (log.isWarnEnabled())
- log.warn("Could not find available peer for state, backing off and retrying");
+ if (!success) {
+ if (log.isWarnEnabled())
+ log.warn("Could not find available peer for state, backing off and retrying");
- try {
- Thread.sleep(wait *= waitTimeIncreaseFactor);
+ try {
+ Thread.sleep(wait *= waitTimeIncreaseFactor);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+
}
+ } finally {
+ currentStateTransferSource = null;
+ }
- }
- } finally {
- currentStateTransferSource = null;
+ if (!success) throw new StateTransferException("Unable to fetch state on startup");
+ } else {
+ throw new StateTransferException("Transport does not, or is not configured to, support state transfer. Please disable fetching state on startup, or reconfigure your transport.");
}
-
- if (!success) throw new StateTransferException("Unable to fetch state on startup");
- } else {
- throw new StateTransferException("Transport does not, or is not configured to, support state transfer. Please disable fetching state on startup, or reconfigure your transport.");
+ } finally {
+ if (statisticsEnabled) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ totalReplicationTime.getAndAdd(timeTaken);
+ numReplications.getAndIncrement();
+ }
}
}
@@ -278,6 +292,8 @@
public void resetStatistics() {
replicationCount.set(0);
replicationFailures.set(0);
+ totalReplicationTime.set(0);
+ numReplications.set(0);
}
@ManagedAttribute(description = "Number of successful replications")
@@ -323,7 +339,7 @@
List<Address> addressList = t.getMembers();
return addressList.toString();
}
-
+
@ManagedAttribute(description = "Size of the cluster in number of nodes")
@Metric(displayName = "Cluster size", displayType = DisplayType.SUMMARY)
public String getClusterSize() {
@@ -341,6 +357,12 @@
return NumberFormat.getInstance().format(ration) + "%";
}
+ /**
+ * Reports the mean of the durations accumulated in {@code totalReplicationTime}, i.e. the
+ * accumulated total divided by the number of samples counted in {@code numReplications}.
+ * Both counters are zeroed by {@code resetStatistics()}.
+ *
+ * @return the average in milliseconds (integer division, truncated), or 0 if no samples have
+ * been recorded yet
+ */
+ @ManagedAttribute(description = "The average time spent in the transport layer, in milliseconds")
+ @Metric(displayName = "Average time spent in the transport layer", units = Units.MILLISECONDS, displayType = DisplayType.SUMMARY)
+ public long getAverageReplicationTime() {
+ // Read the sample count once so the zero check and the division see the same value,
+ // even if resetStatistics() runs concurrently.
+ long samples = numReplications.get();
+ // No samples (nothing timed yet, or statistics were just reset): avoid long division by zero,
+ // which would otherwise surface as an ArithmeticException when this attribute is read.
+ if (samples == 0) return 0;
+ return totalReplicationTime.get() / samples;
+ }
+
// mainly for unit testing
public void setTransport(Transport t) {
this.t = t;
More information about the infinispan-commits
mailing list