Author: mircea.markus
Date: 2008-01-28 12:02:27 -0500 (Mon, 28 Jan 2008)
New Revision: 5247
Modified:
benchmarks/benchmark-fwk/trunk/TODO
benchmarks/benchmark-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java
benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
Log:
Enhanced the barrier: if a node receives a message from a differently named barrier, it will exit.
Receiving such a message normally means that stale processes are running within the cluster.
Modified: benchmarks/benchmark-fwk/trunk/TODO
===================================================================
--- benchmarks/benchmark-fwk/trunk/TODO 2008-01-28 15:00:27 UTC (rev 5246)
+++ benchmarks/benchmark-fwk/trunk/TODO 2008-01-28 17:02:27 UTC (rev 5247)
@@ -1,3 +1,4 @@
1. JBoss Cache 1.4.1 breaks during warmup
2. ReplicationOccursTest doesn't always work - reports false errors.
+ - it relies on cache.getReplicatedData(); most likely that is the cause of the problem,
refere to javadoc
3. Replace cluster.sh, runNode.sh, killNode.sh and cache-product/product-x.y.z/config.sh
with Ant scripts to make them cross-platform.
Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java 2008-01-28
15:00:27 UTC (rev 5246)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/CacheBenchmarkRunner.java 2008-01-28
17:02:27 UTC (rev 5247)
@@ -97,7 +97,7 @@
List<TestResult> results = new ArrayList<TestResult>();
for (TestCase test : conf.getTestCases())
{
- CacheWrapper cache;
+ CacheWrapper cache = null;
try
{
cache = getCacheWrapperInstance(test);
@@ -123,6 +123,13 @@
}
catch (Exception e)
{
+ try
+ {
+ shutdownCache(cache);
+ } catch (Exception e1)
+ {
+ //ignore
+ }
logger.warn("Unable to Initialize or Setup the Cache - Not performing
any tests", e);
errorLogger.error("Unable to Initialize or Setup the Cache: " +
test.getCacheWrapper(), e);
errorLogger.error("Skipping this test");
Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java
===================================================================
---
benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java 2008-01-28
15:00:27 UTC (rev 5246)
+++
benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/ClusterBarrier.java 2008-01-28
17:02:27 UTC (rev 5247)
@@ -19,11 +19,13 @@
private ClusterConfig config;
public final Map<SocketAddress, Object> receivedMessages = new
HashMap<SocketAddress, Object>();
- private Transport transport;
+ private TcpTransport transport;
private Object message;
private int numMembers;
private boolean acknowledge;
private static final String ACK = "_ACK";
+ private boolean failOnWrongMessaages;
+ private String errorMessage;
/**
@@ -40,6 +42,7 @@
public void setAcknowledge(boolean acknowledge)
{
this.acknowledge = acknowledge;
+ this.failOnWrongMessaages = acknowledge;
}
/**
@@ -76,6 +79,14 @@
receivedAllMessages = receivedMessages.size() >= numMembers;
if (!receivedAllMessages)
{
+ if (errorMessage != null)
+ {
+ //might be that I am the intruder, give other members a chance to fail
aswell
+ transport.send(null, message);
+ transport.stop();
+ Thread.sleep(2000);
+ throw new IllegalStateException(errorMessage);
+ }
receivedMessages.wait(2000);
}
}
@@ -91,19 +102,29 @@
public void receive(SocketAddress sender, Object payload) throws Exception
{
- log.trace("Received '" + payload + "' from " + sender +
" still expecting " + getMissingMembersCount() + " member(s)");
+ log.trace("Received '" + payload + "' from " +
formatName(sender) + " still expecting " + getMissingMembersCount() + "
member(s)");
if (payload == null)
{
log.warn("payload is incorrect (sender=" + sender + "): " +
payload);
return;
}
- if (acknowledge && !isAcknowledgeMessage(payload, message))
+ if (acknowledge && !isAcknowledgeMessage(payload))
{
log.trace("Sending ack, still expecting " + getMissingMembersCount() +
" members.");
transport.send(null, getAcknowledgeMessage(message));
return;
}
+ if (failOnWrongMessaages && !message.equals(payload) &&
!getAcknowledgeMessage(message).equals(payload))
+ {
+ errorMessage = "We recieved an message from a differenet barrier. This
normally means that there is an stale " +
+ "barrier running somewhere.The source of the message is '" +
sender + "', message is:'" + payload + "', " +
+ "and we were expecting '" + message + "'";
+ log.error(errorMessage);
+ this.receivedMessages.notifyAll();
+
+ }
+
//we are here if either no ack or ack the message is an ack message
synchronized (this.receivedMessages)
{
@@ -113,10 +134,18 @@
int expected = getMissingMembersCount();
log.trace("Sender " + sender + " registered, still waiting for
" + expected + " member(s)");
this.receivedMessages.notifyAll();
+ } else {
+ log.trace("Sender '" + formatName(sender) + "' is
already registered in the list of known senders!");
}
+ log.trace("Current list of senders is: " +
receivedMessages.keySet());
}
}
+ private String formatName(SocketAddress sender)
+ {
+ return transport.isLocal(sender) ? "<local(" + sender +
")>" : String.valueOf(sender);
+ }
+
private int getMissingMembersCount()
{
return numMembers - receivedMessages.size();
@@ -138,13 +167,9 @@
return message.toString() + ACK;
}
- private boolean isAcknowledgeMessage(Object payload, Object message)
+ private boolean isAcknowledgeMessage(Object payload)
{
- boolean result;
- String payloadStr = payload.toString();
- result = payloadStr.equals(getAcknowledgeMessage(message));
- log.trace("Is acknowledge? " + result);
- return result;
+ return payload == null ? false : (payload.toString().indexOf(ACK) >= 0);
}
public void setConfig(ClusterConfig config)
Modified: benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java 2008-01-28
15:00:27 UTC (rev 5246)
+++ benchmarks/benchmark-fwk/trunk/src/org/cachebench/cluster/TcpTransport.java 2008-01-28
17:02:27 UTC (rev 5247)
@@ -47,7 +47,6 @@
public void create(ClusterConfig clusterConfig) throws Exception
{
this.config = clusterConfig;
- String tmp;
startPort = config.getPortForThisNode();
String bindAddrStr = config.getBindAddress();
if (bindAddrStr != null)
@@ -121,7 +120,7 @@
}
catch (IOException ioEx)
{
- log.warn("An exception appeared whilst trying to create server socket on
port " + start_port1 + ", error:"
+ log.trace("An exception appeared whilst trying to create server socket
on port " + start_port1 + ", error:"
+ ioEx.getMessage());
}
break;
@@ -173,13 +172,10 @@
class ConnectionTable
{
- /**
- * List<InetSocketAddress>
- */
- List myNodes;
+ List<InetSocketAddress> myNodes;
final Connection[] connections;
- ConnectionTable(List nodes) throws Exception
+ ConnectionTable(List<InetSocketAddress> nodes) throws Exception
{
this.myNodes = nodes;
connections = new Connection[nodes.size()];
@@ -190,9 +186,8 @@
{
int i = 0;
log.trace("Nodes is " + myNodes);
- for (Iterator it = myNodes.iterator(); it.hasNext();)
+ for (InetSocketAddress addr : myNodes)
{
- InetSocketAddress addr = (InetSocketAddress) it.next();
if (connections[i] == null)
{
try
@@ -204,10 +199,6 @@
{
log.trace("-- failed to connect to " + addr);
}
- catch (Exception all_others)
- {
- throw all_others;
- }
}
i++;
}
@@ -255,6 +246,19 @@
}
return sb.toString();
}
+
+ public boolean isLocalConnection(SocketAddress socketAddress)
+ {
+ for (Connection conn : connections)
+ {
+ SocketAddress addr = conn.sock != null ? conn.sock.getLocalSocketAddress() :
null;
+ if (addr != null && addr.equals(socketAddress))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
}
class Connection
@@ -336,14 +340,9 @@
if (receiver != null)
receiver.receive(remote, message);
}
- catch (EOFException eof)
- {
+ catch (Exception e) {
break;
}
- catch (Exception ex)
- {
- break;
- }
}
log.trace("-- receiver thread for " + remote + "
terminated");
}
@@ -352,11 +351,15 @@
{
try
{
+ log.trace("Closing receiver thread for: " + sock);
sock.close();
+ in.close();
sock = null;
+ this.interrupt();
}
catch (Exception ex)
{
+ log.warn("Exception while closing the thread", ex);
}
}
}
@@ -386,4 +389,9 @@
return retval;
}
+ public boolean isLocal(SocketAddress sa)
+ {
+ return connectionTable.isLocalConnection(sa);
+ }
+
}