[jbosscache-commits] JBoss Cache SVN: r5247 - in benchmarks/benchmark-fwk/trunk: src/org/cachebench and 1 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Jan 28 12:02:27 EST 2008


Author: mircea.markus
Date: 2008-01-28 12:02:27 -0500 (Mon, 28 Jan 2008)
New Revision: 5247

Modified:
   benchmarks/benchmark-fwk/trunk/TODO
   benchmarks/benchmark-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java
   benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
   benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
Log:
enhanced the barrier - if an node receives an barrier named differently it will exit. This means that stale processes are running within the cluster

Modified: benchmarks/benchmark-fwk/trunk/TODO
===================================================================
--- benchmarks/benchmark-fwk/trunk/TODO	2008-01-28 15:00:27 UTC (rev 5246)
+++ benchmarks/benchmark-fwk/trunk/TODO	2008-01-28 17:02:27 UTC (rev 5247)
@@ -1,3 +1,4 @@
 1.  JBoss Cache 1.4.1 breaks during warmup
 2.  ReplicationOccursTest doesn't always work - reports false errors.
+	- it relies on cache.getReplicatedData(); most likely that is the cause of the problem, refere to javadoc
 3.  Replace cluster.sh, runNode.sh, killNode.sh and cache-product/product-x.y.z/config.sh with Ant scripts to make them cross-platform.

Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java	2008-01-28 15:00:27 UTC (rev 5246)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java	2008-01-28 17:02:27 UTC (rev 5247)
@@ -97,7 +97,7 @@
       List<TestResult> results = new ArrayList<TestResult>();
       for (TestCase test : conf.getTestCases())
       {
-         CacheWrapper cache;
+         CacheWrapper cache = null;
          try
          {
             cache = getCacheWrapperInstance(test);
@@ -123,6 +123,13 @@
          }
          catch (Exception e)
          {
+            try
+            {
+               shutdownCache(cache);
+            } catch (Exception e1)
+            {
+               //ignore
+            }
             logger.warn("Unable to Initialize or Setup the Cache - Not performing any tests", e);
             errorLogger.error("Unable to Initialize or Setup the Cache: " + test.getCacheWrapper(), e);
             errorLogger.error("Skipping this test");

Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java	2008-01-28 15:00:27 UTC (rev 5246)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java	2008-01-28 17:02:27 UTC (rev 5247)
@@ -19,11 +19,13 @@
 
    private ClusterConfig config;
    public final Map<SocketAddress, Object> receivedMessages = new HashMap<SocketAddress, Object>();
-   private Transport transport;
+   private TcpTransport transport;
    private Object message;
    private int numMembers;
    private boolean acknowledge;
    private static final String ACK = "_ACK";
+   private boolean failOnWrongMessaages;
+   private String errorMessage;
 
 
    /**
@@ -40,6 +42,7 @@
    public void setAcknowledge(boolean acknowledge)
    {
       this.acknowledge = acknowledge;
+      this.failOnWrongMessaages = acknowledge;
    }
 
    /**
@@ -76,6 +79,14 @@
             receivedAllMessages = receivedMessages.size() >= numMembers;
             if (!receivedAllMessages)
             {
+               if (errorMessage != null)
+               {
+                  //might be that I am the intruder, give other members a chance to fail aswell
+                  transport.send(null, message);
+                  transport.stop();
+                  Thread.sleep(2000);
+                  throw new IllegalStateException(errorMessage);
+               }
                receivedMessages.wait(2000);
             }
          }
@@ -91,19 +102,29 @@
 
    public void receive(SocketAddress sender, Object payload) throws Exception
    {
-      log.trace("Received '" + payload + "' from " + sender + " still expecting " + getMissingMembersCount() + " member(s)");
+      log.trace("Received '" + payload + "' from " + formatName(sender) + " still expecting " + getMissingMembersCount() + " member(s)");
       if (payload == null)
       {
          log.warn("payload is incorrect (sender=" + sender + "): " + payload);
          return;
       }
-      if (acknowledge && !isAcknowledgeMessage(payload, message))
+      if (acknowledge && !isAcknowledgeMessage(payload))
       {
          log.trace("Sending ack, still expecting " + getMissingMembersCount() + " members.");
          transport.send(null, getAcknowledgeMessage(message));
          return;
       }
 
+      if (failOnWrongMessaages && !message.equals(payload) && !getAcknowledgeMessage(message).equals(payload))
+      {
+         errorMessage = "We recieved an message from a differenet barrier. This normally means that there is an stale " +
+               "barrier running somewhere.The source of the message is '" + sender + "', message is:'" + payload + "', " +
+               "and we were expecting '" + message + "'";
+         log.error(errorMessage);
+         this.receivedMessages.notifyAll();
+
+      }      
+
       //we are here if either no ack or ack the message is an ack message
       synchronized (this.receivedMessages)
       {
@@ -113,10 +134,18 @@
             int expected = getMissingMembersCount();
             log.trace("Sender " + sender + " registered, still waiting for " + expected + " member(s)");
             this.receivedMessages.notifyAll();
+         } else {
+            log.trace("Sender '" + formatName(sender) + "' is already registered in the list of known senders!");
          }
+         log.trace("Current list of senders is: " + receivedMessages.keySet());
       }
    }
 
+   private String formatName(SocketAddress sender)
+   {
+      return transport.isLocal(sender) ? "<local(" + sender + ")>" : String.valueOf(sender);
+   }
+
    private int getMissingMembersCount()
    {
       return numMembers - receivedMessages.size();
@@ -138,13 +167,9 @@
       return message.toString() + ACK;
    }
 
-   private boolean isAcknowledgeMessage(Object payload, Object message)
+   private boolean isAcknowledgeMessage(Object payload)
    {
-      boolean result;
-      String payloadStr = payload.toString();
-      result = payloadStr.equals(getAcknowledgeMessage(message));
-      log.trace("Is acknowledge? " + result);
-      return result;
+      return payload == null ? false : (payload.toString().indexOf(ACK) >= 0);
    }
 
    public void setConfig(ClusterConfig config)

Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java	2008-01-28 15:00:27 UTC (rev 5246)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java	2008-01-28 17:02:27 UTC (rev 5247)
@@ -47,7 +47,6 @@
    public void create(ClusterConfig clusterConfig) throws Exception
    {
       this.config = clusterConfig;
-      String tmp;
       startPort = config.getPortForThisNode();
       String bindAddrStr = config.getBindAddress();
       if (bindAddrStr != null)
@@ -121,7 +120,7 @@
          }
          catch (IOException ioEx)
          {
-            log.warn("An exception appeared whilst trying to create server socket on port " + start_port1 + ", error:"
+            log.trace("An exception appeared whilst trying to create server socket on port " + start_port1 + ", error:"
                   + ioEx.getMessage());
          }
          break;
@@ -173,13 +172,10 @@
 
    class ConnectionTable
    {
-      /**
-       * List<InetSocketAddress>
-       */
-      List myNodes;
+      List<InetSocketAddress> myNodes;
       final Connection[] connections;
 
-      ConnectionTable(List nodes) throws Exception
+      ConnectionTable(List<InetSocketAddress> nodes) throws Exception
       {
          this.myNodes = nodes;
          connections = new Connection[nodes.size()];
@@ -190,9 +186,8 @@
       {
          int i = 0;
          log.trace("Nodes is " + myNodes);
-         for (Iterator it = myNodes.iterator(); it.hasNext();)
+         for (InetSocketAddress addr : myNodes)
          {
-            InetSocketAddress addr = (InetSocketAddress) it.next();
             if (connections[i] == null)
             {
                try
@@ -204,10 +199,6 @@
                {
                   log.trace("-- failed to connect to " + addr);
                }
-               catch (Exception all_others)
-               {
-                  throw all_others;
-               }
             }
             i++;
          }
@@ -255,6 +246,19 @@
          }
          return sb.toString();
       }
+
+      public boolean isLocalConnection(SocketAddress socketAddress)
+      {
+         for (Connection conn : connections)
+         {
+            SocketAddress addr = conn.sock != null ? conn.sock.getLocalSocketAddress() : null;
+            if (addr != null && addr.equals(socketAddress))
+            {
+               return true;
+            }
+         }
+         return false;
+      }
    }
 
    class Connection
@@ -336,14 +340,9 @@
                if (receiver != null)
                   receiver.receive(remote, message);
             }
-            catch (EOFException eof)
-            {
+            catch (Exception e) {
                break;
             }
-            catch (Exception ex)
-            {
-               break;
-            }
          }
          log.trace("-- receiver thread for " + remote + " terminated");
       }
@@ -352,11 +351,15 @@
       {
          try
          {
+            log.trace("Closing receiver thread for: " + sock);
             sock.close();
+            in.close();
             sock = null;
+            this.interrupt();
          }
          catch (Exception ex)
          {
+            log.warn("Exception while closing the thread", ex);
          }
       }
    }
@@ -386,4 +389,9 @@
       return retval;
    }
 
+   public boolean isLocal(SocketAddress sa)
+   {
+      return connectionTable.isLocalConnection(sa);
+   }
+
 }




More information about the jbosscache-commits mailing list