Author: manik.surtani(a)jboss.com
Date: 2008-04-01 06:29:30 -0400 (Tue, 01 Apr 2008)
New Revision: 5480
Added:
core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedReturnValue.java
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
core/trunk/src/test/java/org/jboss/cache/marshall/CacheMarshaller200Test.java
Log:
JBCACHE-1170: Leakage of thread locals in CacheMarshaller when using async replication
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-03-31 17:53:52 UTC
(rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-04-01 10:29:30 UTC
(rev 5480)
@@ -274,8 +274,17 @@
// always use the InactiveRegionAwareRpcDispatcher - exceptions due to regions not
being active should not propagate to remote
// nodes as errors. - Manik
- disp = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new
MembershipListenerAdaptor(), remoteDelegate);
+ // but only when region-based marshalling is enabled; otherwise a plain RpcDispatcher is used.
+ if (configuration.isUseRegionBasedMarshalling())
+ {
+ disp = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new
MembershipListenerAdaptor(), remoteDelegate);
+ }
+ else
+ {
+ disp = new RpcDispatcher(channel, messageListener, new
MembershipListenerAdaptor(), remoteDelegate);
+ }
+
disp.setRequestMarshaller(marshaller);
disp.setResponseMarshaller(marshaller);
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-03-31
17:53:52 UTC (rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-04-01
10:29:30 UTC (rev 5480)
@@ -19,6 +19,7 @@
import org.jboss.cache.transaction.GlobalTransaction;
import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -86,6 +87,16 @@
throw new RuntimeException("Needs to be overridden!");
}
+ public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer)
throws Exception
+ {
+ throw new RuntimeException("Needs to be overridden!");
+ }
+
+ public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream
in) throws Exception
+ {
+ throw new RuntimeException("Needs to be overridden!");
+ }
+
/**
* This is "replicate" call with a single MethodCall argument.
*
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2008-03-31
17:53:52 UTC (rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2008-04-01
10:29:30 UTC (rev 5480)
@@ -6,7 +6,6 @@
*/
package org.jboss.cache.marshall;
-import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
import static org.jboss.cache.Region.Status;
@@ -60,9 +59,6 @@
protected static final InactiveRegionException IRE = new
InactiveRegionException("Cannot unmarshall to an inactive region");
- // this is pretty nasty, and may need more thought.
- protected final ThreadLocal<Fqn> regionForCall = new ThreadLocal<Fqn>();
-
public CacheMarshaller200()
{
initLogger();
@@ -70,50 +66,26 @@
useRefs = true;
}
- /**
- * Tests if the type of object being marshalled is a method call or a return value
- *
- * @param o object to marshall
- * @return true if the object is a return value to a method call; false otherwise
- */
- protected boolean isReturnValue(Object o)
- {
- return !(o instanceof MethodCall);
- }
-
// -------- AbstractMarshaller interface
public void objectToObjectStream(Object o, ObjectOutputStream out) throws Exception
{
if (useRegionBasedMarshalling)
{
- Fqn region;
- if (o == null)
+ Fqn region = null;
+
+ if (o instanceof RegionalizedReturnValue)
{
- // if the return value we're trying to marshall is null we're easy
...
- region = null;
- // we still need to clear the thread local though.
- regionForCall.remove();
+ RegionalizedReturnValue rrv = (RegionalizedReturnValue) o;
+ region = rrv.region;
+ o = rrv.returnValue;
}
- else if (isReturnValue(o))
+ else if (o instanceof MethodCall)
{
- // we are marshalling a return value from a method call.
- // let's see if an incoming unmarshalling call for this exists, in the
same thread stack and had registered
- // a Fqn region.
- region = regionForCall.get();
- regionForCall.remove();
- if (trace)
- log.trace("Suspect this is a return value. Extract region from
ThreadLocal as " + region);
-
- // otherwise, we need to marshall the retval.
- }
- else
- {
- // this is an outgoing method call.
- // we first marshall the Fqn as a String
MethodCall call = (MethodCall) o;
region = extractFqnRegion(call);
}
+
if (trace) log.trace("Region based call. Using region " + region);
objectToObjectStream(o, out, region);
}
@@ -125,11 +97,22 @@
}
}
+ @Override
+ public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream
in) throws Exception
+ {
+ // parse the stream as per normal.
+ Object[] retVal = objectFromObjectStreamRegionBased(in);
+ RegionalizedMethodCall rmc = new RegionalizedMethodCall();
+ rmc.call = (MethodCall) retVal[0];
+ rmc.region = (Fqn) retVal[1];
+ return rmc;
+ }
+
public Object objectFromObjectStream(ObjectInputStream in) throws Exception
{
if (useRegionBasedMarshalling)
{
- return objectFromObjectStreamRegionBased(in);
+ return objectFromObjectStreamRegionBased(in)[0];
}
else
{
@@ -172,7 +155,12 @@
}
}
- protected Object objectFromObjectStreamRegionBased(ObjectInputStream in) throws
Exception
+ /**
+ * @param in
+ * @return a 2-object array. The first one is the unmarshalled object and the 2nd is
an Fqn that relates to the region used. If region-based marshalling is not used, or no region exists for the unmarshalled Fqn, the 2nd
value is null.
+ * @throws Exception
+ */
+ protected Object[] objectFromObjectStreamRegionBased(ObjectInputStream in) throws
Exception
{
UnmarshalledReferences refMap = useRefs ? new UnmarshalledReferences() : null;
Object o = unmarshallObject(in, refMap);
@@ -190,7 +178,7 @@
if (trace) log.trace("Unmarshalled regionFqn " + regionFqn + " from
stream");
Region region = null;
- Object retValue;
+ Object[] retValue = {null, null};
if (regionFqn != null)
{
@@ -200,16 +188,14 @@
{
if (log.isDebugEnabled())
log.debug("Region does not exist for Fqn " + regionFqn + " -
not using a context classloader.");
- retValue = unmarshallObject(in, defaultClassLoader, refMap, false);
+ retValue[0] = unmarshallObject(in, defaultClassLoader, refMap, false);
}
else
{
- retValue = unmarshallObject(in, region.getClassLoader(), refMap, true);
-
- // only set this if this is an incoming method call and not a return value.
- if (!isReturnValue(retValue)) regionForCall.set(regionFqn);
+ retValue[0] = unmarshallObject(in, region.getClassLoader(), refMap, true);
+ retValue[1] = regionFqn;
}
- if (trace) log.trace("Unmarshalled object " + retValue);
+ if (trace) log.trace("Unmarshalled object " + retValue[0] + " with
region " + retValue[1]);
return retValue;
}
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2008-03-31
17:53:52 UTC (rev 5479)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2008-04-01
10:29:30 UTC (rev 5480)
@@ -4,7 +4,6 @@
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
-import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RpcDispatcher;
/**
@@ -15,6 +14,8 @@
*/
public class InactiveRegionAwareRpcDispatcher extends RpcDispatcher
{
+ org.jboss.cache.marshall.Marshaller requestMarshaller;
+
/**
* Only provide the flavour of the {@link RpcDispatcher} constructor that we care
about.
*/
@@ -23,6 +24,14 @@
super(channel, l, l2, server_obj);
}
+ @Override
+ public void setRequestMarshaller(Marshaller m)
+ {
+ super.setRequestMarshaller(m);
+ requestMarshaller = (org.jboss.cache.marshall.Marshaller) m;
+ }
+
+
/**
* Message contains MethodCall. Execute it against *this* object and return result.
* Use MethodCall.invoke() to do this. Return result.
@@ -30,7 +39,6 @@
@Override
public Object handle(Message req)
{
- Object body = null;
org.jgroups.blocks.MethodCall method_call;
if (server_obj == null)
@@ -45,9 +53,13 @@
return null;
}
+ RegionalizedMethodCall rmc;
+
try
{
- body = req_marshaller != null ?
req_marshaller.objectFromByteBuffer(req.getBuffer()) : req.getObject();
+ // we will ALWAYS be using the marshaller to unmarshall requests.
+ rmc = requestMarshaller.regionalizedMethodCallFromByteBuffer(req.getBuffer());
+ method_call = rmc.call;
}
catch (Throwable e)
{
@@ -61,20 +73,13 @@
return e;
}
- if (body == null || !(body instanceof MethodCall))
- {
- if (log.isErrorEnabled()) log.error("message does not contain a MethodCall
object");
- return null;
- }
-
- method_call = (MethodCall) body;
-
try
{
if (log.isTraceEnabled())
log.trace("[sender=" + req.getSrc() + "], method_call: "
+ method_call);
- return method_call.invoke(server_obj);
+ Object retVal = method_call.invoke(server_obj);
+ return new RegionalizedReturnValue(retVal, rmc);
}
catch (Throwable x)
{
@@ -82,6 +87,7 @@
}
}
+ @Override
public String toString()
{
return getClass().getSimpleName() + "[Outgoing marshaller: " +
req_marshaller + "; incoming marshaller: " + rsp_marshaller + "]";
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-03-31 17:53:52
UTC (rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-04-01 10:29:30
UTC (rev 5480)
@@ -78,4 +78,26 @@
* @return
*/
byte[] objectToByteBuffer(Object o, boolean writeStreamHeaders) throws Exception;
+
+ /**
+ * Returns a RegionalizedMethodCall from a byte buffer. Only use if you
<i>know</i> that the byte buffer contains a
+ * MethodCall and that you are using region-based marshalling, otherwise use {@link
#objectFromByteBuffer(byte[])}
+ *
+ * @param buffer byte buffer
+ * @return a RegionalizedMethodCall
+ * @throws Exception if there are issues
+ * @since 2.1.1
+ */
+ RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws
Exception;
+
+ /**
+ * Returns a RegionalizedMethodCall from an object input stream. Only use if you
<i>know</i> that the stream contains a
+ * MethodCall and that you are using region-based marshalling, otherwise use {@link
#objectFromObjectStream(java.io.ObjectInputStream)}
+ *
+ * @param in object input stream
+ * @return a RegionalizedMethodCall
+ * @throws Exception if there are issues
+ * @since 2.1.1
+ */
+ RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in)
throws Exception;
}
Added: core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java 2008-04-01
10:29:30 UTC (rev 5480)
@@ -0,0 +1,20 @@
+package org.jboss.cache.marshall;
+
+import org.jboss.cache.Fqn;
+
+/**
+ * A regionalized MethodCall object, created when {@link
Marshaller#regionalizedMethodCallFromByteBuffer(byte[])} or
+ * {@link
org.jboss.cache.marshall.Marshaller#regionalizedMethodCallFromObjectStream(java.io.ObjectInputStream)}
is called.
+ * <p/>
+ * Specifically used by the {@link
org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher} so that the region used to
unmarshall
+ * the method call is known, and can be used to marshall a result to return to the remote
caller.
+ * <p/>
+ *
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.1.1
+ */
+class RegionalizedMethodCall
+{
+ MethodCall call;
+ Fqn region;
+}
Added: core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedReturnValue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedReturnValue.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedReturnValue.java 2008-04-01
10:29:30 UTC (rev 5480)
@@ -0,0 +1,28 @@
+package org.jboss.cache.marshall;
+
+import org.jboss.cache.Fqn;
+
+/**
+ * A return value that holds region information, so that the marshaller knows which
region to use (and hence which
+ * class loader to use) when marshalling this return value.
+ *
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.1.1
+ */
+class RegionalizedReturnValue
+{
+ Object returnValue;
+ Fqn region;
+
+ /**
+ * Creates this value object.
+ *
+ * @param returnValue return value to marshall
+ * @param regionalizedMethodCall method call that requested this return value.
+ */
+ RegionalizedReturnValue(Object returnValue, RegionalizedMethodCall
regionalizedMethodCall)
+ {
+ this.returnValue = returnValue;
+ this.region = regionalizedMethodCall.region;
+ }
+}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-03-31
17:53:52 UTC (rev 5479)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-04-01
10:29:30 UTC (rev 5480)
@@ -222,6 +222,37 @@
}
}
+ @Override
+ public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buf) throws
Exception
+ {
+ Marshaller marshaller;
+ int versionId;
+ ObjectInputStream in = pool.getInputStream(buf);
+
+ try
+ {
+ try
+ {
+ versionId = in.readShort();
+ if (trace) log.trace("Read version " + versionId);
+ }
+ catch (Exception e)
+ {
+ log.error("Unable to read version id from first two bytes of stream,
barfing.");
+ throw e;
+ }
+
+ marshaller = getMarshaller(versionId);
+
+ return marshaller.regionalizedMethodCallFromObjectStream(in);
+ }
+ finally
+ {
+ pool.returnStreamToPool(in);
+ }
+ }
+
+ @Override
public Object objectFromStream(InputStream is) throws Exception
{
if (is instanceof ByteArrayInputStream)
@@ -238,15 +269,6 @@
short versionId;
Marshaller marshaller;
-// PushbackInputStream pis = new PushbackInputStream(is);
-// byte[] first4bytes = new byte[4];
-// pis.read(first4bytes);
-// boolean needToWriteHeader = !Arrays.equals(first4bytes,
ReusableObjectInputStream.INIT_BYTES);
-// // first push back the bytes read
-// pis.unread(first4bytes);
-// if (needToWriteHeader) pis.unread(ReusableObjectInputStream.INIT_BYTES);
-//
-// ObjectInputStream in = new MarshalledValueInputStream(pis);
ObjectInputStream in = new MarshalledValueInputStream(is);
try
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/CacheMarshaller200Test.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/marshall/CacheMarshaller200Test.java 2008-03-31
17:53:52 UTC (rev 5479)
+++
core/trunk/src/test/java/org/jboss/cache/marshall/CacheMarshaller200Test.java 2008-04-01
10:29:30 UTC (rev 5480)
@@ -114,10 +114,11 @@
{
try
{
- cm200.objectFromObjectStream(new ObjectInputStream(new
ByteArrayInputStream(stream)));
+ RegionalizedMethodCall rmc =
cm200.regionalizedMethodCallFromObjectStream(new ObjectInputStream(new
ByteArrayInputStream(stream)));
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream outStream = new ObjectOutputStream(out);
- cm200.objectToObjectStream("A result", outStream);
+ RegionalizedReturnValue rrv = new RegionalizedReturnValue("A
result", rmc);
+ cm200.objectToObjectStream(rrv, outStream);
outStream.close();
out.close();
// test that the output stream has got "/hello" as
its region Fqn.