[jbosscache-commits] JBoss Cache SVN: r4886 - in cache-bench-fwk/trunk: cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers and 6 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Dec 19 16:51:58 EST 2007


Author: mircea.markus
Date: 2007-12-19 16:51:57 -0500 (Wed, 19 Dec 2007)
New Revision: 4886

Added:
   cache-bench-fwk/trunk/src/org/cachebench/ClusterConfigurationCheck.java
   cache-bench-fwk/trunk/src/org/cachebench/cluster/
   cache-bench-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
   cache-bench-fwk/trunk/src/org/cachebench/cluster/Receiver.java
   cache-bench-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
   cache-bench-fwk/trunk/src/org/cachebench/cluster/Transport.java
   cache-bench-fwk/trunk/src/org/cachebench/config/CacheWarmupConfig.java
   cache-bench-fwk/trunk/src/org/cachebench/config/ClusterConfig.java
   cache-bench-fwk/trunk/src/org/cachebench/config/ConfigBuilder.java
   cache-bench-fwk/trunk/src/org/cachebench/config/NodeAddress.java
   cache-bench-fwk/trunk/src/org/cachebench/warmup/
   cache-bench-fwk/trunk/src/org/cachebench/warmup/CacheWarmup.java
   cache-bench-fwk/trunk/src/org/cachebench/warmup/NoCacheWarmup.java
   cache-bench-fwk/trunk/src/org/cachebench/warmup/PutGetCacheWarmup.java
Modified:
   cache-bench-fwk/trunk/build.xml
   cache-bench-fwk/trunk/cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers/JBossCache200Wrapper.java
   cache-bench-fwk/trunk/conf/cachebench.xml
   cache-bench-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java
   cache-bench-fwk/trunk/src/org/cachebench/config/Configuration.java
   cache-bench-fwk/trunk/src/org/cachebench/config/TestCase.java
   cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/AbstractReportGenerator.java
   cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ClusterReportGenerator.java
   cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ReportGenerator.java
Log:
enhance adding barriers and warmup

Modified: cache-bench-fwk/trunk/build.xml
===================================================================
--- cache-bench-fwk/trunk/build.xml	2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/build.xml	2007-12-19 21:51:57 UTC (rev 4886)
@@ -42,9 +42,9 @@
   <!-- Module Framework -->
   
   <dirname property="module.framework.basedir" file="${ant.file}"/>
+
+  <property environment="env"/>
   
-  
-  
   <property name="compiler.args.framework" value="${compiler.args}"/>
   
   <property name="framework.output.dir" value="${module.framework.basedir}/classes/production/Framework"/>
@@ -89,15 +89,15 @@
    all       - builds the entire project, including plugins for all
                cache products in /cache-products/.  Output classes
                in /classes/.
-   runMaster - runs the CacheBenchFwk in "master" mode.
-   runSlave  - runs the CacheBenchFwk in "slave" mode.
+   runNode - runs the CacheBenchFwk. Depending on the number of nodes in the cluster(configured in cachebenchmark.xml),
+             it will wait for all configured nodes to be launched before starting the tests
 
-For either runMaster or runSlave, make sure you have looked at:
+Make sure you have looked at:
 
    1) 'build.properties' and have set JVM params (such as heap size,
        etc.) as necessary.
-   2) '/conf/cachebench.xml' to configure the tests you want run and
-      output file for reports.
+   2) '/conf/cachebench.xml' to configure the tests you want run, the
+        nodes in the cluster if the case and the output file for reports.
    3) '/conf/log4j.xml' for logging settings (make sure these aren't
        very verbose as it can skew tests).
    4)  Provided one of the plugins as a system property.  This is
@@ -109,14 +109,17 @@
          -Dorg.cachebench.plugins.jbosscache2=true -Dbind.address=${MYTESTIP_1}
          -Dorg.cachebench.pluins.ehcache=true -Dbind.address=${MYTESTIP_1}
          -Dorg.cachebench.plugins.coherence=true -Dtangosol.coherence.localhost=${MYTESTIP_1}
-        
         // WORK IN PROGRESS
-         -Dorg.cachebench.plugins.terracotta=true
+        -Dorg.cachebench.plugins.terracotta=true
+        when running ant.  Note that only one can be set at any time.
+   5)  Make sure you set up an correct NODE_INDEX environment property 
+       indicating the index of the node before starting it. E.g. if
+       we have 3 nodes  each process should have an environment
+       variable named NODE_INDEX, having values in the range 0-2,
+       each node having an distict value.
+       see cachebench.xml\cachebench\cluster for more details
 
-
-      when running ant.  Note that only one can be set at any time.
-
-NB: NEEDS Ant >= 1.7.0        
+   NB: NEEDS Ant >= 1.7.0        
      </echo>
   </target>
 
@@ -531,27 +534,32 @@
            <!--<pathelement location="./cache-products/terracotta-2.4.8/lib/bootstrap/boot.jar" />-->
         <!--</bootclasspath>-->
         <sysproperty key="bind.address" value="${bind.address}" />
-	<sysproperty key="tangosol.coherence.localhost" value="${tangosol.coherence.localhost}" />
-	<sysproperty key="cluterReportGenerator" value="${cluterReportGenerator}" />
-	<sysproperty key="org.cachebench.debug" value="${org.cachebench.debug}" />
+        <sysproperty key="currentIndex" value="${env.NODE_INDEX}" />
+        <sysproperty key="tangosol.coherence.localhost" value="${tangosol.coherence.localhost}" />
+        <sysproperty key="org.cachebench.debug" value="${org.cachebench.debug}" />
         <sysproperty key="java.net.preferIPv4Stack" value="${java.net.preferIPv4Stack}" />
         <classpath refid="framework.module.classpath" />
         <classpath refid="${plugin.classpath.ref}"/>
      </java>
   </target>
 
-  <target name="runSlave">
+  <target name="runNode">
      <antcall target="run">
         <param name="runtime.classname" value="org.cachebench.CacheBenchmarkRunner" />
-        <param name="cluterReportGenerator" value="false"/>
      </antcall>
   </target>
 
-  <target name="runMaster">
-     <antcall target="run">
-        <param name="runtime.classname" value="org.cachebench.CacheBenchmarkRunner" />
-        <param name="cluterReportGenerator" value="true"/>
-     </antcall>
-  </target>
-   
+   <target name="checkClusterAddresses" depends="compile.module.framework.production" description="Check whether the cluster config is a valid one">
+      <!--<antcall target="run">-->
+         <!--<param name="runtime.classname" value="org.cachebench.ClusterConfigurationCheck" />-->
+      <!--</antcall>-->
+
+      <java classname="org.cachebench.ClusterConfigurationCheck" clonevm="true" fork="true">
+         <classpath>
+            <pathelement location="${framework.output.dir}"/>
+            <pathelement location="${framework.testoutput.dir}"/>
+         </classpath>
+         <classpath refid="framework.module.classpath" />
+      </java>
+   </target>
 </project>

Modified: cache-bench-fwk/trunk/cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers/JBossCache200Wrapper.java
===================================================================
--- cache-bench-fwk/trunk/cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers/JBossCache200Wrapper.java	2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/cache-products/jbosscache-2.0.0/src/org/cachebench/cachewrappers/JBossCache200Wrapper.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -40,7 +40,8 @@
 
    public void empty() throws Exception
    {
-      cache.removeNode(Fqn.ROOT);
+      //not removing root because there it fails with buddy replication: http://jira.jboss.com/jira/browse/JBCACHE-1241
+      cache.removeNode(new Fqn("test"));
    }
 
    public int getNumMembers()

Modified: cache-bench-fwk/trunk/conf/cachebench.xml
===================================================================
--- cache-bench-fwk/trunk/conf/cachebench.xml	2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/conf/cachebench.xml	2007-12-19 21:51:57 UTC (rev 4886)
@@ -7,16 +7,32 @@
 	emptyCacheBetweenTests - again, use if you're running out of mem.
 	numThreads - the number of executor threads to use to perform the required number of operations.  
 -->
-<cachebench sampleSize="10000" gcBetweenTestsEnabled="true" sleepBetweenTests="1000" emptyCacheBetweenTests="true" numThreads="10">
+<cachebench sampleSize="2000" gcBetweenTestsEnabled="true" sleepBetweenTests="1000" emptyCacheBetweenTests="true" numThreads="10">
 
-	<!-- Each testcase represents either a single configuration or a cacheing product.
-		  
-		 For example, WhirlyCache would be one test case. JBossCache-standalone could be another, JBossCache-replicated could be yet another 
-		 
-		 See the javadoc for org.cachebench.CacheWrapper for the cacheWrapper property
-		 -->
 
    <!--
+         There are various steps we want to start executing at once: e.g. all the tests should start at the same time,
+      otherwise (part of) cluster operations do not replicate on all instances. We configure here one socket addresses
+      on each tested node, so that the framework can communicate with all the nodes and sync whenever needed.
+         - bindAddress can be used on a multi-homed host for a ServerSocket that will only accept connect requests to one
+         of its addresses
+         - for each node instance socket address is specified. You can make sure that addresses are available by using
+         checkClusterAddresses ant target 
+   -->
+   <cluster bindAddress="127.0.0.1">
+      <member host="127.0.0.1" port="7800"/>
+      <member host="127.0.0.1" port="7801"/>
+      <member host="127.0.0.1" port="7802"/>
+      <member host="127.0.0.1" port="7803"/>
+      <member host="127.0.0.1" port="7804"/>
+   </cluster>
+
+   <!-- Each testcase represents either a single configuration or a cacheing product.
+   For example, WhirlyCache would be one test case. JBossCache-standalone could be another, JBossCache-replicated could be yet another
+   See the javadoc for org.cachebench.CacheWrapper for the cacheWrapper property
+   -->
+
+   <!--
       Note that if you are using REPLICATED tests, using the "ant runSlave" command, you can only run one test at a time.
       Otherwise, if you are using the test in standalone mode (testing a LOCAL cache), you can add multiple "testcase" elements.
    -->
@@ -33,6 +49,14 @@
     <!--<testcase name="JBossCache2x-Pessimistic-REPL_ASYNC" cacheWrapper="org.cachebench.cachewrappers.JBossCache200Wrapper">-->
     <testcase name="JBossCache2.0" cacheWrapper="org.cachebench.cachewrappers.JBossCache200Wrapper">
 
+
+      <!-- warms up the cache by doing operation on it; simulates a real-world environment. If no warmup is needed use
+      org.cachebench.warmup.NoCacheWarmup
+      -->
+      <warmup warmupClass="org.cachebench.warmup.PutGetCacheWarmup">
+         <param name="operationCount" value="100"/>
+      </warmup>
+
       <!--
          * The "name" attrib is just used for display in the reports.
          * You can write your own custom testClass.
@@ -65,14 +89,10 @@
 	<!--
 	   Available generators are: CSVReportGenerator and ClusterReportGenerator.
 	   See javadocs for org.cachebench.reportgenerators.ReportGenerator for writing your
-		own report generators such as XML generators, graphic generators, etc 
+		own report generators such as XML generators, graphic generators, etc
    -->
    <!-- The CSV report generated can be plugged in to a spreadsheet to generate graphs, cluster size is
    needed so that the . -->
-   <report outputFile="performance.csv" generator="org.cachebench.reportgenerators.ClusterReportGenerator">
-      <param name="clusterSize" value="3"/>
-      <param name="masterHost" value="127.0.0.1"/>
-      <param name="masterPort" value="54334"/>
-   </report>
+   <report outputFile="performance.csv" generator="org.cachebench.reportgenerators.ClusterReportGenerator"/>
 
 </cachebench>

Modified: cache-bench-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java	2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -1,19 +1,16 @@
 package org.cachebench;
 
 
-import org.apache.commons.digester.Digester;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.cachebench.config.Configuration;
-import org.cachebench.config.Report;
-import org.cachebench.config.TestCase;
-import org.cachebench.config.TestConfig;
+import org.cachebench.config.*;
 import org.cachebench.reportgenerators.ReportGenerator;
 import org.cachebench.tests.CacheTest;
+import org.cachebench.cluster.ClusterBarrier;
 import org.cachebench.utils.Instantiator;
+import org.cachebench.warmup.CacheWarmup;
 
 import java.io.File;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Date;
@@ -56,13 +53,9 @@
    private CacheBenchmarkRunner(String s)
    {
       // first, try and find the configuration on the filesystem.
-      URL confFile = findOnFS(s);
+      URL confFile = ConfigBuilder.findConfigFile(s);
       if (confFile == null)
       {
-         confFile = findInClasspath(s);
-      }
-      if (confFile == null)
-      {
          logger.warn("Unable to locate a configuration file; Application terminated");
       }
       else
@@ -71,7 +64,7 @@
          logger.debug("Parsing configuration");
          try
          {
-            conf = parseConfiguration(confFile);
+            conf = ConfigBuilder.parseConfiguration(confFile);
             logger.info("Starting Benchmarking....");
             List<TestResult> results = runTests(); // Run the tests from this point.
             if (results != null && results.size() != 0)
@@ -86,43 +79,12 @@
          }
          catch (Exception e)
          {
-            logger.warn("Unable to parse configuration file " + confFile + ". Application terminated");
+            logger.warn("Unable to parse configuration file " + confFile + ". Application terminated", e);
             errorLogger.fatal("Unable to parse configuration file " + confFile, e);
          }
       }
    }
 
-   private Configuration parseConfiguration(URL url) throws Exception
-   {
-      Digester digester = new Digester();
-      // set up the digester rules.
-      digester.setValidating(false);
-      digester.addObjectCreate("cachebench", "org.cachebench.config.Configuration");
-      digester.addSetProperties("cachebench");
-      digester.addObjectCreate("cachebench/testcase", "org.cachebench.config.TestCase");
-      digester.addSetProperties("cachebench/testcase");
-
-      digester.addObjectCreate("cachebench/testcase/test", "org.cachebench.config.TestConfig");
-      digester.addSetProperties("cachebench/testcase/test");
-      digester.addSetNext("cachebench/testcase/test", "addTest", "org.cachebench.config.TestConfig");
-
-      digester.addObjectCreate("cachebench/testcase/param", "org.cachebench.config.NVPair");
-      digester.addSetProperties("cachebench/testcase/param");
-
-      digester.addSetNext("cachebench/testcase/param", "addParam", "org.cachebench.config.NVPair");
-      digester.addSetNext("cachebench/testcase", "addTestCase", "org.cachebench.config.TestCase");
-
-      digester.addObjectCreate("cachebench/report", "org.cachebench.config.Report");
-      digester.addSetProperties("cachebench/report");
-
-      digester.addObjectCreate("cachebench/report/param", "org.cachebench.config.NVPair");
-      digester.addSetProperties("cachebench/report/param");
-      digester.addSetNext("cachebench/report/param", "addParam", "org.cachebench.config.NVPair");      
-
-      digester.addSetNext("cachebench/report", "addReport", "org.cachebench.config.Report");
-      return (Configuration) digester.parse(url.openStream());
-   }
-
    /**
     * Executes each test case and returns the result.
     *
@@ -140,8 +102,14 @@
             if (cache != null)
             {
                cache.init(test.getParams());
+               barrier("BEFORE_WARMUP");
+               warmupCache(test, cache);
+               barrier("AFTER_WARMUP");
+
+               //now start testing
                cache.setUp();
                List<TestResult> resultsForCache = runTestsOnCache(cache, test);
+               barrier("AFTER_TEST_RUN");
                shutdownCache(cache);
                results.addAll(resultsForCache);
             }
@@ -156,6 +124,25 @@
       return results;
    }
 
+   private void barrier(String messageName) throws Exception
+   {
+      ClusterBarrier barrier = new ClusterBarrier();
+      logger.trace("Using following cluster config: " + conf.getClusterConfig());
+      barrier.setConfig(conf.getClusterConfig());
+      barrier.setAcknowledge(true);
+      barrier.barrier(messageName);
+      logger.trace("Barrier finished");
+   }
+
+   private void warmupCache(TestCase test, CacheWrapper cache) throws Exception
+   {
+      CacheWarmupConfig warmupConfig = test.getCacheWarmupConfig();
+      logger.trace("Warmup config is: " + warmupConfig);
+      CacheWarmup warmup = (CacheWarmup) Instantiator.getInstance().createClass(warmupConfig.getWarmupClass());
+      warmup.setConfigParams(warmupConfig.getConfigParams());
+      warmup.warmup(cache);
+   }
+
    /**
     * Peforms the necessary external tasks for cache benchmarking.
     * These external tasks are defined in the cachebench.xml and would
@@ -184,9 +171,9 @@
       }
       catch (Exception e)
       {
-         // The Empty process of the cache failed. Add a foot note for the TestResult here.
-         testResult.setFootNote("The Cache Empty process failed after test case: " + testResult.getTestName() + " : " + testResult.getTestType());
-         errorLogger.error("The Cache Empty process failed after test case : " + testResult.getTestName() + ", " + testResult.getTestType(), e);
+         // The Empty barrier of the cache failed. Add a foot note for the TestResult here.
+         testResult.setFootNote("The Cache Empty barrier failed after test case: " + testResult.getTestName() + " : " + testResult.getTestType());
+         errorLogger.error("The Cache Empty barrier failed after test case : " + testResult.getTestName() + ", " + testResult.getTestType(), e);
       }
 
       return testResult;
@@ -241,6 +228,7 @@
                generator.setConfigParams(report.getParams());
                generator.setResults(results);
                generator.setOutputFile(new File(report.getOutputFile()));
+               generator.setClusterConfig(conf.getClusterConfig());
                generator.generate();
                logger.info("Report Generation Completed");
             }
@@ -259,37 +247,6 @@
       }
    }
 
-   /**
-    * Util method to locate a resource on the filesystem as a URL
-    *
-    * @param filename
-    * @return The URL object of the file
-    */
-   private URL findOnFS(String filename)
-   {
-      File f = new File(filename);
-      try
-      {
-         if (f.exists()) return f.toURL();
-      }
-      catch (MalformedURLException mue)
-      {
-         // bad URL
-      }
-      return null;
-   }
-
-   /**
-    * Util method to locate a resource in your classpath
-    *
-    * @param filename
-    * @return The URL object of the file
-    */
-   private URL findInClasspath(String filename)
-   {
-      return getClass().getClassLoader().getResource(filename);
-   }
-
    private CacheWrapper getCacheWrapperInstance(TestCase testCaseClass)
    {
       CacheWrapper cache = null;

Added: cache-bench-fwk/trunk/src/org/cachebench/ClusterConfigurationCheck.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/ClusterConfigurationCheck.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/ClusterConfigurationCheck.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,63 @@
+package org.cachebench;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.config.ClusterConfig;
+import org.cachebench.config.ConfigBuilder;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URL;
+import java.io.IOException;
+
+/**
+ * Loads config file and checks whether the specified ports are available.
+ * Can be used from a script, following exit codes are used:
+ * <pre>
+ *  0 - all ports are available
+ *  1 - config file not found
+ *  2 - a address is in use
+ * </pre>
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public class ClusterConfigurationCheck
+{
+
+   private static Log log = LogFactory.getLog("ClusterConfigurationCheck");
+
+   public static void main(String[] args) throws Exception
+   {
+      String configFile = "cachebench.xml";
+      if (args.length >= 1)
+      {
+         configFile = args[0];
+      }
+      URL configUrl = ConfigBuilder.findConfigFile(configFile);
+      if (configUrl == null)
+      {
+         log.info("Could not find the config file, exiting with code 1");
+         System.exit(1);
+      }
+      ClusterConfig config = ConfigBuilder.parseConfiguration(configUrl).getClusterConfig();
+      boolean areSuspects = false;
+      for (InetSocketAddress address : config.getMemberAddresses())
+      {
+         try
+         {
+            Socket sock = new Socket(address.getHostName(), address.getPort());
+            areSuspects = true;
+            log.info("Managed to connect to " + address);
+         } catch (IOException e)
+         {
+            log.trace("Connection to : " +  address + " failed; expected behavior");
+         }
+      }
+      if (!areSuspects)
+      {
+         log.info("Success (could not establish any connection)");
+      }
+      System.exit(areSuspects ? 2 : 0);
+   }
+}

Added: cache-bench-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,154 @@
+package org.cachebench.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.config.ClusterConfig;
+
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Use for making the nodes in the cluster to hold until ALL nodes reached the barrier.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class ClusterBarrier implements Receiver
+{
+   private static Log log = LogFactory.getLog(ClusterBarrier.class);
+
+   private ClusterConfig config;
+   public final Map<SocketAddress, Object> receivedMessages = new HashMap<SocketAddress, Object>();
+   private Transport transport;
+   private Object message;
+   private int numMembers;
+   private boolean acknowledge;
+   private static final String ACK = "_ACK";
+
+
+   /**
+    * Returns the messages sent between the nodes in the cluster.
+    */
+   public Map<SocketAddress, Object> getReceivedMessages()
+   {
+      return receivedMessages;
+   }
+
+   /**
+    * Setting this to true would tide up barrier delays.
+    */
+   public void setAcknowledge(boolean acknowledge)
+   {
+      this.acknowledge = acknowledge;
+   }
+
+   /**
+    * Message sent between cluster members to ack on the barrier.
+    */
+   public void barrier(Object message) throws Exception
+   {
+      log.trace("Started processing a message cluster, message='" + message + "'");
+      receivedMessages.clear();
+      this.message = message;
+      transport = new TcpTransport();
+      numMembers = config.getClusterSize();
+      transport.create(config);
+      transport.setReceiver(this);
+      transport.start();
+      log.trace("Transport started, local address is: " + transport.getLocalAddress());
+      log.trace("Waiting for " + numMembers + " member(s) to join");
+      waitForAllMemebers();
+      transport.stop();
+      //just to make sure all other nodes closed their resources
+      //needed e.g. when a new barrier is immediately initiated on this barrier which might connect
+      //to a port on a barrier that is about to close, so messages sent by new barrier might be lost
+      // a nicer way to implement this is to accept connections only if the sender uses same barrier name
+      Thread.sleep(2000);
+   }
+
+   private void waitForAllMemebers() throws Exception
+   {
+      boolean receivedAllMessages = false;
+      while (!receivedAllMessages)
+      {
+         synchronized (receivedMessages)
+         {
+            receivedAllMessages = receivedMessages.size() >= numMembers;
+            if (!receivedAllMessages)
+            {
+               receivedMessages.wait(2000);
+            }
+         }
+         log.trace("sending message " + message + ", expecting " + getMissingMembersCount() + " member(s)");
+         transport.send(null, message);
+         if (acknowledge)
+         {
+            log.trace("Send ack also");
+            transport.send(null, getAcknowledgeMessage(message));
+         }
+      }
+   }
+
+   public void receive(SocketAddress sender, Object payload) throws Exception
+   {
+      log.trace("Received '" + payload + "' from " + sender + " still expecting " + getMissingMembersCount() + " member(s)");
+      if (payload == null)
+      {
+         log.warn("payload is incorrect (sender=" + sender + "): " + payload);
+         return;
+      }
+      if (acknowledge && !isAcknowledgeMessage(payload, message))
+      {
+         log.trace("Sending ack, still expecting " + getMissingMembersCount() + " members.");
+         transport.send(null, getAcknowledgeMessage(message));
+         return;
+      }
+
+      //we are here if either no ack or ack the message is an ack message
+      synchronized (this.receivedMessages)
+      {
+         if (!this.receivedMessages.containsKey(sender))
+         {
+            this.receivedMessages.put(sender, getMessage(payload));
+            int expected = getMissingMembersCount();
+            log.trace("Sender " + sender + " registered, still waiting for " + expected + " member(s)");
+            this.receivedMessages.notifyAll();
+         }
+      }
+   }
+
+   private int getMissingMembersCount()
+   {
+      return numMembers - receivedMessages.size();
+   }
+
+   private Object getMessage(Object payload)
+   {
+      if (!acknowledge)
+      {
+         return message;
+      }
+      String payloadStr = payload.toString();
+      int endIndex = payloadStr.length() - ACK.length();
+      return payloadStr.substring(0, endIndex);
+   }
+
+   private String getAcknowledgeMessage(Object message)
+   {
+      return message.toString() + ACK;
+   }
+
+   private boolean isAcknowledgeMessage(Object payload, Object message)
+   {
+      boolean result;
+      String payloadStr = payload.toString();
+      result = payloadStr.equals(getAcknowledgeMessage(message));
+      log.trace("Is acknowledge? " + result);
+      return result;
+   }
+
+   public void setConfig(ClusterConfig config)
+   {
+      this.config = config;
+   }
+}

Added: cache-bench-fwk/trunk/src/org/cachebench/cluster/Receiver.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/cluster/Receiver.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/cluster/Receiver.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,12 @@
+package org.cachebench.cluster;
+
+import java.net.SocketAddress;
+
+/**
+ * @author Bela Ban Jan 22
+ * @author 2004
+ * @version $Id: Receiver.java,v 1.1 2004/01/23 00:08:31 belaban Exp $
+ */
+public interface Receiver {
+    void receive(SocketAddress sender, Object payload) throws Exception;
+}

Added: cache-bench-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,389 @@
+package org.cachebench.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.config.ClusterConfig;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * note: this code is copyed and adapted from jgroups test.
+ *
+ * @author Bela Ban Jan 22
+ * @author 2004
+ * @version $Id: TcpTransport.java,v 1.16 2006/12/19 08:51:46 belaban Exp $
+ */
+public class TcpTransport implements Transport
+{
+
+   private static Log log = LogFactory.getLog(TcpTransport.class);
+   
+   Receiver receiver = null;
+   ClusterConfig config = null;
+   int max_receiver_buffer_size = 500000;
+   int max_send_buffer_size = 500000;
+   List<InetSocketAddress> nodes;
+   ConnectionTable connectionTable;
+   int startPort = 7800;
+   ServerSocket srvSock = null;
+   InetAddress bindAddr = null;
+   SocketAddress localAddr = null;
+   List receivers = new ArrayList();
+   private ServerSocket server;
+   private boolean isStoping;
+
+
+   public TcpTransport()
+   {
+   }
+
+   public Object getLocalAddress()
+   {
+      return localAddr;
+   }
+
+   public void create(ClusterConfig clusterConfig) throws Exception
+   {
+      this.config = clusterConfig;
+      String tmp;
+      startPort = config.getPortForThisNode();
+      String bindAddrStr = config.getBindAddress();
+      if (bindAddrStr != null)
+      {
+         bindAddr = InetAddress.getByName(bindAddrStr);
+      }
+      else
+      {
+         bindAddr = InetAddress.getLocalHost();
+      }
+      nodes = clusterConfig.getMemberAddresses();
+      connectionTable = new ConnectionTable(nodes);
+   }
+
+
+   public void start() throws Exception
+   {
+      srvSock = createServerSocket();
+      log.trace("ServerSock created, listening on: "+ srvSock.getLocalSocketAddress());
+      localAddr = new InetSocketAddress(srvSock.getInetAddress(), srvSock.getLocalPort());
+      connectionTable.init();
+
+      // accept connections and start 1 Receiver per connection
+      Thread acceptor = new Thread()
+      {
+         public void run()
+         {
+            while (true)
+            {
+               try
+               {
+                  Socket s = srvSock.accept();
+                  log.trace("Accepted client " + s.getRemoteSocketAddress());
+                  ReceiverThread r = new ReceiverThread(s);
+                  r.setDaemon(true);
+                  receivers.add(r);
+                  r.start();
+               }
+               catch (Exception ex)
+               {
+                  if (!isStoping)
+                  {
+                     log.warn("Exception whilst accepting new threads", ex);
+                  }
+                  break;
+               }
+            }
+         }
+      };
+      acceptor.setDaemon(true);
+      acceptor.start();
+   }
+
+   private ServerSocket createServerSocket()
+   {
+      int start_port1 = startPort;
+      server = null;
+
+      while (true)
+      {
+         try
+         {
+            server = new ServerSocket(start_port1, 50, bindAddr);
+         }
+         catch (BindException bindEx)
+         {
+            log.trace("Binding exception, most likely port " + start_port1 + " is in use. Trying next value. Error:"
+                  + bindEx.getMessage());
+            start_port1++;
+            continue;
+         }
+         catch (IOException ioEx)
+         {
+            log.warn("An exception appeared whilst trying to create server socket on port " + start_port1 + ", error:"
+                  + ioEx.getMessage());
+         }
+         break;
+      }
+      return server;
+   }
+
+   public void stop()
+   {
+      try
+      {
+         isStoping = true;
+         server.close();
+         log.trace("Successfully closed server socket " + server);
+      } catch (IOException e)
+      {
+         log.warn("Failed to close servet socket for " + server + ", error is " + e.getMessage());
+      }
+      connectionTable.close();
+      for (Iterator it = receivers.iterator(); it.hasNext();)
+      {
+         ReceiverThread thread = (ReceiverThread) it.next();
+         thread.stopThread();
+      }
+   }
+
+   public void destroy()
+   {
+      ;
+   }
+
+   public void setReceiver(Receiver r)
+   {
+      this.receiver = r;
+   }
+
+   public Map dumpStats()
+   {
+      return null;
+   }
+
+   public void send(Object destination, Object payload) throws Exception
+   {
+      if (destination != null)
+         throw new Exception("TcpTransport.send(): unicasts not supported");
+      connectionTable.writeMessage(payload);
+   }
+
+
+   class ConnectionTable
+   {
+      /**
+       * List<InetSocketAddress>
+       */
+      List myNodes;
+      final Connection[] connections;
+
+      ConnectionTable(List nodes) throws Exception
+      {
+         this.myNodes = nodes;
+         connections = new Connection[nodes.size()];
+      }
+
+
+      void init() throws Exception
+      {
+         int i = 0;
+         log.trace("Nodes is " + myNodes);
+         for (Iterator it = myNodes.iterator(); it.hasNext();)
+         {
+            InetSocketAddress addr = (InetSocketAddress) it.next();
+            if (connections[i] == null)
+            {
+               try
+               {
+                  connections[i] = new Connection(addr);
+                  connections[i].createSocket();
+               }
+               catch (ConnectException connect_ex)
+               {
+                  log.trace("-- failed to connect to " + addr);
+               }
+               catch (Exception all_others)
+               {
+                  throw all_others;
+               }
+            }
+            i++;
+         }
+      }
+
+      // todo: parallelize
+      void writeMessage(Object msg) throws Exception
+      {
+         int recieversCount = 0;
+         for (int i = 0; i < connections.length; i++)
+         {
+            Connection c = connections[i];
+            if (c != null)
+            {
+               try
+               {
+                  c.writeMessage(msg);
+                  recieversCount ++;
+               }
+               catch (Exception e)
+               {
+                  // System.err.println("failed sending msg on " + c);
+               }
+            }
+         }
+         log.trace("Message successfully sent to " + recieversCount + "/" + connections.length);
+      }
+
+      void close()
+      {
+         for (int i = 0; i < connections.length; i++)
+         {
+            Connection c = connections[i];
+            if (c != null)
+               c.close();
+         }
+      }
+
+      public String toString()
+      {
+         StringBuffer sb = new StringBuffer();
+         for (Iterator it = myNodes.iterator(); it.hasNext();)
+         {
+            sb.append(it.next()).append(' ');
+         }
+         return sb.toString();
+      }
+   }
+
+   class Connection
+   {
+      Socket sock = null;
+      DataOutputStream out;
+      InetSocketAddress to;
+      final Object mutex = new Object();
+
+      Connection(InetSocketAddress addr)
+      {
+         this.to = addr;
+      }
+
+      void createSocket() throws IOException
+      {
+         sock = new Socket(to.getAddress(), to.getPort());
+         sock.setSendBufferSize(max_send_buffer_size);
+         sock.setReceiveBufferSize(max_receiver_buffer_size);
+         out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+         log.trace("-- connected to " + to + ". Local address is " + sock.getLocalSocketAddress());
+      }
+
+      void writeMessage(Object msg) throws Exception
+      {
+         synchronized (mutex)
+         {
+            if (sock == null)
+            {
+               createSocket();
+            }
+            ObjectOutputStream oos = new ObjectOutputStream(out);
+            oos.writeObject(msg);
+         }
+         out.flush();
+      }
+
+
+      void close()
+      {
+         try
+         {
+            out.flush();
+            sock.close();
+         }
+         catch (Exception ex)
+         {
+         }
+      }
+
+      public String toString()
+      {
+         return "Connection from " + localAddr + " to " + to;
+      }
+   }
+
+
+   class ReceiverThread extends Thread
+   {
+      Socket sock;
+      DataInputStream in;
+      SocketAddress remote;
+
+      ReceiverThread(Socket sock) throws Exception
+      {
+         this.sock = sock;
+         // sock.setSoTimeout(5000);
+         in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
+         remote = sock.getRemoteSocketAddress();
+      }
+
+      public void run()
+      {
+         while (sock != null)
+         {
+            try
+            {
+               Object message = new ObjectInputStream(in).readObject();
+               if (receiver != null)
+                  receiver.receive(remote, message);
+            }
+            catch (EOFException eof)
+            {
+               break;
+            }
+            catch (Exception ex)
+            {
+               break;
+            }
+         }
+         log.trace("-- receiver thread for " + remote + " terminated");
+      }
+
+      void stopThread()
+      {
+         try
+         {
+            sock.close();
+            sock = null;
+         }
+         catch (Exception ex)
+         {
+         }
+      }
+   }
+
+   public List parseCommaDelimitedList(String s) throws Exception
+   {
+      List retval = new ArrayList();
+      StringTokenizer tok;
+      String hostname, tmp;
+      int port;
+      InetSocketAddress addr;
+      int index;
+
+      if (s == null) return null;
+      tok = new StringTokenizer(s, ",");
+      while (tok.hasMoreTokens())
+      {
+         tmp = tok.nextToken();
+         index = tmp.indexOf(':');
+         if (index == -1)
+            throw new Exception("host must be in format <host:port>, was " + tmp);
+         hostname = tmp.substring(0, index);
+         port = Integer.parseInt(tmp.substring(index + 1));
+         addr = new InetSocketAddress(hostname, port);
+         retval.add(addr);
+      }
+      return retval;
+   }
+
+}

Added: cache-bench-fwk/trunk/src/org/cachebench/cluster/Transport.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/cluster/Transport.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/cluster/Transport.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,54 @@
+package org.cachebench.cluster;
+
+import org.cachebench.config.ClusterConfig;
+
+import java.util.Properties;
+import java.util.Map;
+
+/**
+ * Generic transport abstraction for all different transports (JGroups, JMS, UDP, TCP). The lifecycle is
+ * <ol>
+ * <li>Create an instance of the transport (using the empty constructor)
+ * <li>Call <code>create()</code>
+ * <li>Possibly call <code>setReceiver()</code>
+ * <li>Call <code>start()</code>
+ * <li>Call <code>send()</code>
+ * <li>Call <code>stop()</stop>
+ * <li>Call <code>destroy()</code> (alternatively call <code>start()</code> again)
+ * </ol>
+ * @author Bela Ban Jan 22
+ * @author 2004
+ * @version $Id: Transport.java,v 1.4 2006/12/19 08:51:47 belaban Exp $
+ */
+public interface Transport {
+
+    /** Create the transport
+     * @param clusterConfig*/
+    void create(ClusterConfig clusterConfig) throws Exception;
+
+    /** Get the local address (= endpoint) of this transport. Guaranteed to be called <em>after</em>
+     *  <code>create()</code>, possibly even later (after <code>start()</code>) */
+    Object getLocalAddress();
+
+    /** Start the transport */
+    void start() throws Exception;
+
+    /** Stop the transport */
+    void stop();
+
+    /** Destroy the transport. Transport cannot be reused after this call, but a new instance has to be created */
+    void destroy();
+
+    /** Set the receiver */
+    void setReceiver(Receiver r);
+
+    Map dumpStats();
+
+    /**
+     * Send a message
+     * @param destination A destination. If null, send a message to all members
+     * @param payload A buffer to be sent
+     * @throws Exception
+     */
+    void send(Object destination, Object payload) throws Exception;
+}

Added: cache-bench-fwk/trunk/src/org/cachebench/config/CacheWarmupConfig.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/CacheWarmupConfig.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/CacheWarmupConfig.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,34 @@
+package org.cachebench.config;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ */
+public class CacheWarmupConfig
+{
+   private String warmupClass;
+
+   private Map<String, String> configParams = new HashMap<String, String>();
+
+   public void addConfigParam(NVPair nvPair) 
+   {
+      configParams.put(nvPair.getName(), nvPair.getValue());
+   }
+
+   public String getWarmupClass()
+   {
+      return warmupClass;
+   }
+
+   public void setWarmupClass(String warmupClass)
+   {
+      this.warmupClass = warmupClass;
+   }
+
+   public Map<String, String> getConfigParams()
+   {
+      return configParams;
+   }
+}

Added: cache-bench-fwk/trunk/src/org/cachebench/config/ClusterConfig.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/ClusterConfig.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/ClusterConfig.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,91 @@
+package org.cachebench.config;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Configuration for this cache instance.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class ClusterConfig
+{
+   private int currentNodeIndex = -1;
+
+   private List<NodeAddress> members = new ArrayList<NodeAddress>();
+
+   private String bindAddress;
+
+   public int getCurrentNodeIndex()
+   {
+      if (currentNodeIndex == -1)
+      {
+         String serverindexStr = System.getProperty("currentIndex");
+         try
+         {
+            currentNodeIndex = Integer.parseInt(serverindexStr);
+         } catch (NumberFormatException e)
+         {
+            throw new IllegalStateException("Configuration 'currentIndex' is missing!");
+         }
+      }
+      return currentNodeIndex;
+   }
+
+   public List<NodeAddress> getMembers()
+   {
+      return members;
+   }
+
+   public int getPortForThisNode()
+   {
+      NodeAddress address = members.get(getCurrentNodeIndex());
+      return Integer.parseInt(address.getPort());
+   }
+
+   public int getClusterSize()
+   {
+      return members.size();
+   }
+
+   public boolean isMaster()
+   {
+      return getCurrentNodeIndex() == 0;
+   }
+
+   public void addMember(NodeAddress member)
+   {
+      members.add(member);
+   }
+
+   public void setCurrentNodeIndex(int currentNodeIndex)
+   {
+      this.currentNodeIndex = currentNodeIndex;
+   }
+
+   public String getBindAddress()
+   {
+      return bindAddress;
+   }
+
+   public void setBindAddress(String bindAddress)
+   {
+      this.bindAddress = bindAddress;
+   }
+
+   public List<InetSocketAddress> getMemberAddresses()
+   {
+      List<InetSocketAddress> result = new ArrayList();
+      for (NodeAddress address : getMembers())
+      {
+         result.add(new InetSocketAddress(address.getHost(), address.getPortAsInt()));
+      }
+      return result;
+   }
+
+   public String toString()
+   {
+      return "{bindAddress:" + bindAddress + ", members:" + members + "}";
+   }
+}

Added: cache-bench-fwk/trunk/src/org/cachebench/config/ConfigBuilder.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/ConfigBuilder.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/ConfigBuilder.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,107 @@
+package org.cachebench.config;
+
+import org.apache.commons.digester.Digester;
+
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.io.File;
+
+/**
+ * Helper class for loading configurations.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class ConfigBuilder
+{
+   /**
+    * Util method to locate a resource on the filesystem as a URL
+    *
+    * @param filename
+    * @return The URL object of the file
+    */
+   private static URL findOnFS(String filename)
+   {
+      File f = new File(filename);
+      try
+      {
+         if (f.exists()) return f.toURL();
+      }
+      catch (MalformedURLException mue)
+      {
+         // bad URL
+      }
+      return null;
+   }
+
+   /**
+    * Looks for config file on disk then on class path.
+    * @return null if the file cannot be found
+    */
+   public static URL findConfigFile(String s)
+   {
+      URL confFile = findOnFS(s);
+      if (confFile == null)
+      {
+         confFile = findInClasspath(s);
+      }
+      return confFile;
+   }
+
+   public static Configuration parseConfiguration(URL url) throws Exception
+   {
+      Digester digester = new Digester();
+      // set up the digester rules.
+      digester.setValidating(false);
+      digester.addObjectCreate("cachebench", "org.cachebench.config.Configuration");
+      digester.addSetProperties("cachebench");
+
+      digester.addObjectCreate("cachebench/cluster", "org.cachebench.config.ClusterConfig");
+      digester.addSetProperties("cachebench/cluster");
+      digester.addObjectCreate("cachebench/cluster/member", "org.cachebench.config.NodeAddress");
+      digester.addSetProperties("cachebench/cluster/member");
+      digester.addSetNext("cachebench/cluster/member", "addMember", "org.cachebench.config.NodeAddress");
+      digester.addSetNext("cachebench/cluster", "setClusterConfig", "org.cachebench.config.ClusterConfig");
+
+      digester.addObjectCreate("cachebench/testcase", "org.cachebench.config.TestCase");
+      digester.addSetProperties("cachebench/testcase");
+
+      digester.addObjectCreate("cachebench/testcase/warmup", "org.cachebench.config.CacheWarmupConfig");
+      digester.addSetProperties("cachebench/testcase/warmup");
+
+      digester.addObjectCreate("cachebench/testcase/warmup/param", "org.cachebench.config.NVPair");
+      digester.addSetProperties("cachebench/testcase/warmup/param");
+      digester.addSetNext("cachebench/testcase/warmup/param", "addConfigParam", "org.cachebench.config.NVPair");
+
+      digester.addSetNext("cachebench/testcase/warmup", "setCacheWarmupConfig", "org.cachebench.config.CacheWarmupConfig");
+
+      digester.addObjectCreate("cachebench/testcase/test", "org.cachebench.config.TestConfig");
+      digester.addSetProperties("cachebench/testcase/test");
+      digester.addSetNext("cachebench/testcase/test", "addTest", "org.cachebench.config.TestConfig");
+
+      digester.addObjectCreate("cachebench/testcase/param", "org.cachebench.config.NVPair");
+      digester.addSetProperties("cachebench/testcase/param");
+
+      digester.addSetNext("cachebench/testcase/param", "addParam", "org.cachebench.config.NVPair");
+      digester.addSetNext("cachebench/testcase", "addTestCase", "org.cachebench.config.TestCase");
+
+      digester.addObjectCreate("cachebench/report", "org.cachebench.config.Report");
+      digester.addSetProperties("cachebench/report");
+
+      digester.addObjectCreate("cachebench/report/param", "org.cachebench.config.NVPair");
+      digester.addSetProperties("cachebench/report/param");
+      digester.addSetNext("cachebench/report/param", "addParam", "org.cachebench.config.NVPair");
+
+      digester.addSetNext("cachebench/report", "addReport", "org.cachebench.config.Report");
+      return (Configuration) digester.parse(url.openStream());
+   }
+
+   /**
+     * Util method to locate a resource in your classpath
+     */
+    private static URL findInClasspath(String filename)
+    {
+       return ConfigBuilder.class.getClassLoader().getResource(filename);
+    }
+
+
+}

Modified: cache-bench-fwk/trunk/src/org/cachebench/config/Configuration.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/Configuration.java	2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/Configuration.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -15,6 +15,8 @@
    private boolean gcBetweenTestsEnabled;
    private boolean emptyCacheBetweenTests;
 
+   private ClusterConfig clusterConfig;
+
    private List<TestCase> testCases = new ArrayList<TestCase>();
 
    private List<Report> reports = new ArrayList<Report>();
@@ -90,4 +92,14 @@
    {
       this.numThreads = numThreads;
    }
+
+   public ClusterConfig getClusterConfig()
+   {
+      return clusterConfig;
+   }
+
+   public void setClusterConfig(ClusterConfig clusterConfig)
+   {
+      this.clusterConfig = clusterConfig;
+   }
 }

Added: cache-bench-fwk/trunk/src/org/cachebench/config/NodeAddress.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/NodeAddress.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/NodeAddress.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,58 @@
+package org.cachebench.config;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public class NodeAddress
+{
+   private String host;
+
+   private String port;
+
+   public String getHost()
+   {
+      return host;
+   }
+
+   public void setHost(String host)
+   {
+      this.host = host;
+   }
+
+   public String getPort()
+   {
+      return port;
+   }
+
+   public void setPort(String port)
+   {
+      this.port = port;
+   }
+
+   public int getPort(int defaultValue)
+   {
+      int portInt = Integer.parseInt(port);
+      if (portInt <= 0)
+      {
+         return defaultValue;
+      }
+      else
+      {
+         return portInt;
+      }
+   }
+
+   public String toString()
+   {
+      return host + ':' + port;
+   }
+
+   public int getPortAsInt()
+   {
+      return Integer.parseInt(port);
+   }
+}

Modified: cache-bench-fwk/trunk/src/org/cachebench/config/TestCase.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/config/TestCase.java	2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/config/TestCase.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -1,5 +1,7 @@
 package org.cachebench.config;
 
+import org.cachebench.warmup.CacheWarmup;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
@@ -13,7 +15,19 @@
    private Properties params = new Properties();
    private List<TestConfig> tests = new ArrayList<TestConfig>();
 
+   private CacheWarmupConfig cacheWarmupConfig;
 
+
+   public CacheWarmupConfig getCacheWarmupConfig()
+   {
+      return cacheWarmupConfig;
+   }
+
+   public void setCacheWarmupConfig(CacheWarmupConfig cacheWarmupConfig)
+   {
+      this.cacheWarmupConfig = cacheWarmupConfig;
+   }
+
    public String getName()
    {
       return name;

Modified: cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/AbstractReportGenerator.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/AbstractReportGenerator.java	2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/AbstractReportGenerator.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -2,10 +2,10 @@
 
 import org.apache.commons.logging.Log;
 import org.cachebench.TestResult;
+import org.cachebench.config.ClusterConfig;
 
 import java.io.File;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Base implementation of {@link org.cachebench.reportgenerators.ReportGenerator}
@@ -15,6 +15,7 @@
    protected File output;
    protected List<TestResult> results;
    protected Log log;
+   protected ClusterConfig clusterConfig;
 
    public void setOutputFile(File output)
    {
@@ -25,4 +26,9 @@
    {
       this.results = results;
    }
+
+   public void setClusterConfig(ClusterConfig clusterConfig)
+   {
+      this.clusterConfig = clusterConfig;
+   }
 }

Modified: cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ClusterReportGenerator.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ClusterReportGenerator.java	2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ClusterReportGenerator.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -3,124 +3,64 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.cachebench.TestResult;
+import org.cachebench.cluster.ClusterBarrier;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.net.*;
+import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 /**
  * Gathers info from all the nodes executing tests.
- * Merges all the gathered information and generates an CSV file based on that.
+ * Merges all the gathered information and generates an CSV file.
+ * The file is generated on the master node, i.e. the node that has the NODE_INDEX == 0
  *
  * @author Mircea.Markus at jboss.com
- * @version 2.2
  */
 public class ClusterReportGenerator extends AbstractReportGenerator
 {
    private static Log log = LogFactory.getLog(ClusterReportGenerator.class);
 
-   private int clusterSize;
-
-   private int masterPort;
-
-   private String masterHost;
-
    public void setConfigParams(Map<String, String> configParams)
    {
       log.trace("Received config params: " + configParams);
-      this.clusterSize = Integer.parseInt(configParams.get("clusterSize"));
-      this.masterPort = Integer.parseInt(configParams.get("masterPort"));
-      this.masterHost = configParams.get("masterHost");
    }
 
    public void generate() throws Exception
    {
       try
       {
-         log.trace("Cluster report generator property value: " + System.getProperty("cluterReportGenerator"));
-         boolean isGeneratorNode = System.getProperty("cluterReportGenerator") != null
-               && "true".equalsIgnoreCase(System.getProperty("cluterReportGenerator"));
-         log.trace(" Starting generating. Is master? " + isGeneratorNode);
-         if (isGeneratorNode)
+         ClusterBarrier barrier = new ClusterBarrier();
+         barrier.setConfig(this.clusterConfig);
+         barrier.setAcknowledge(false);
+         barrier.barrier(results);
+         log.trace(" Starting generating. Is master? " + clusterConfig.isMaster());
+         if (clusterConfig.isMaster())
          {
-            generateReport();
+            log.trace("Received following results: " + results);
+            generateReport(barrier.getReceivedMessages());
          }
-         else
-         {
-            submitReportInfo();
-         }
       } catch (Exception e)
       {
          log.error("Error while generating report!", e);
       }
    }
 
-   /**
-    * If not a master, sends report data to master.
-    */
-   private void submitReportInfo() throws IOException, ClassNotFoundException
-   {
-      log.trace("Sending the following results to  master: " + results);
-      boolean connected = false;
-      Socket socket = null;
-      while (!connected)
-      {
-         try
-         {
-            log.trace("Connecting to master on " + masterHost + ":" + masterPort + "...");
-            socket = new Socket(masterHost, masterPort);
-            log.trace("Connected");
-            connected = true;
-         } catch (IOException e)
-         {
-            log.trace("Failed to connect(" + e.getMessage() + "), trying again..." );
-            connected = false;
-         }
-      }
-      ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
-      oos.writeObject(this.results);
-      oos.close();
-      socket.close();
-      log.trace("Following results were sent to server: " + results);
-   }
 
-   private void generateReport() throws Exception
+   private void generateReport(Map<SocketAddress, Object> receivedMessages) throws Exception
    {
-      List<List<TestResult>> results = retrieveResultsFromClients();
-      List<TestResult> mergedResults = mergerTestResultsAndGenerateReport(results);
-      generateReportFile(mergedResults);
-   }
-
-   @SuppressWarnings(value = "unchecked")
-   private List<List<TestResult>> retrieveResultsFromClients()
-         throws IOException, ClassNotFoundException
-   {
-      ServerSocket socket = new ServerSocket(masterPort);
+      log.trace("Received " + receivedMessages.size() + " results!");
       List<List<TestResult>> results = new ArrayList<List<TestResult>>();
-      results.add(this.results);
-      //we go expect cluster size-1 because this is also a node, ant it does not sent report info 
-      for (int i = 0; i < clusterSize - 1; i++)
+      for (SocketAddress socketAddress : receivedMessages.keySet())
       {
-         log.trace("Expecting " + (clusterSize - i - 1) + " more client(s)");
-         log.trace(" Waiting for client to conect...");
-         Socket client = socket.accept();
-         log.trace("Client connected: " + client.getInetAddress());
-         ObjectInputStream ois = new ObjectInputStream(client.getInputStream());
-         List<TestResult> testResults = (List<TestResult>) ois.readObject();
+         List<TestResult> testResults = (List<TestResult>) receivedMessages.get(socketAddress);
+         log.trace("From " + socketAddress + " received " + testResults);
          results.add(testResults);
-         log.trace("Received following resullts from client: " + testResults);
-         ois.close();
-         client.close();
       }
-      socket.close();
-      return results;
+      List<TestResult> mergedResults = mergerTestResultsAndGenerateReport(results);
+      generateReportFile(mergedResults);
    }
 
-
    private void generateReportFile(List<TestResult> mergedResults) throws Exception
    {
       CSVReportGenerator generator = new CSVReportGenerator();

Modified: cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ReportGenerator.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ReportGenerator.java	2007-12-19 17:49:35 UTC (rev 4885)
+++ cache-bench-fwk/trunk/src/org/cachebench/reportgenerators/ReportGenerator.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -1,6 +1,7 @@
 package org.cachebench.reportgenerators;
 
 import org.cachebench.TestResult;
+import org.cachebench.config.ClusterConfig;
 
 import java.io.File;
 import java.util.List;
@@ -13,11 +14,13 @@
  */
 public interface ReportGenerator
 {
-   public abstract void setConfigParams(Map<String, String> configParams);
+   public void setConfigParams(Map<String, String> configParams);
    
    public void setOutputFile(File output);
 
    public void setResults(List<TestResult> results);
 
    public void generate() throws Exception;
+
+   public void setClusterConfig(ClusterConfig clusterConfig);
 }

Added: cache-bench-fwk/trunk/src/org/cachebench/warmup/CacheWarmup.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/warmup/CacheWarmup.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/warmup/CacheWarmup.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,40 @@
+package org.cachebench.warmup;
+
+import org.cachebench.CacheWrapper;
+import org.cachebench.config.NVPair;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Warmups the cache.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public abstract class CacheWarmup
+{
+
+   private Map<String, String> configParams = new HashMap<String, String>();
+
+   public void setConfigParams(Map<String, String> configParams)
+   {
+      this.configParams = configParams;
+   }
+
+   public String getConfigParam(String name)
+   {
+      return configParams.get(name);
+   }
+
+   /**
+    * Calls {@link #performWarmupOperations(CacheWrapper)} amd clears the cache.
+    */
+   public final void warmup(CacheWrapper cacheWrapper) throws Exception
+   {
+      performWarmupOperations(cacheWrapper);
+      cacheWrapper.empty();
+   }
+
+   public abstract void performWarmupOperations(CacheWrapper wrapper) throws Exception;
+}

Added: cache-bench-fwk/trunk/src/org/cachebench/warmup/NoCacheWarmup.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/warmup/NoCacheWarmup.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/warmup/NoCacheWarmup.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,19 @@
+package org.cachebench.warmup;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.CacheWrapper;
+
+/**
+ * Does not warmup the cache.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class NoCacheWarmup extends CacheWarmup
+{
+   Log log = LogFactory.getFactory().getInstance(NoCacheWarmup.class);
+   public void performWarmupOperations(CacheWrapper wrapper) throws Exception
+   {
+      log.info("Using no cache warmup");      
+   }
+}

Added: cache-bench-fwk/trunk/src/org/cachebench/warmup/PutGetCacheWarmup.java
===================================================================
--- cache-bench-fwk/trunk/src/org/cachebench/warmup/PutGetCacheWarmup.java	                        (rev 0)
+++ cache-bench-fwk/trunk/src/org/cachebench/warmup/PutGetCacheWarmup.java	2007-12-19 21:51:57 UTC (rev 4886)
@@ -0,0 +1,32 @@
+package org.cachebench.warmup;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cachebench.CacheWrapper;
+
+/**
+ * Perfoms N puts, gets and removals, where n is configurable.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public class PutGetCacheWarmup extends CacheWarmup
+{
+   Log log = LogFactory.getLog(PutGetCacheWarmup.class);
+
+   public void performWarmupOperations(CacheWrapper wrapper) throws Exception
+   {
+      Integer opCount = Integer.parseInt(getConfigParam("operationCount"));
+      log.trace("Cache launched, performing " + opCount + " put and get operations ");
+      for (int i = 0; i < opCount; i++)
+      {
+         wrapper.put(String.valueOf(opCount), String.valueOf(opCount));
+      }
+
+      for (int i = 0; i < opCount; i++)
+      {
+         wrapper.get(String.valueOf(opCount));
+      }
+      log.trace("Cache warmup ended!");
+   }
+}




More information about the jbosscache-commits mailing list