[jbosscache-commits] JBoss Cache SVN: r6338 - in benchmarks/benchmark-fwk/trunk/src/org/cachebench: tests and 1 other directory.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Fri Jul 18 12:48:46 EDT 2008
Author: mircea.markus
Date: 2008-07-18 12:48:45 -0400 (Fri, 18 Jul 2008)
New Revision: 6338
Added:
benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Message.java
Modified:
benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Transport.java
benchmarks/benchmark-fwk/trunk/src/org/cachebench/tests/ReplicationOccursTest.java
Log:
enhanced logging
Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java 2008-07-18 15:52:27 UTC (rev 6337)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java 2008-07-18 16:48:45 UTC (rev 6338)
@@ -82,7 +82,7 @@
if (errorMessage != null)
{
//might be that I am the intruder, give other members a chance to fail aswell
- transport.send(null, message);
+ transport.send(message);
transport.stop();
Thread.sleep(2000);
throw new IllegalStateException(errorMessage);
@@ -91,11 +91,11 @@
}
}
log.trace("sending message " + message + ", expecting " + getMissingMembersCount() + " member(s)");
- transport.send(null, message);
+ transport.send(message);
if (acknowledge)
{
log.trace("Send ack also");
- transport.send(null, getAcknowledgeMessage(message));
+ transport.send(getAcknowledgeMessage(message));
}
}
}
@@ -111,7 +111,7 @@
if (acknowledge && !isAcknowledgeMessage(payload))
{
log.trace("Sending ack, still expecting " + getMissingMembersCount() + " members.");
- transport.send(null, getAcknowledgeMessage(message));
+ transport.send(getAcknowledgeMessage(message));
return;
}
Added: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Message.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Message.java (rev 0)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Message.java 2008-07-18 16:48:45 UTC (rev 6338)
@@ -0,0 +1,27 @@
+package org.cachebench.cluster;
+
+import java.io.Serializable;
+import java.net.SocketAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Encapsulates an message to be send on the wire.
+ * @author Mircea.Markus at jboss.com
+ */
+public class Message implements Serializable {
+ private InetSocketAddress source;
+ private Object payload;
+
+ public Message(InetSocketAddress source, Object payload) {
+ this.source = source;
+ this.payload = payload;
+ }
+
+ public InetSocketAddress getSource() {
+ return source;
+ }
+
+ public Object getPayload() {
+ return payload;
+ }
+}
Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java 2008-07-18 15:52:27 UTC (rev 6337)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java 2008-07-18 16:48:45 UTC (rev 6338)
@@ -29,7 +29,7 @@
int startPort = 7800;
ServerSocket srvSock = null;
InetAddress bindAddr = null;
- SocketAddress localAddr = null;
+ InetSocketAddress localAddr = null;
List receivers = new ArrayList();
private ServerSocket server;
private boolean isStoping;
@@ -162,10 +162,8 @@
return null;
}
- public void send(Object destination, Object payload) throws Exception
+ public void send(Object payload) throws Exception
{
- if (destination != null)
- throw new Exception("TcpTransport.send(): unicasts not supported");
connectionTable.writeMessage(payload);
}
@@ -208,19 +206,14 @@
void writeMessage(Object msg) throws Exception
{
int recieversCount = 0;
- for (int i = 0; i < connections.length; i++)
- {
- Connection c = connections[i];
- if (c != null)
- {
- try
- {
+ for (Connection c : connections) {
+ if (c != null) {
+ try {
c.writeMessage(msg);
- recieversCount ++;
+ recieversCount++;
}
- catch (Exception e)
- {
- // System.err.println("failed sending msg on " + c);
+ catch (Exception e) {
+ log.warn("failure sending message to " + c, e);
}
}
}
@@ -291,7 +284,8 @@
createSocket();
}
ObjectOutputStream oos = new ObjectOutputStream(out);
- oos.writeObject(msg);
+ Message message = new Message(localAddr, msg);
+ oos.writeObject(message);
}
out.flush();
}
@@ -306,6 +300,7 @@
}
catch (Exception ex)
{
+ log.warn("problems closing the connection", ex);
}
}
@@ -336,9 +331,9 @@
{
try
{
- Object message = new ObjectInputStream(in).readObject();
+ Message message = (Message) new ObjectInputStream(in).readObject();
if (receiver != null)
- receiver.receive(remote, message);
+ receiver.receive(message.getSource(), message.getPayload());
}
catch (Exception e) {
break;
Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Transport.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Transport.java 2008-07-18 15:52:27 UTC (rev 6337)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/Transport.java 2008-07-18 16:48:45 UTC (rev 6338)
@@ -46,9 +46,8 @@
/**
* Send a message
- * @param destination A destination. If null, send a message to all members
* @param payload A buffer to be sent
* @throws Exception
*/
- void send(Object destination, Object payload) throws Exception;
+ void send(Object payload) throws Exception;
}
Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/tests/ReplicationOccursTest.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/tests/ReplicationOccursTest.java 2008-07-18 15:52:27 UTC (rev 6337)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/tests/ReplicationOccursTest.java 2008-07-18 16:48:45 UTC (rev 6338)
@@ -212,7 +212,7 @@
{
totalValue += Integer.valueOf(val.toString());
}
- log.info("********** Overall replication count is: " + totalValue);
+ log.info("Overall replication count is: " + totalValue);
//this means SOME replication occurred. This does not mean, though, that all nodes replicated successfuly.
//correct condition would be >= this.conf.getClusterConfig().getClusterSize()
//todo/FIXME - this seem not to work with all the products, so we will accept 'some replication'
@@ -221,7 +221,6 @@
log.warn("The replication was not total, but partial!!");
}
boolean isReplicationSuccess = totalValue > 0;
- log.info("Is replication passed? " + isReplicationSuccess);
return isReplicationSuccess;
}
}
More information about the jbosscache-commits
mailing list