[jbosscache-commits] JBoss Cache SVN: r6338 - in benchmarks/benchmark-fwk/trunk/src/org/cachebench: tests and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri Jul 18 12:48:46 EDT 2008


Author: mircea.markus
Date: 2008-07-18 12:48:45 -0400 (Fri, 18 Jul 2008)
New Revision: 6338

Added:
   benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Message.java
Modified:
   benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
   benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
   benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Transport.java
   benchmarks/benchmark-fwk/trunk/src/org/cachebench/tests/ReplicationOccursTest.java
Log:
enhanced logging

Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java	2008-07-18 15:52:27 UTC (rev 6337)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java	2008-07-18 16:48:45 UTC (rev 6338)
@@ -82,7 +82,7 @@
                if (errorMessage != null)
                {
                   //might be that I am the intruder, give other members a chance to fail aswell
-                  transport.send(null, message);
+                  transport.send(message);
                   transport.stop();
                   Thread.sleep(2000);
                   throw new IllegalStateException(errorMessage);
@@ -91,11 +91,11 @@
             }
          }
          log.trace("sending message " + message + ", expecting " + getMissingMembersCount() + " member(s)");
-         transport.send(null, message);
+         transport.send(message);
          if (acknowledge)
          {
             log.trace("Send ack also");
-            transport.send(null, getAcknowledgeMessage(message));
+            transport.send(getAcknowledgeMessage(message));
          }
       }
    }
@@ -111,7 +111,7 @@
       if (acknowledge && !isAcknowledgeMessage(payload))
       {
          log.trace("Sending ack, still expecting " + getMissingMembersCount() + " members.");
-         transport.send(null, getAcknowledgeMessage(message));
+         transport.send(getAcknowledgeMessage(message));
          return;
       }
 

Added: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Message.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Message.java	                        (rev 0)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Message.java	2008-07-18 16:48:45 UTC (rev 6338)
@@ -0,0 +1,27 @@
+package org.cachebench.cluster;
+
+import java.io.Serializable;
+import java.net.SocketAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Encapsulates a message to be sent on the wire.
+ * @author Mircea.Markus at jboss.com
+ */
+public class Message implements Serializable {
+   private InetSocketAddress source;
+   private Object payload;
+
+   public Message(InetSocketAddress source, Object payload) {
+      this.source = source;
+      this.payload = payload;
+   }
+
+   public InetSocketAddress getSource() {
+      return source;
+   }
+
+   public Object getPayload() {
+      return payload;
+   }
+}

Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java	2008-07-18 15:52:27 UTC (rev 6337)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java	2008-07-18 16:48:45 UTC (rev 6338)
@@ -29,7 +29,7 @@
    int startPort = 7800;
    ServerSocket srvSock = null;
    InetAddress bindAddr = null;
-   SocketAddress localAddr = null;
+   InetSocketAddress localAddr = null;
    List receivers = new ArrayList();
    private ServerSocket server;
    private boolean isStoping;
@@ -162,10 +162,8 @@
       return null;
    }
 
-   public void send(Object destination, Object payload) throws Exception
+   public void send(Object payload) throws Exception
    {
-      if (destination != null)
-         throw new Exception("TcpTransport.send(): unicasts not supported");
       connectionTable.writeMessage(payload);
    }
 
@@ -208,19 +206,14 @@
       void writeMessage(Object msg) throws Exception
       {
          int recieversCount = 0;
-         for (int i = 0; i < connections.length; i++)
-         {
-            Connection c = connections[i];
-            if (c != null)
-            {
-               try
-               {
+         for (Connection c : connections) {
+            if (c != null) {
+               try {
                   c.writeMessage(msg);
-                  recieversCount ++;
+                  recieversCount++;
                }
-               catch (Exception e)
-               {
-                  // System.err.println("failed sending msg on " + c);
+               catch (Exception e) {
+                  log.warn("failure sending message to " + c, e);
                }
             }
          }
@@ -291,7 +284,8 @@
                createSocket();
             }
             ObjectOutputStream oos = new ObjectOutputStream(out);
-            oos.writeObject(msg);
+            Message message = new Message(localAddr, msg);
+            oos.writeObject(message);
          }
          out.flush();
       }
@@ -306,6 +300,7 @@
          }
          catch (Exception ex)
          {
+            log.warn("problems closing the connection", ex);
          }
       }
 
@@ -336,9 +331,9 @@
          {
             try
             {
-               Object message = new ObjectInputStream(in).readObject();
+               Message message = (Message) new ObjectInputStream(in).readObject();
                if (receiver != null)
-                  receiver.receive(remote, message);
+                  receiver.receive(message.getSource(), message.getPayload());
             }
             catch (Exception e) {
                break;

Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Transport.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Transport.java	2008-07-18 15:52:27 UTC (rev 6337)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Transport.java	2008-07-18 16:48:45 UTC (rev 6338)
@@ -46,9 +46,8 @@
 
     /**
      * Send a message
-     * @param destination A destination. If null, send a message to all members
      * @param payload A buffer to be sent
      * @throws Exception
      */
-    void send(Object destination, Object payload) throws Exception;
+    void send(Object payload) throws Exception;
 }

Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/tests/ReplicationOccursTest.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/tests/ReplicationOccursTest.java	2008-07-18 15:52:27 UTC (rev 6337)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/tests/ReplicationOccursTest.java	2008-07-18 16:48:45 UTC (rev 6338)
@@ -212,7 +212,7 @@
       {
          totalValue += Integer.valueOf(val.toString());
       }
-      log.info("********** Overall replication count is: " + totalValue);
+      log.info("Overall replication count is: " + totalValue);
       //this means SOME replication occurred. This does not mean, though, that all nodes replicated successfuly.
       //correct condition would be >= this.conf.getClusterConfig().getClusterSize()
       //todo/FIXME - this seem not to work with all the products, so we will accept 'some replication'
@@ -221,7 +221,6 @@
          log.warn("The replication was not total, but partial!!");
       }
       boolean isReplicationSuccess = totalValue > 0;
-      log.info("Is replication passed? " + isReplicationSuccess);
       return isReplicationSuccess;
    }
 }




More information about the jbosscache-commits mailing list