[jbosscache-commits] JBoss Cache SVN: r4886 - in cache-bench-fwk/trunk: cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers and 6 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Wed Dec 19 16:51:58 EST 2007
Author: mircea.markus
Date: 2007-12-19 16:51:57 -0500 (Wed, 19 Dec 2007)
New Revision: 4886
Added:
cache-bench-fwk/trunk/src/org/cachebench/ClusterConfigurationCheck.java
cache-bench-fwk/trunk/src/org/cachebench/cluster/
cache-bench-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
cache-bench-fwk/trunk/src/org/cachebench/cluster/Receiver.java
cache-bench-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
cache-bench-fwk/trunk/src/org/cachebench/cluster/Transport.java
cache-bench-fwk/trunk/src/org/cachebench/config/CacheWarmupConfig.java
cache-bench-fwk/trunk/src/org/cachebench/config/ClusterConfig.java
cache-bench-fwk/trunk/src/org/cachebench/config/ConfigBuilder.java
cache-bench-fwk/trunk/src/org/cachebench/config/NodeAddress.java
cache-bench-fwk/trunk/src/org/cachebench/warmup/
cache-bench-fwk/trunk/src/org/cachebench/warmup/CacheWarmup.java
cache-bench-fwk/trunk/src/org/cachebench/warmup/NoCacheWarmup.java
cache-bench-fwk/trunk/src/org/cachebench/warmup/PutGetCacheWarmup.java
Modified:
cache-bench-fwk/trunk/build.xml
cache-bench-fwk/trunk/cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers/JBossCache200Wrapper.java
cache-bench-fwk/trunk/conf/cachebench.xml
cache-bench-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java
cache-bench-fwk/trunk/src/org/cachebench/config/Configuration.java
cache-bench-fwk/trunk/src/org/cachebench/config/TestCase.java
cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/AbstractReportGenerator.java
cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ClusterReportGenerator.java
cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ReportGenerator.java
Log:
enhance adding barriers and warmup
Modified: cache-bench-fwk/trunk/build.xml
===================================================================
--- cache-bench-fwk/trunk/build.xml 2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/build.xml 2007-12-19 21:51:57 UTC (rev 4886)
@@ -42,9 +42,9 @@
<!-- Module Framework -->
<dirname property="module.framework.basedir" file="${ant.file}"/>
+
+ <property environment="env"/>
-
-
<property name="compiler.args.framework" value="${compiler.args}"/>
<property name="framework.output.dir" value="${module.framework.basedir}/classes/production/Framework"/>
@@ -89,15 +89,15 @@
all - builds the entire project, including plugins for all
cache products in /cache-products/. Output classes
in /classes/.
- runMaster - runs the CacheBenchFwk in "master" mode.
- runSlave - runs the CacheBenchFwk in "slave" mode.
+ runNode - runs the CacheBenchFwk. Depending on the number of nodes in the cluster(configured in cachebenchmark.xml),
+ it will wait for all configured nodes to be launched before starting the tests
-For either runMaster or runSlave, make sure you have looked at:
+Make sure you have looked at:
1) 'build.properties' and have set JVM params (such as heap size,
etc.) as necessary.
- 2) '/conf/cachebench.xml' to configure the tests you want run and
- output file for reports.
+ 2) '/conf/cachebench.xml' to configure the tests you want run, the
+ nodes in the cluster if the case and the output file for reports.
3) '/conf/log4j.xml' for logging settings (make sure these aren't
very verbose as it can skew tests).
4) Provided one of the plugins as a system property. This is
@@ -109,14 +109,17 @@
-Dorg.cachebench.plugins.jbosscache2=true -Dbind.address=${MYTESTIP_1}
-Dorg.cachebench.pluins.ehcache=true -Dbind.address=${MYTESTIP_1}
-Dorg.cachebench.plugins.coherence=true -Dtangosol.coherence.localhost=${MYTESTIP_1}
-
// WORK IN PROGRESS
- -Dorg.cachebench.plugins.terracotta=true
+ -Dorg.cachebench.plugins.terracotta=true
+ when running ant. Note that only one can be set at any time.
+ 5) Make sure you set up an correct NODE_INDEX environment property
+ indicating the index of the node before starting it. E.g. if
+ we have 3 nodes each process should have an environment
+ variable named NODE_INDEX, having values in the range 0-2,
+ each node having an distict value.
+ see cachebench.xml\cachebench\cluster for more details
-
- when running ant. Note that only one can be set at any time.
-
-NB: NEEDS Ant >= 1.7.0
+ NB: NEEDS Ant >= 1.7.0
</echo>
</target>
@@ -531,27 +534,32 @@
<!--<pathelement location="./cache-products/terracotta-2.4.8/lib/bootstrap/boot.jar" />-->
<!--</bootclasspath>-->
<sysproperty key="bind.address" value="${bind.address}" />
- <sysproperty key="tangosol.coherence.localhost" value="${tangosol.coherence.localhost}" />
- <sysproperty key="cluterReportGenerator" value="${cluterReportGenerator}" />
- <sysproperty key="org.cachebench.debug" value="${org.cachebench.debug}" />
+ <sysproperty key="currentIndex" value="${env.NODE_INDEX}" />
+ <sysproperty key="tangosol.coherence.localhost" value="${tangosol.coherence.localhost}" />
+ <sysproperty key="org.cachebench.debug" value="${org.cachebench.debug}" />
<sysproperty key="java.net.preferIPv4Stack" value="${java.net.preferIPv4Stack}" />
<classpath refid="framework.module.classpath" />
<classpath refid="${plugin.classpath.ref}"/>
</java>
</target>
- <target name="runSlave">
+ <target name="runNode">
<antcall target="run">
<param name="runtime.classname" value="org.cachebench.CacheBenchmarkRunner" />
- <param name="cluterReportGenerator" value="false"/>
</antcall>
</target>
- <target name="runMaster">
- <antcall target="run">
- <param name="runtime.classname" value="org.cachebench.CacheBenchmarkRunner" />
- <param name="cluterReportGenerator" value="true"/>
- </antcall>
- </target>
-
+ <target name="checkClusterAddresses" depends="compile.module.framework.production" description="Check whether the cluster config is a valid one">
+ <!--<antcall target="run">-->
+ <!--<param name="runtime.classname" value="org.cachebench.ClusterConfigurationCheck" />-->
+ <!--</antcall>-->
+
+ <java classname="org.cachebench.ClusterConfigurationCheck" clonevm="true" fork="true">
+ <classpath>
+ <pathelement location="${framework.output.dir}"/>
+ <pathelement location="${framework.testoutput.dir}"/>
+ </classpath>
+ <classpath refid="framework.module.classpath" />
+ </java>
+ </target>
</project>
Modified: cache-bench-fwk/trunk/cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers/JBossCache200Wrapper.java
===================================================================
--- cache-bench-fwk/trunk/cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers/JBossCache200Wrapper.java 2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers/JBossCache200Wrapper.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -40,7 +40,8 @@
public void empty() throws Exception
{
- cache.removeNode(Fqn.ROOT);
+ //not removing root because there it fails with buddy replication: http://jira.jboss.com/jira/browse/JBCACHE-1241
+ cache.removeNode(new Fqn("test"));
}
public int getNumMembers()
Modified: cache-bench-fwk/trunk/conf/cachebench.xml
===================================================================
--- cache-bench-fwk/trunk/conf/cachebench.xml 2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/conf/cachebench.xml 2007-12-19 21:51:57 UTC (rev 4886)
@@ -7,16 +7,32 @@
emptyCacheBetweenTests - again, use if you're running out of mem.
numThreads - the number of executor threads to use to perform the required number of operations.
-->
-<cachebench sampleSize="10000" gcBetweenTestsEnabled="true" sleepBetweenTests="1000" emptyCacheBetweenTests="true" numThreads="10">
+<cachebench sampleSize="2000" gcBetweenTestsEnabled="true" sleepBetweenTests="1000" emptyCacheBetweenTests="true" numThreads="10">
- <!-- Each testcase represents either a single configuration or a cacheing product.
-
- For example, WhirlyCache would be one test case. JBossCache-standalone could be another, JBossCache-replicated could be yet another
-
- See the javadoc for org.cachebench.CacheWrapper for the cacheWrapper property
- -->
<!--
+ There are various steps we want to start executing at once: e.g. all the tests should start at the same time,
+ otherwise (part of) cluster operations do not replicate on all instances. We configure here one socket addresses
+ on each tested node, so that the framework can communicate with all the nodes and sync whenever needed.
+ - bindAddress can be used on a multi-homed host for a ServerSocket that will only accept connect requests to one
+ of its addresses
+ - for each node instance socket address is specified. You can make sure that addresses are available by using
+ checkClusterAddresses ant target
+ -->
+ <cluster bindAddress="127.0.0.1">
+ <member host="127.0.0.1" port="7800"/>
+ <member host="127.0.0.1" port="7801"/>
+ <member host="127.0.0.1" port="7802"/>
+ <member host="127.0.0.1" port="7803"/>
+ <member host="127.0.0.1" port="7804"/>
+ </cluster>
+
+ <!-- Each testcase represents either a single configuration or a cacheing product.
+ For example, WhirlyCache would be one test case. JBossCache-standalone could be another, JBossCache-replicated could be yet another
+ See the javadoc for org.cachebench.CacheWrapper for the cacheWrapper property
+ -->
+
+ <!--
Note that if you are using REPLICATED tests, using the "ant runSlave" command, you can only run one test at a time.
Otherwise, if you are using the test in standalone mode (testing a LOCAL cache), you can add multiple "testcase" elements.
-->
@@ -33,6 +49,14 @@
<!--<testcase name="JBossCache2x-Pessimistic-REPL_ASYNC" cacheWrapper="org.cachebench.cachewrappers.JBossCache200Wrapper">-->
<testcase name="JBossCache2.0" cacheWrapper="org.cachebench.cachewrappers.JBossCache200Wrapper">
+
+ <!-- warms up the cache by doing operation on it; simulates a real-world environment. If no warmup is needed use
+ org.cachebench.warmup.NoCacheWarmup
+ -->
+ <warmup warmupClass="org.cachebench.warmup.PutGetCacheWarmup">
+ <param name="operationCount" value="100"/>
+ </warmup>
+
<!--
* The "name" attrib is just used for display in the reports.
* You can write your own custom testClass.
@@ -65,14 +89,10 @@
<!--
Available generators are: CSVReportGenerator and ClusterReportGenerator.
See javadocs for org.cachebench.reportgenerators.ReportGenerator for writing your
- own report generators such as XML generators, graphic generators, etc
+ own report generators such as XML generators, graphic generators, etc
-->
<!-- The CSV report generated can be plugged in to a spreadsheet to generate graphs, cluster size is
needed so that the . -->
- <report outputFile="performance.csv" generator="org.cachebench.reportgenerators.ClusterReportGenerator">
- <param name="clusterSize" value="3"/>
- <param name="masterHost" value="127.0.0.1"/>
- <param name="masterPort" value="54334"/>
- </report>
+ <report outputFile="performance.csv" generator="org.cachebench.reportgenerators.ClusterReportGenerator"/>
</cachebench>
Modified: cache-bench-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java 2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -1,19 +1,16 @@
package org.cachebench;
-import org.apache.commons.digester.Digester;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.cachebench.config.Configuration;
-import org.cachebench.config.Report;
-import org.cachebench.config.TestCase;
-import org.cachebench.config.TestConfig;
+import org.cachebench.config.*;
import org.cachebench.reportgenerators.ReportGenerator;
import org.cachebench.tests.CacheTest;
+import org.cachebench.cluster.ClusterBarrier;
import org.cachebench.utils.Instantiator;
+import org.cachebench.warmup.CacheWarmup;
import java.io.File;
-import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
@@ -56,13 +53,9 @@
private CacheBenchmarkRunner(String s)
{
// first, try and find the configuration on the filesystem.
- URL confFile = findOnFS(s);
+ URL confFile = ConfigBuilder.findConfigFile(s);
if (confFile == null)
{
- confFile = findInClasspath(s);
- }
- if (confFile == null)
- {
logger.warn("Unable to locate a configuration file; Application terminated");
}
else
@@ -71,7 +64,7 @@
logger.debug("Parsing configuration");
try
{
- conf = parseConfiguration(confFile);
+ conf = ConfigBuilder.parseConfiguration(confFile);
logger.info("Starting Benchmarking....");
List<TestResult> results = runTests(); // Run the tests from this point.
if (results != null && results.size() != 0)
@@ -86,43 +79,12 @@
}
catch (Exception e)
{
- logger.warn("Unable to parse configuration file " + confFile + ". Application terminated");
+ logger.warn("Unable to parse configuration file " + confFile + ". Application terminated", e);
errorLogger.fatal("Unable to parse configuration file " + confFile, e);
}
}
}
- private Configuration parseConfiguration(URL url) throws Exception
- {
- Digester digester = new Digester();
- // set up the digester rules.
- digester.setValidating(false);
- digester.addObjectCreate("cachebench", "org.cachebench.config.Configuration");
- digester.addSetProperties("cachebench");
- digester.addObjectCreate("cachebench/testcase", "org.cachebench.config.TestCase");
- digester.addSetProperties("cachebench/testcase");
-
- digester.addObjectCreate("cachebench/testcase/test", "org.cachebench.config.TestConfig");
- digester.addSetProperties("cachebench/testcase/test");
- digester.addSetNext("cachebench/testcase/test", "addTest", "org.cachebench.config.TestConfig");
-
- digester.addObjectCreate("cachebench/testcase/param", "org.cachebench.config.NVPair");
- digester.addSetProperties("cachebench/testcase/param");
-
- digester.addSetNext("cachebench/testcase/param", "addParam", "org.cachebench.config.NVPair");
- digester.addSetNext("cachebench/testcase", "addTestCase", "org.cachebench.config.TestCase");
-
- digester.addObjectCreate("cachebench/report", "org.cachebench.config.Report");
- digester.addSetProperties("cachebench/report");
-
- digester.addObjectCreate("cachebench/report/param", "org.cachebench.config.NVPair");
- digester.addSetProperties("cachebench/report/param");
- digester.addSetNext("cachebench/report/param", "addParam", "org.cachebench.config.NVPair");
-
- digester.addSetNext("cachebench/report", "addReport", "org.cachebench.config.Report");
- return (Configuration) digester.parse(url.openStream());
- }
-
/**
* Executes each test case and returns the result.
*
@@ -140,8 +102,14 @@
if (cache != null)
{
cache.init(test.getParams());
+ barrier("BEFORE_WARMUP");
+ warmupCache(test, cache);
+ barrier("AFTER_WARMUP");
+
+ //now start testing
cache.setUp();
List<TestResult> resultsForCache = runTestsOnCache(cache, test);
+ barrier("AFTER_TEST_RUN");
shutdownCache(cache);
results.addAll(resultsForCache);
}
@@ -156,6 +124,25 @@
return results;
}
+ private void barrier(String messageName) throws Exception
+ {
+ ClusterBarrier barrier = new ClusterBarrier();
+ logger.trace("Using following cluster config: " + conf.getClusterConfig());
+ barrier.setConfig(conf.getClusterConfig());
+ barrier.setAcknowledge(true);
+ barrier.barrier(messageName);
+ logger.trace("Barrier finished");
+ }
+
+ private void warmupCache(TestCase test, CacheWrapper cache) throws Exception
+ {
+ CacheWarmupConfig warmupConfig = test.getCacheWarmupConfig();
+ logger.trace("Warmup config is: " + warmupConfig);
+ CacheWarmup warmup = (CacheWarmup) Instantiator.getInstance().createClass(warmupConfig.getWarmupClass());
+ warmup.setConfigParams(warmupConfig.getConfigParams());
+ warmup.warmup(cache);
+ }
+
/**
* Peforms the necessary external tasks for cache benchmarking.
* These external tasks are defined in the cachebench.xml and would
@@ -184,9 +171,9 @@
}
catch (Exception e)
{
- // The Empty process of the cache failed. Add a foot note for the TestResult here.
- testResult.setFootNote("The Cache Empty process failed after test case: " + testResult.getTestName() + " : " + testResult.getTestType());
- errorLogger.error("The Cache Empty process failed after test case : " + testResult.getTestName() + ", " + testResult.getTestType(), e);
+ // The Empty barrier of the cache failed. Add a foot note for the TestResult here.
+ testResult.setFootNote("The Cache Empty barrier failed after test case: " + testResult.getTestName() + " : " + testResult.getTestType());
+ errorLogger.error("The Cache Empty barrier failed after test case : " + testResult.getTestName() + ", " + testResult.getTestType(), e);
}
return testResult;
@@ -241,6 +228,7 @@
generator.setConfigParams(report.getParams());
generator.setResults(results);
generator.setOutputFile(new File(report.getOutputFile()));
+ generator.setClusterConfig(conf.getClusterConfig());
generator.generate();
logger.info("Report Generation Completed");
}
@@ -259,37 +247,6 @@
}
}
- /**
- * Util method to locate a resource on the filesystem as a URL
- *
- * @param filename
- * @return The URL object of the file
- */
- private URL findOnFS(String filename)
- {
- File f = new File(filename);
- try
- {
- if (f.exists()) return f.toURL();
- }
- catch (MalformedURLException mue)
- {
- // bad URL
- }
- return null;
- }
-
- /**
- * Util method to locate a resource in your classpath
- *
- * @param filename
- * @return The URL object of the file
- */
- private URL findInClasspath(String filename)
- {
- return getClass().getClassLoader().getResource(filename);
- }
-
private CacheWrapper getCacheWrapperInstance(TestCase testCaseClass)
{
CacheWrapper cache = null;
Added: cache-bench-fwk/trunk/src/org/cachebench/ClusterConfigurationCheck.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/ClusterConfigurationCheck.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/ClusterConfigurationCheck.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,63 @@
+package org.cachebench;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.config.ClusterConfig;
+import org.cachebench.config.ConfigBuilder;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URL;
+import java.io.IOException;
+
+/**
+ * Loads config file and checks whether the specified ports are available.
+ * Can be used from a script, following exit codes are used:
+ * <pre>
+ * 0 - all ports are available
+ * 1 - config file not found
+ * 2 - a address is in use
+ * </pre>
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public class ClusterConfigurationCheck
+{
+
+ private static Log log = LogFactory.getLog("ClusterConfigurationCheck");
+
+ public static void main(String[] args) throws Exception
+ {
+ String configFile = "cachebench.xml";
+ if (args.length >= 1)
+ {
+ configFile = args[0];
+ }
+ URL configUrl = ConfigBuilder.findConfigFile(configFile);
+ if (configUrl == null)
+ {
+ log.info("Could not find the config file, exiting with code 1");
+ System.exit(1);
+ }
+ ClusterConfig config = ConfigBuilder.parseConfiguration(configUrl).getClusterConfig();
+ boolean areSuspects = false;
+ for (InetSocketAddress address : config.getMemberAddresses())
+ {
+ try
+ {
+ Socket sock = new Socket(address.getHostName(), address.getPort());
+ areSuspects = true;
+ log.info("Managed to connect to " + address);
+ } catch (IOException e)
+ {
+ log.trace("Connection to : " + address + " failed; expected behavior");
+ }
+ }
+ if (!areSuspects)
+ {
+ log.info("Success (could not establish any connection)");
+ }
+ System.exit(areSuspects ? 2 : 0);
+ }
+}
Added: cache-bench-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,154 @@
+package org.cachebench.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.config.ClusterConfig;
+
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Use for making the nodes in the cluster to hold until ALL nodes reached the barrier.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class ClusterBarrier implements Receiver
+{
+ private static Log log = LogFactory.getLog(ClusterBarrier.class);
+
+ private ClusterConfig config;
+ public final Map<SocketAddress, Object> receivedMessages = new HashMap<SocketAddress, Object>();
+ private Transport transport;
+ private Object message;
+ private int numMembers;
+ private boolean acknowledge;
+ private static final String ACK = "_ACK";
+
+
+ /**
+ * Returns the messages sent between the nodes in the cluster.
+ */
+ public Map<SocketAddress, Object> getReceivedMessages()
+ {
+ return receivedMessages;
+ }
+
+ /**
+ * Setting this to true would tide up barrier delays.
+ */
+ public void setAcknowledge(boolean acknowledge)
+ {
+ this.acknowledge = acknowledge;
+ }
+
+ /**
+ * Message sent between cluster members to ack on the barrier.
+ */
+ public void barrier(Object message) throws Exception
+ {
+ log.trace("Started processing a message cluster, message='" + message + "'");
+ receivedMessages.clear();
+ this.message = message;
+ transport = new TcpTransport();
+ numMembers = config.getClusterSize();
+ transport.create(config);
+ transport.setReceiver(this);
+ transport.start();
+ log.trace("Transport started, local address is: " + transport.getLocalAddress());
+ log.trace("Waiting for " + numMembers + " member(s) to join");
+ waitForAllMemebers();
+ transport.stop();
+ //just to make sure all other nodes closed their resources
+ //needed e.g. when a new barrier is immediately initiated on this barrier which might connect
+ //to a port on a barrier that is about to close, so messages sent by new barrier might be lost
+ // a nicer way to implement this is to accept connections only if the sender uses same barrier name
+ Thread.sleep(2000);
+ }
+
+ private void waitForAllMemebers() throws Exception
+ {
+ boolean receivedAllMessages = false;
+ while (!receivedAllMessages)
+ {
+ synchronized (receivedMessages)
+ {
+ receivedAllMessages = receivedMessages.size() >= numMembers;
+ if (!receivedAllMessages)
+ {
+ receivedMessages.wait(2000);
+ }
+ }
+ log.trace("sending message " + message + ", expecting " + getMissingMembersCount() + " member(s)");
+ transport.send(null, message);
+ if (acknowledge)
+ {
+ log.trace("Send ack also");
+ transport.send(null, getAcknowledgeMessage(message));
+ }
+ }
+ }
+
+ public void receive(SocketAddress sender, Object payload) throws Exception
+ {
+ log.trace("Received '" + payload + "' from " + sender + " still expecting " + getMissingMembersCount() + " member(s)");
+ if (payload == null)
+ {
+ log.warn("payload is incorrect (sender=" + sender + "): " + payload);
+ return;
+ }
+ if (acknowledge && !isAcknowledgeMessage(payload, message))
+ {
+ log.trace("Sending ack, still expecting " + getMissingMembersCount() + " members.");
+ transport.send(null, getAcknowledgeMessage(message));
+ return;
+ }
+
+ //we are here if either no ack or ack the message is an ack message
+ synchronized (this.receivedMessages)
+ {
+ if (!this.receivedMessages.containsKey(sender))
+ {
+ this.receivedMessages.put(sender, getMessage(payload));
+ int expected = getMissingMembersCount();
+ log.trace("Sender " + sender + " registered, still waiting for " + expected + " member(s)");
+ this.receivedMessages.notifyAll();
+ }
+ }
+ }
+
+ private int getMissingMembersCount()
+ {
+ return numMembers - receivedMessages.size();
+ }
+
+ private Object getMessage(Object payload)
+ {
+ if (!acknowledge)
+ {
+ return message;
+ }
+ String payloadStr = payload.toString();
+ int endIndex = payloadStr.length() - ACK.length();
+ return payloadStr.substring(0, endIndex);
+ }
+
+ private String getAcknowledgeMessage(Object message)
+ {
+ return message.toString() + ACK;
+ }
+
+ private boolean isAcknowledgeMessage(Object payload, Object message)
+ {
+ boolean result;
+ String payloadStr = payload.toString();
+ result = payloadStr.equals(getAcknowledgeMessage(message));
+ log.trace("Is acknowledge? " + result);
+ return result;
+ }
+
+ public void setConfig(ClusterConfig config)
+ {
+ this.config = config;
+ }
+}
Added: cache-bench-fwk/trunk/src/org/cachebench/cluster/Receiver.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/cluster/Receiver.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/cluster/Receiver.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,12 @@
+package org.cachebench.cluster;
+
+import java.net.SocketAddress;
+
+/**
+ * @author Bela Ban Jan 22
+ * @author 2004
+ * @version $Id: Receiver.java,v 1.1 2004/01/23 00:08:31 belaban Exp $
+ */
+public interface Receiver {
+ void receive(SocketAddress sender, Object payload) throws Exception;
+}
Added: cache-bench-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,389 @@
+package org.cachebench.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.config.ClusterConfig;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * note: this code is copyed and adapted from jgroups test.
+ *
+ * @author Bela Ban Jan 22
+ * @author 2004
+ * @version $Id: TcpTransport.java,v 1.16 2006/12/19 08:51:46 belaban Exp $
+ */
+public class TcpTransport implements Transport
+{
+
+ private static Log log = LogFactory.getLog(TcpTransport.class);
+
+ Receiver receiver = null;
+ ClusterConfig config = null;
+ int max_receiver_buffer_size = 500000;
+ int max_send_buffer_size = 500000;
+ List<InetSocketAddress> nodes;
+ ConnectionTable connectionTable;
+ int startPort = 7800;
+ ServerSocket srvSock = null;
+ InetAddress bindAddr = null;
+ SocketAddress localAddr = null;
+ List receivers = new ArrayList();
+ private ServerSocket server;
+ private boolean isStoping;
+
+
+ public TcpTransport()
+ {
+ }
+
+ public Object getLocalAddress()
+ {
+ return localAddr;
+ }
+
+ public void create(ClusterConfig clusterConfig) throws Exception
+ {
+ this.config = clusterConfig;
+ String tmp;
+ startPort = config.getPortForThisNode();
+ String bindAddrStr = config.getBindAddress();
+ if (bindAddrStr != null)
+ {
+ bindAddr = InetAddress.getByName(bindAddrStr);
+ }
+ else
+ {
+ bindAddr = InetAddress.getLocalHost();
+ }
+ nodes = clusterConfig.getMemberAddresses();
+ connectionTable = new ConnectionTable(nodes);
+ }
+
+
+ public void start() throws Exception
+ {
+ srvSock = createServerSocket();
+ log.trace("ServerSock created, listening on: "+ srvSock.getLocalSocketAddress());
+ localAddr = new InetSocketAddress(srvSock.getInetAddress(), srvSock.getLocalPort());
+ connectionTable.init();
+
+ // accept connections and start 1 Receiver per connection
+ Thread acceptor = new Thread()
+ {
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ Socket s = srvSock.accept();
+ log.trace("Accepted client " + s.getRemoteSocketAddress());
+ ReceiverThread r = new ReceiverThread(s);
+ r.setDaemon(true);
+ receivers.add(r);
+ r.start();
+ }
+ catch (Exception ex)
+ {
+ if (!isStoping)
+ {
+ log.warn("Exception whilst accepting new threads", ex);
+ }
+ break;
+ }
+ }
+ }
+ };
+ acceptor.setDaemon(true);
+ acceptor.start();
+ }
+
+ private ServerSocket createServerSocket()
+ {
+ int start_port1 = startPort;
+ server = null;
+
+ while (true)
+ {
+ try
+ {
+ server = new ServerSocket(start_port1, 50, bindAddr);
+ }
+ catch (BindException bindEx)
+ {
+ log.trace("Binding exception, most likely port " + start_port1 + " is in use. Trying next value. Error:"
+ + bindEx.getMessage());
+ start_port1++;
+ continue;
+ }
+ catch (IOException ioEx)
+ {
+ log.warn("An exception appeared whilst trying to create server socket on port " + start_port1 + ", error:"
+ + ioEx.getMessage());
+ }
+ break;
+ }
+ return server;
+ }
+
+ public void stop()
+ {
+ try
+ {
+ isStoping = true;
+ server.close();
+ log.trace("Successfully closed server socket " + server);
+ } catch (IOException e)
+ {
+ log.warn("Failed to close servet socket for " + server + ", error is " + e.getMessage());
+ }
+ connectionTable.close();
+ for (Iterator it = receivers.iterator(); it.hasNext();)
+ {
+ ReceiverThread thread = (ReceiverThread) it.next();
+ thread.stopThread();
+ }
+ }
+
+ public void destroy()
+ {
+ ;
+ }
+
+ public void setReceiver(Receiver r)
+ {
+ this.receiver = r;
+ }
+
+ public Map dumpStats()
+ {
+ return null;
+ }
+
+ public void send(Object destination, Object payload) throws Exception
+ {
+ if (destination != null)
+ throw new Exception("TcpTransport.send(): unicasts not supported");
+ connectionTable.writeMessage(payload);
+ }
+
+
+ class ConnectionTable
+ {
+ /**
+ * List<InetSocketAddress>
+ */
+ List myNodes;
+ final Connection[] connections;
+
+ ConnectionTable(List nodes) throws Exception
+ {
+ this.myNodes = nodes;
+ connections = new Connection[nodes.size()];
+ }
+
+
+ void init() throws Exception
+ {
+ int i = 0;
+ log.trace("Nodes is " + myNodes);
+ for (Iterator it = myNodes.iterator(); it.hasNext();)
+ {
+ InetSocketAddress addr = (InetSocketAddress) it.next();
+ if (connections[i] == null)
+ {
+ try
+ {
+ connections[i] = new Connection(addr);
+ connections[i].createSocket();
+ }
+ catch (ConnectException connect_ex)
+ {
+ log.trace("-- failed to connect to " + addr);
+ }
+ catch (Exception all_others)
+ {
+ throw all_others;
+ }
+ }
+ i++;
+ }
+ }
+
+ // todo: parallelize
+ void writeMessage(Object msg) throws Exception
+ {
+ int recieversCount = 0;
+ for (int i = 0; i < connections.length; i++)
+ {
+ Connection c = connections[i];
+ if (c != null)
+ {
+ try
+ {
+ c.writeMessage(msg);
+ recieversCount ++;
+ }
+ catch (Exception e)
+ {
+ // System.err.println("failed sending msg on " + c);
+ }
+ }
+ }
+ log.trace("Message successfully sent to " + recieversCount + "/" + connections.length);
+ }
+
+ void close()
+ {
+ for (int i = 0; i < connections.length; i++)
+ {
+ Connection c = connections[i];
+ if (c != null)
+ c.close();
+ }
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+ for (Iterator it = myNodes.iterator(); it.hasNext();)
+ {
+ sb.append(it.next()).append(' ');
+ }
+ return sb.toString();
+ }
+ }
+
+ class Connection
+ {
+ Socket sock = null;
+ DataOutputStream out;
+ InetSocketAddress to;
+ final Object mutex = new Object();
+
+ Connection(InetSocketAddress addr)
+ {
+ this.to = addr;
+ }
+
+ void createSocket() throws IOException
+ {
+ sock = new Socket(to.getAddress(), to.getPort());
+ sock.setSendBufferSize(max_send_buffer_size);
+ sock.setReceiveBufferSize(max_receiver_buffer_size);
+ out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+ log.trace("-- connected to " + to + ". Local address is " + sock.getLocalSocketAddress());
+ }
+
+ void writeMessage(Object msg) throws Exception
+ {
+ synchronized (mutex)
+ {
+ if (sock == null)
+ {
+ createSocket();
+ }
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(msg);
+ }
+ out.flush();
+ }
+
+
+ void close()
+ {
+ try
+ {
+ out.flush();
+ sock.close();
+ }
+ catch (Exception ex)
+ {
+ }
+ }
+
+ public String toString()
+ {
+ return "Connection from " + localAddr + " to " + to;
+ }
+ }
+
+
+ class ReceiverThread extends Thread
+ {
+ Socket sock;
+ DataInputStream in;
+ SocketAddress remote;
+
+ ReceiverThread(Socket sock) throws Exception
+ {
+ this.sock = sock;
+ // sock.setSoTimeout(5000);
+ in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
+ remote = sock.getRemoteSocketAddress();
+ }
+
+ public void run()
+ {
+ while (sock != null)
+ {
+ try
+ {
+ Object message = new ObjectInputStream(in).readObject();
+ if (receiver != null)
+ receiver.receive(remote, message);
+ }
+ catch (EOFException eof)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ break;
+ }
+ }
+ log.trace("-- receiver thread for " + remote + " terminated");
+ }
+
+ void stopThread()
+ {
+ try
+ {
+ sock.close();
+ sock = null;
+ }
+ catch (Exception ex)
+ {
+ }
+ }
+ }
+
+ public List parseCommaDelimitedList(String s) throws Exception
+ {
+ List retval = new ArrayList();
+ StringTokenizer tok;
+ String hostname, tmp;
+ int port;
+ InetSocketAddress addr;
+ int index;
+
+ if (s == null) return null;
+ tok = new StringTokenizer(s, ",");
+ while (tok.hasMoreTokens())
+ {
+ tmp = tok.nextToken();
+ index = tmp.indexOf(':');
+ if (index == -1)
+ throw new Exception("host must be in format <host:port>, was " + tmp);
+ hostname = tmp.substring(0, index);
+ port = Integer.parseInt(tmp.substring(index + 1));
+ addr = new InetSocketAddress(hostname, port);
+ retval.add(addr);
+ }
+ return retval;
+ }
+
+}
Added: cache-bench-fwk/trunk/src/org/cachebench/cluster/Transport.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/cluster/Transport.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/cluster/Transport.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,54 @@
+package org.cachebench.cluster;
+
+import org.cachebench.config.ClusterConfig;
+
+import java.util.Properties;
+import java.util.Map;
+
+/**
+ * Generic transport abstraction for all different transports (JGroups, JMS, UDP, TCP). The lifecycle is
+ * <ol>
+ * <li>Create an instance of the transport (using the empty constructor)
+ * <li>Call <code>create()</code>
+ * <li>Possibly call <code>setReceiver()</code>
+ * <li>Call <code>start()</code>
+ * <li>Call <code>send()</code>
+ * <li>Call <code>stop()</stop>
+ * <li>Call <code>destroy()</code> (alternatively call <code>start()</code> again)
+ * </ol>
+ * @author Bela Ban Jan 22
+ * @author 2004
+ * @version $Id: Transport.java,v 1.4 2006/12/19 08:51:47 belaban Exp $
+ */
+public interface Transport {
+
+ /** Create the transport
+ * @param clusterConfig*/
+ void create(ClusterConfig clusterConfig) throws Exception;
+
+ /** Get the local address (= endpoint) of this transport. Guaranteed to be called <em>after</em>
+ * <code>create()</code>, possibly even later (after <code>start()</code>) */
+ Object getLocalAddress();
+
+ /** Start the transport */
+ void start() throws Exception;
+
+ /** Stop the transport */
+ void stop();
+
+ /** Destroy the transport. Transport cannot be reused after this call, but a new instance has to be created */
+ void destroy();
+
+ /** Set the receiver */
+ void setReceiver(Receiver r);
+
+ Map dumpStats();
+
+ /**
+ * Send a message
+ * @param destination A destination. If null, send a message to all members
+ * @param payload A buffer to be sent
+ * @throws Exception
+ */
+ void send(Object destination, Object payload) throws Exception;
+}
Added: cache-bench-fwk/trunk/src/org/cachebench/config/CacheWarmupConfig.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/CacheWarmupConfig.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/CacheWarmupConfig.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,34 @@
+package org.cachebench.config;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ */
+public class CacheWarmupConfig
+{
+ private String warmupClass;
+
+ private Map<String, String> configParams = new HashMap<String, String>();
+
+ public void addConfigParam(NVPair nvPair)
+ {
+ configParams.put(nvPair.getName(), nvPair.getValue());
+ }
+
+ public String getWarmupClass()
+ {
+ return warmupClass;
+ }
+
+ public void setWarmupClass(String warmupClass)
+ {
+ this.warmupClass = warmupClass;
+ }
+
+ public Map<String, String> getConfigParams()
+ {
+ return configParams;
+ }
+}
Added: cache-bench-fwk/trunk/src/org/cachebench/config/ClusterConfig.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/ClusterConfig.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/ClusterConfig.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,91 @@
+package org.cachebench.config;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Configuration for this cache instance.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class ClusterConfig
+{
+ private int currentNodeIndex = -1;
+
+ private List<NodeAddress> members = new ArrayList<NodeAddress>();
+
+ private String bindAddress;
+
+ public int getCurrentNodeIndex()
+ {
+ if (currentNodeIndex == -1)
+ {
+ String serverindexStr = System.getProperty("currentIndex");
+ try
+ {
+ currentNodeIndex = Integer.parseInt(serverindexStr);
+ } catch (NumberFormatException e)
+ {
+ throw new IllegalStateException("Configuration 'currentIndex' is missing!");
+ }
+ }
+ return currentNodeIndex;
+ }
+
+ public List<NodeAddress> getMembers()
+ {
+ return members;
+ }
+
+ public int getPortForThisNode()
+ {
+ NodeAddress address = members.get(getCurrentNodeIndex());
+ return Integer.parseInt(address.getPort());
+ }
+
+ public int getClusterSize()
+ {
+ return members.size();
+ }
+
+ public boolean isMaster()
+ {
+ return getCurrentNodeIndex() == 0;
+ }
+
+ public void addMember(NodeAddress member)
+ {
+ members.add(member);
+ }
+
+ public void setCurrentNodeIndex(int currentNodeIndex)
+ {
+ this.currentNodeIndex = currentNodeIndex;
+ }
+
+ public String getBindAddress()
+ {
+ return bindAddress;
+ }
+
+ public void setBindAddress(String bindAddress)
+ {
+ this.bindAddress = bindAddress;
+ }
+
+ public List<InetSocketAddress> getMemberAddresses()
+ {
+ List<InetSocketAddress> result = new ArrayList();
+ for (NodeAddress address : getMembers())
+ {
+ result.add(new InetSocketAddress(address.getHost(), address.getPortAsInt()));
+ }
+ return result;
+ }
+
+ public String toString()
+ {
+ return "{bindAddress:" + bindAddress + ", members:" + members + "}";
+ }
+}
Added: cache-bench-fwk/trunk/src/org/cachebench/config/ConfigBuilder.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/ConfigBuilder.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/ConfigBuilder.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,107 @@
+package org.cachebench.config;
+
+import org.apache.commons.digester.Digester;
+
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.io.File;
+
+/**
+ * Helper class for loading configurations.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class ConfigBuilder
+{
+ /**
+ * Util method to locate a resource on the filesystem as a URL
+ *
+ * @param filename
+ * @return The URL object of the file
+ */
+ private static URL findOnFS(String filename)
+ {
+ File f = new File(filename);
+ try
+ {
+ if (f.exists()) return f.toURL();
+ }
+ catch (MalformedURLException mue)
+ {
+ // bad URL
+ }
+ return null;
+ }
+
+ /**
+ * Looks for config file on disk then on class path.
+ * @return null if the file cannot be found
+ */
+ public static URL findConfigFile(String s)
+ {
+ URL confFile = findOnFS(s);
+ if (confFile == null)
+ {
+ confFile = findInClasspath(s);
+ }
+ return confFile;
+ }
+
+ public static Configuration parseConfiguration(URL url) throws Exception
+ {
+ Digester digester = new Digester();
+ // set up the digester rules.
+ digester.setValidating(false);
+ digester.addObjectCreate("cachebench", "org.cachebench.config.Configuration");
+ digester.addSetProperties("cachebench");
+
+ digester.addObjectCreate("cachebench/cluster", "org.cachebench.config.ClusterConfig");
+ digester.addSetProperties("cachebench/cluster");
+ digester.addObjectCreate("cachebench/cluster/member", "org.cachebench.config.NodeAddress");
+ digester.addSetProperties("cachebench/cluster/member");
+ digester.addSetNext("cachebench/cluster/member", "addMember", "org.cachebench.config.NodeAddress");
+ digester.addSetNext("cachebench/cluster", "setClusterConfig", "org.cachebench.config.ClusterConfig");
+
+ digester.addObjectCreate("cachebench/testcase", "org.cachebench.config.TestCase");
+ digester.addSetProperties("cachebench/testcase");
+
+ digester.addObjectCreate("cachebench/testcase/warmup", "org.cachebench.config.CacheWarmupConfig");
+ digester.addSetProperties("cachebench/testcase/warmup");
+
+ digester.addObjectCreate("cachebench/testcase/warmup/param", "org.cachebench.config.NVPair");
+ digester.addSetProperties("cachebench/testcase/warmup/param");
+ digester.addSetNext("cachebench/testcase/warmup/param", "addConfigParam", "org.cachebench.config.NVPair");
+
+ digester.addSetNext("cachebench/testcase/warmup", "setCacheWarmupConfig", "org.cachebench.config.CacheWarmupConfig");
+
+ digester.addObjectCreate("cachebench/testcase/test", "org.cachebench.config.TestConfig");
+ digester.addSetProperties("cachebench/testcase/test");
+ digester.addSetNext("cachebench/testcase/test", "addTest", "org.cachebench.config.TestConfig");
+
+ digester.addObjectCreate("cachebench/testcase/param", "org.cachebench.config.NVPair");
+ digester.addSetProperties("cachebench/testcase/param");
+
+ digester.addSetNext("cachebench/testcase/param", "addParam", "org.cachebench.config.NVPair");
+ digester.addSetNext("cachebench/testcase", "addTestCase", "org.cachebench.config.TestCase");
+
+ digester.addObjectCreate("cachebench/report", "org.cachebench.config.Report");
+ digester.addSetProperties("cachebench/report");
+
+ digester.addObjectCreate("cachebench/report/param", "org.cachebench.config.NVPair");
+ digester.addSetProperties("cachebench/report/param");
+ digester.addSetNext("cachebench/report/param", "addParam", "org.cachebench.config.NVPair");
+
+ digester.addSetNext("cachebench/report", "addReport", "org.cachebench.config.Report");
+ return (Configuration) digester.parse(url.openStream());
+ }
+
+ /**
+ * Util method to locate a resource in your classpath
+ */
+ private static URL findInClasspath(String filename)
+ {
+ return ConfigBuilder.class.getClassLoader().getResource(filename);
+ }
+
+
+}
Modified: cache-bench-fwk/trunk/src/org/cachebench/config/Configuration.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/Configuration.java 2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/Configuration.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -15,6 +15,8 @@
private boolean gcBetweenTestsEnabled;
private boolean emptyCacheBetweenTests;
+ private ClusterConfig clusterConfig;
+
private List<TestCase> testCases = new ArrayList<TestCase>();
private List<Report> reports = new ArrayList<Report>();
@@ -90,4 +92,14 @@
{
this.numThreads = numThreads;
}
+
+ public ClusterConfig getClusterConfig()
+ {
+ return clusterConfig;
+ }
+
+ public void setClusterConfig(ClusterConfig clusterConfig)
+ {
+ this.clusterConfig = clusterConfig;
+ }
}
Added: cache-bench-fwk/trunk/src/org/cachebench/config/NodeAddress.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/NodeAddress.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/NodeAddress.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,58 @@
+package org.cachebench.config;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public class NodeAddress
+{
+ private String host;
+
+ private String port;
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public void setHost(String host)
+ {
+ this.host = host;
+ }
+
+ public String getPort()
+ {
+ return port;
+ }
+
+ public void setPort(String port)
+ {
+ this.port = port;
+ }
+
+ public int getPort(int defaultValue)
+ {
+ int portInt = Integer.parseInt(port);
+ if (portInt <= 0)
+ {
+ return defaultValue;
+ }
+ else
+ {
+ return portInt;
+ }
+ }
+
+ public String toString()
+ {
+ return host + ':' + port;
+ }
+
+ public int getPortAsInt()
+ {
+ return Integer.parseInt(port);
+ }
+}
Modified: cache-bench-fwk/trunk/src/org/cachebench/config/TestCase.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/TestCase.java 2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/TestCase.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -1,5 +1,7 @@
package org.cachebench.config;
+import org.cachebench.warmup.CacheWarmup;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -13,7 +15,19 @@
private Properties params = new Properties();
private List<TestConfig> tests = new ArrayList<TestConfig>();
+ private CacheWarmupConfig cacheWarmupConfig;
+
+ public CacheWarmupConfig getCacheWarmupConfig()
+ {
+ return cacheWarmupConfig;
+ }
+
+ public void setCacheWarmupConfig(CacheWarmupConfig cacheWarmupConfig)
+ {
+ this.cacheWarmupConfig = cacheWarmupConfig;
+ }
+
public String getName()
{
return name;
Modified: cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/AbstractReportGenerator.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/AbstractReportGenerator.java 2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/AbstractReportGenerator.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -2,10 +2,10 @@
import org.apache.commons.logging.Log;
import org.cachebench.TestResult;
+import org.cachebench.config.ClusterConfig;
import java.io.File;
import java.util.List;
-import java.util.Map;
/**
* Base implementation of {@link org.cachebench.reportgenerators.ReportGenerator}
@@ -15,6 +15,7 @@
protected File output;
protected List<TestResult> results;
protected Log log;
+ protected ClusterConfig clusterConfig;
public void setOutputFile(File output)
{
@@ -25,4 +26,9 @@
{
this.results = results;
}
+
+ public void setClusterConfig(ClusterConfig clusterConfig)
+ {
+ this.clusterConfig = clusterConfig;
+ }
}
Modified: cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ClusterReportGenerator.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ClusterReportGenerator.java 2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ClusterReportGenerator.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -3,124 +3,64 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cachebench.TestResult;
+import org.cachebench.cluster.ClusterBarrier;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.net.*;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Gathers info from all the nodes executing tests.
- * Merges all the gathered information and generates an CSV file based on that.
+ * Merges all the gathered information and generates an CSV file.
+ * The file is generated on the master node, i.e. the node that has the NODE_INDEX == 0
*
* @author Mircea.Markus at jboss.com
- * @version 2.2
*/
public class ClusterReportGenerator extends AbstractReportGenerator
{
private static Log log = LogFactory.getLog(ClusterReportGenerator.class);
- private int clusterSize;
-
- private int masterPort;
-
- private String masterHost;
-
public void setConfigParams(Map<String, String> configParams)
{
log.trace("Received config params: " + configParams);
- this.clusterSize = Integer.parseInt(configParams.get("clusterSize"));
- this.masterPort = Integer.parseInt(configParams.get("masterPort"));
- this.masterHost = configParams.get("masterHost");
}
public void generate() throws Exception
{
try
{
- log.trace("Cluster report generator property value: " + System.getProperty("cluterReportGenerator"));
- boolean isGeneratorNode = System.getProperty("cluterReportGenerator") != null
- && "true".equalsIgnoreCase(System.getProperty("cluterReportGenerator"));
- log.trace(" Starting generating. Is master? " + isGeneratorNode);
- if (isGeneratorNode)
+ ClusterBarrier barrier = new ClusterBarrier();
+ barrier.setConfig(this.clusterConfig);
+ barrier.setAcknowledge(false);
+ barrier.barrier(results);
+ log.trace(" Starting generating. Is master? " + clusterConfig.isMaster());
+ if (clusterConfig.isMaster())
{
- generateReport();
+ log.trace("Received following results: " + results);
+ generateReport(barrier.getReceivedMessages());
}
- else
- {
- submitReportInfo();
- }
} catch (Exception e)
{
log.error("Error while generating report!", e);
}
}
- /**
- * If not a master, sends report data to master.
- */
- private void submitReportInfo() throws IOException, ClassNotFoundException
- {
- log.trace("Sending the following results to master: " + results);
- boolean connected = false;
- Socket socket = null;
- while (!connected)
- {
- try
- {
- log.trace("Connecting to master on " + masterHost + ":" + masterPort + "...");
- socket = new Socket(masterHost, masterPort);
- log.trace("Connected");
- connected = true;
- } catch (IOException e)
- {
- log.trace("Failed to connect(" + e.getMessage() + "), trying again..." );
- connected = false;
- }
- }
- ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
- oos.writeObject(this.results);
- oos.close();
- socket.close();
- log.trace("Following results were sent to server: " + results);
- }
- private void generateReport() throws Exception
+ private void generateReport(Map<SocketAddress, Object> receivedMessages) throws Exception
{
- List<List<TestResult>> results = retrieveResultsFromClients();
- List<TestResult> mergedResults = mergerTestResultsAndGenerateReport(results);
- generateReportFile(mergedResults);
- }
-
- @SuppressWarnings(value = "unchecked")
- private List<List<TestResult>> retrieveResultsFromClients()
- throws IOException, ClassNotFoundException
- {
- ServerSocket socket = new ServerSocket(masterPort);
+ log.trace("Received " + receivedMessages.size() + " results!");
List<List<TestResult>> results = new ArrayList<List<TestResult>>();
- results.add(this.results);
- //we go expect cluster size-1 because this is also a node, ant it does not sent report info
- for (int i = 0; i < clusterSize - 1; i++)
+ for (SocketAddress socketAddress : receivedMessages.keySet())
{
- log.trace("Expecting " + (clusterSize - i - 1) + " more client(s)");
- log.trace(" Waiting for client to conect...");
- Socket client = socket.accept();
- log.trace("Client connected: " + client.getInetAddress());
- ObjectInputStream ois = new ObjectInputStream(client.getInputStream());
- List<TestResult> testResults = (List<TestResult>) ois.readObject();
+ List<TestResult> testResults = (List<TestResult>) receivedMessages.get(socketAddress);
+ log.trace("From " + socketAddress + " received " + testResults);
results.add(testResults);
- log.trace("Received following resullts from client: " + testResults);
- ois.close();
- client.close();
}
- socket.close();
- return results;
+ List<TestResult> mergedResults = mergerTestResultsAndGenerateReport(results);
+ generateReportFile(mergedResults);
}
-
private void generateReportFile(List<TestResult> mergedResults) throws Exception
{
CSVReportGenerator generator = new CSVReportGenerator();
Modified: cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ReportGenerator.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ReportGenerator.java 2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ReportGenerator.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -1,6 +1,7 @@
package org.cachebench.reportgenerators;
import org.cachebench.TestResult;
+import org.cachebench.config.ClusterConfig;
import java.io.File;
import java.util.List;
@@ -13,11 +14,13 @@
*/
public interface ReportGenerator
{
- public abstract void setConfigParams(Map<String, String> configParams);
+ public void setConfigParams(Map<String, String> configParams);
public void setOutputFile(File output);
public void setResults(List<TestResult> results);
public void generate() throws Exception;
+
+ public void setClusterConfig(ClusterConfig clusterConfig);
}
Added: cache-bench-fwk/trunk/src/org/cachebench/warmup/CacheWarmup.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/warmup/CacheWarmup.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/warmup/CacheWarmup.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,40 @@
+package org.cachebench.warmup;
+
+import org.cachebench.CacheWrapper;
+import org.cachebench.config.NVPair;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Warmups the cache.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public abstract class CacheWarmup
+{
+
+ private Map<String, String> configParams = new HashMap<String, String>();
+
+ public void setConfigParams(Map<String, String> configParams)
+ {
+ this.configParams = configParams;
+ }
+
+ public String getConfigParam(String name)
+ {
+ return configParams.get(name);
+ }
+
+ /**
+ * Calls {@link #performWarmupOperations(CacheWrapper)} amd clears the cache.
+ */
+ public final void warmup(CacheWrapper cacheWrapper) throws Exception
+ {
+ performWarmupOperations(cacheWrapper);
+ cacheWrapper.empty();
+ }
+
+ public abstract void performWarmupOperations(CacheWrapper wrapper) throws Exception;
+}
Added: cache-bench-fwk/trunk/src/org/cachebench/warmup/NoCacheWarmup.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/warmup/NoCacheWarmup.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/warmup/NoCacheWarmup.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,19 @@
+package org.cachebench.warmup;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.CacheWrapper;
+
+/**
+ * Does not warmup the cache.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class NoCacheWarmup extends CacheWarmup
+{
+ Log log = LogFactory.getFactory().getInstance(NoCacheWarmup.class);
+ public void performWarmupOperations(CacheWrapper wrapper) throws Exception
+ {
+ log.info("Using no cache warmup");
+ }
+}
Added: cache-bench-fwk/trunk/src/org/cachebench/warmup/PutGetCacheWarmup.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/warmup/PutGetCacheWarmup.java (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/warmup/PutGetCacheWarmup.java 2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,32 @@
+package org.cachebench.warmup;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.CacheWrapper;
+
+/**
+ * Perfoms N puts, gets and removals, where n is configurable.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public class PutGetCacheWarmup extends CacheWarmup
+{
+ Log log = LogFactory.getLog(PutGetCacheWarmup.class);
+
+ public void performWarmupOperations(CacheWrapper wrapper) throws Exception
+ {
+ Integer opCount = Integer.parseInt(getConfigParam("operationCount"));
+ log.trace("Cache launched, performing " + opCount + " put and get operations ");
+ for (int i = 0; i < opCount; i++)
+ {
+ wrapper.put(String.valueOf(opCount), String.valueOf(opCount));
+ }
+
+ for (int i = 0; i < opCount; i++)
+ {
+ wrapper.get(String.valueOf(opCount));
+ }
+ log.trace("Cache warmup ended!");
+ }
+}
More information about the jbosscache-commits
mailing list