Author: manik.surtani(a)jboss.com
Date: 2008-07-29 18:55:16 -0400 (Tue, 29 Jul 2008)
New Revision: 6422
Modified:
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
Log:
Added more debug (trace) logging; synchronize on the loader instance rather than on the output stream
Modified: core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java 2008-07-29 21:40:01 UTC (rev 6421)
+++ core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java 2008-07-29 22:55:16 UTC (rev 6422)
@@ -42,11 +42,12 @@
*/
public class TcpDelegatingCacheLoader extends AbstractCacheLoader
{
- private Socket sock;
+ volatile private Socket sock;
private TcpDelegatingCacheLoaderConfig config;
- ObjectInputStream in;
- ObjectOutputStream out;
+ volatile ObjectInputStream in;
+ volatile ObjectOutputStream out;
private static final Log log = LogFactory.getLog(TcpDelegatingCacheLoader.class);
+ private static final boolean trace = log.isTraceEnabled();
private static Method GET_CHILDREN_METHOD, GET_METHOD, PUT_KEY_METHOD,
PUT_DATA_METHOD, REMOVE_KEY_METHOD, REMOVE_METHOD, PUT_MODS_METHOD, EXISTS_METHOD,
REMOVE_DATA_METHOD;
static
@@ -106,7 +107,10 @@
{
try
{
- return m.invoke(this, params);
+ if (trace) log.trace("About to invoke operation " + m);
+ Object rv = m.invoke(this, params);
+ if (trace) log.trace("Completed invocation of " + m);
+ return rv;
}
catch (IllegalAccessException e)
{
@@ -137,7 +141,8 @@
throw new CacheException("Problems invoking method call!", e);
}
}
- } while (System.currentTimeMillis() < endTime);
+ }
+ while (System.currentTimeMillis() < endTime);
throw new CacheException("Unable to communicate with TCPCacheServer(" +
config.getHost() + ":" + config.getPort() + ") after " +
config.getTimeout() + " millis, with reconnects every " +
config.getReconnectWaitTime() + " millis.");
}
@@ -149,6 +154,7 @@
return (Set<?>) invokeWithRetries(GET_CHILDREN_METHOD, fqn);
}
+ @SuppressWarnings("unchecked")
public Map<Object, Object> get(Fqn name) throws Exception
{
return (Map<Object, Object>) invokeWithRetries(GET_METHOD, name);
@@ -195,7 +201,7 @@
protected Set<?> _getChildrenNames(Fqn fqn) throws Exception
{
Set cn;
- synchronized (out)
+ synchronized (this)
{
out.reset();
out.writeByte(TcpCacheOperations.GET_CHILDREN_NAMES);
@@ -214,9 +220,10 @@
return cn;
}
+ @SuppressWarnings("unchecked")
protected Map<Object, Object> _get(Fqn name) throws Exception
{
- synchronized (out)
+ synchronized (this)
{
out.reset();
@@ -234,7 +241,7 @@
protected boolean _exists(Fqn name) throws Exception
{
- synchronized (out)
+ synchronized (this)
{
out.reset();
@@ -252,7 +259,7 @@
protected Object _put(Fqn name, Object key, Object value) throws Exception
{
- synchronized (out)
+ synchronized (this)
{
out.reset();
@@ -272,7 +279,7 @@
protected void _put(Fqn name, Map<Object, Object> attributes) throws Exception
{
- synchronized (out)
+ synchronized (this)
{
out.reset();
@@ -290,7 +297,7 @@
protected void _put(List<Modification> modifications) throws Exception
{
- synchronized (out)
+ synchronized (this)
{
out.reset();
@@ -312,7 +319,7 @@
protected Object _remove(Fqn fqn, Object key) throws Exception
{
- synchronized (out)
+ synchronized (this)
{
out.reset();
@@ -331,7 +338,7 @@
protected void _remove(Fqn fqn) throws Exception
{
- synchronized (out)
+ synchronized (this)
{
out.reset();
@@ -348,7 +355,7 @@
protected void _removeData(Fqn fqn) throws Exception
{
- synchronized (out)
+ synchronized (this)
{
out.reset();
Modified: core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java 2008-07-29 21:40:01 UTC (rev 6421)
+++ core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java 2008-07-29 22:55:16 UTC (rev 6422)
@@ -51,7 +51,8 @@
* whether or not to start the server thread as a daemon. Should be false if started
from the command line, true if started as an MBean.
*/
boolean daemon = true;
- static final Log log = LogFactory.getLog(TcpCacheServer.class);
+ private static final Log log = LogFactory.getLog(TcpCacheServer.class);
+ private final static boolean trace = log.isTraceEnabled();
public String getBindAddress()
@@ -251,7 +252,7 @@
public void start()
{
- t = new Thread(this, "TcpCacheServer.Connection");
+ t = new Thread(this, "TcpCacheServer.Connection(" + sock.getPort() +
")");
t.setDaemon(true);
t.start();
}
@@ -296,10 +297,11 @@
NodeSPI n;
boolean flag;
- while (t != null && Thread.currentThread().equals(t))
+ while (t != null && Thread.currentThread().equals(t) &&
t.isAlive())
{
try
{
+ if (trace) log.trace("Reading next byte");
op = input.readByte();
}
catch (IOException e)
@@ -311,6 +313,7 @@
try
{
+ if (trace) log.trace("Resetting output");
output.reset();
switch (op)
{
@@ -328,7 +331,7 @@
break;
case TcpCacheOperations.GET:
fqn = (Fqn) input.readObject();
- n = (NodeSPI) c.getRoot().getChild(fqn);
+ n = c.getNode(fqn);
if (n == null)
{
// node doesn't exist - return null
@@ -428,6 +431,7 @@
log.error("Operation " + op + " unknown");
break;
}
+ if (trace) log.trace("Flushing stream");
output.flush();
}
catch (Exception e)