[jbosscache-commits] JBoss Cache SVN: r5480 - in core/trunk/src: main/java/org/jboss/cache/marshall and 1 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Apr 1 06:29:31 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-04-01 06:29:30 -0400 (Tue, 01 Apr 2008)
New Revision: 5480

Added:
   core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
   core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedReturnValue.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
   core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
   core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
   core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
   core/trunk/src/test/java/org/jboss/cache/marshall/CacheMarshaller200Test.java
Log:
JBCACHE-1170: Leakage of thread locals in CacheMarshaller when using async replication

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-03-31 17:53:52 UTC (rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-04-01 10:29:30 UTC (rev 5480)
@@ -274,8 +274,17 @@
 
       // always use the InactiveRegionAwareRpcDispatcher - exceptions due to regions not being active should not propagate to remote
       // nodes as errors. - Manik
-      disp = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(), remoteDelegate);
 
+      // but only if we are using region based marshalling?!??
+      if (configuration.isUseRegionBasedMarshalling())
+      {
+         disp = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(), remoteDelegate);
+      }
+      else
+      {
+         disp = new RpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(), remoteDelegate);
+      }
+
       disp.setRequestMarshaller(marshaller);
       disp.setResponseMarshaller(marshaller);
    }

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2008-03-31 17:53:52 UTC (rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2008-04-01 10:29:30 UTC (rev 5480)
@@ -19,6 +19,7 @@
 import org.jboss.cache.transaction.GlobalTransaction;
 
 import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -86,6 +87,16 @@
       throw new RuntimeException("Needs to be overridden!");
    }
 
+   public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
+   {
+      throw new RuntimeException("Needs to be overridden!");
+   }
+
+   public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
+   {
+      throw new RuntimeException("Needs to be overridden!");
+   }
+
    /**
     * This is "replicate" call with a single MethodCall argument.
     *

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java	2008-03-31 17:53:52 UTC (rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java	2008-04-01 10:29:30 UTC (rev 5480)
@@ -6,7 +6,6 @@
  */
 package org.jboss.cache.marshall;
 
-import org.jboss.cache.CacheException;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Region;
 import static org.jboss.cache.Region.Status;
@@ -60,9 +59,6 @@
 
    protected static final InactiveRegionException IRE = new InactiveRegionException("Cannot unmarshall to an inactive region");
 
-   // this is pretty nasty, and may need more thought.
-   protected final ThreadLocal<Fqn> regionForCall = new ThreadLocal<Fqn>();
-
    public CacheMarshaller200()
    {
       initLogger();
@@ -70,50 +66,26 @@
       useRefs = true;
    }
 
-   /**
-    * Tests if the type of object being marshalled is a method call or a return value
-    *
-    * @param o object to marshall
-    * @return true if the object is a return value to a method call; false otherwise
-    */
-   protected boolean isReturnValue(Object o)
-   {
-      return !(o instanceof MethodCall);
-   }
-
    // -------- AbstractMarshaller interface
 
    public void objectToObjectStream(Object o, ObjectOutputStream out) throws Exception
    {
       if (useRegionBasedMarshalling)
       {
-         Fqn region;
-         if (o == null)
+         Fqn region = null;
+
+         if (o instanceof RegionalizedReturnValue)
          {
-            // if the return value we're trying to marshall is null we're easy ...
-            region = null;
-            // we still need to clear the thread local though.
-            regionForCall.remove();
+            RegionalizedReturnValue rrv = (RegionalizedReturnValue) o;
+            region = rrv.region;
+            o = rrv.returnValue;
          }
-         else if (isReturnValue(o))
+         else if (o instanceof MethodCall)
          {
-            // we are marshalling a return value from a method call.
-            // let's see if an incoming unmarshalling call for this exists, in the same thread stack and had registered
-            // a Fqn region.
-            region = regionForCall.get();
-            regionForCall.remove();
-            if (trace)
-               log.trace("Suspect this is a return value.  Extract region from ThreadLocal as " + region);
-
-            // otherwise, we need to marshall the retval.
-         }
-         else
-         {
-            // this is an outgoing method call.
-            // we first marshall the Fqn as a String
             MethodCall call = (MethodCall) o;
             region = extractFqnRegion(call);
          }
+
          if (trace) log.trace("Region based call.  Using region " + region);
          objectToObjectStream(o, out, region);
       }
@@ -125,11 +97,22 @@
       }
    }
 
+   @Override
+   public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
+   {
+      // parse the stream as per normal.
+      Object[] retVal = objectFromObjectStreamRegionBased(in);
+      RegionalizedMethodCall rmc = new RegionalizedMethodCall();
+      rmc.call = (MethodCall) retVal[0];
+      rmc.region = (Fqn) retVal[1];
+      return rmc;
+   }
+
    public Object objectFromObjectStream(ObjectInputStream in) throws Exception
    {
       if (useRegionBasedMarshalling)
       {
-         return objectFromObjectStreamRegionBased(in);
+         return objectFromObjectStreamRegionBased(in)[0];
       }
       else
       {
@@ -172,7 +155,12 @@
       }
    }
 
-   protected Object objectFromObjectStreamRegionBased(ObjectInputStream in) throws Exception
+   /**
+    * @param in
+    * @return a 2-object array.  The first one is the unmarshalled object and the 2nd is an Fqn that relates to the region used.  If region-based marshalling is not used, the 2nd value is null.
+    * @throws Exception
+    */
+   protected Object[] objectFromObjectStreamRegionBased(ObjectInputStream in) throws Exception
    {
       UnmarshalledReferences refMap = useRefs ? new UnmarshalledReferences() : null;
       Object o = unmarshallObject(in, refMap);
@@ -190,7 +178,7 @@
       if (trace) log.trace("Unmarshalled regionFqn " + regionFqn + " from stream");
 
       Region region = null;
-      Object retValue;
+      Object[] retValue = {null, null};
 
       if (regionFqn != null)
       {
@@ -200,16 +188,14 @@
       {
          if (log.isDebugEnabled())
             log.debug("Region does not exist for Fqn " + regionFqn + " - not using a context classloader.");
-         retValue = unmarshallObject(in, defaultClassLoader, refMap, false);
+         retValue[0] = unmarshallObject(in, defaultClassLoader, refMap, false);
       }
       else
       {
-         retValue = unmarshallObject(in, region.getClassLoader(), refMap, true);
-
-         // only set this if this is an incoming method call and not a return value.
-         if (!isReturnValue(retValue)) regionForCall.set(regionFqn);
+         retValue[0] = unmarshallObject(in, region.getClassLoader(), refMap, true);
+         retValue[1] = regionFqn;
       }
-      if (trace) log.trace("Unmarshalled object " + retValue);
+      if (trace) log.trace("Unmarshalled object " + retValue[0] + " with region " + retValue[1]);
       return retValue;
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java	2008-03-31 17:53:52 UTC (rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java	2008-04-01 10:29:30 UTC (rev 5480)
@@ -4,7 +4,6 @@
 import org.jgroups.MembershipListener;
 import org.jgroups.Message;
 import org.jgroups.MessageListener;
-import org.jgroups.blocks.MethodCall;
 import org.jgroups.blocks.RpcDispatcher;
 
 /**
@@ -15,6 +14,8 @@
  */
 public class InactiveRegionAwareRpcDispatcher extends RpcDispatcher
 {
+   org.jboss.cache.marshall.Marshaller requestMarshaller;
+
    /**
     * Only provide the flavour of the {@link RpcDispatcher} constructor that we care about.
     */
@@ -23,6 +24,14 @@
       super(channel, l, l2, server_obj);
    }
 
+   @Override
+   public void setRequestMarshaller(Marshaller m)
+   {
+      super.setRequestMarshaller(m);
+      requestMarshaller = (org.jboss.cache.marshall.Marshaller) m;
+   }
+
+
    /**
     * Message contains MethodCall. Execute it against *this* object and return result.
     * Use MethodCall.invoke() to do this. Return result.
@@ -30,7 +39,6 @@
    @Override
    public Object handle(Message req)
    {
-      Object body = null;
       org.jgroups.blocks.MethodCall method_call;
 
       if (server_obj == null)
@@ -45,9 +53,13 @@
          return null;
       }
 
+      RegionalizedMethodCall rmc;
+
       try
       {
-         body = req_marshaller != null ? req_marshaller.objectFromByteBuffer(req.getBuffer()) : req.getObject();
+         // we will ALWAYS be using the marshaller to unmarshall requests.
+         rmc = requestMarshaller.regionalizedMethodCallFromByteBuffer(req.getBuffer());
+         method_call = rmc.call;
       }
       catch (Throwable e)
       {
@@ -61,20 +73,13 @@
          return e;
       }
 
-      if (body == null || !(body instanceof MethodCall))
-      {
-         if (log.isErrorEnabled()) log.error("message does not contain a MethodCall object");
-         return null;
-      }
-
-      method_call = (MethodCall) body;
-
       try
       {
          if (log.isTraceEnabled())
             log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call);
 
-         return method_call.invoke(server_obj);
+         Object retVal = method_call.invoke(server_obj);
+         return new RegionalizedReturnValue(retVal, rmc);
       }
       catch (Throwable x)
       {
@@ -82,6 +87,7 @@
       }
    }
 
+   @Override
    public String toString()
    {
       return getClass().getSimpleName() + "[Outgoing marshaller: " + req_marshaller + "; incoming marshaller: " + rsp_marshaller + "]";

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java	2008-03-31 17:53:52 UTC (rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java	2008-04-01 10:29:30 UTC (rev 5480)
@@ -78,4 +78,26 @@
     * @return
     */
    byte[] objectToByteBuffer(Object o, boolean writeStreamHeaders) throws Exception;
+
+   /**
+    * Returns a RegionalizedMethodCall from a byte buffer.  Only use if you <i>know</i> that the byte buffer contains a
+    * MethodCall and that you are using region-based marshalling, otherwise use {@link #objectFromByteBuffer(byte[])}
+    *
+    * @param buffer byte buffer
+    * @return a RegionalizedMethodCall
+    * @throws Exception if there are issues
+    * @since 2.1.1
+    */
+   RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception;
+
+   /**
+    * Returns a RegionalizedMethodCall from an object input stream.  Only use if you <i>know</i> that the byte buffer contains a
+    * MethodCall and that you are using region-based marshalling, otherwise use {@link #objectFromObjectStream(java.io.ObjectInputStream)}
+    *
+    * @param in object inout stream
+    * @return a RegionalizedMethodCall
+    * @throws Exception if there are issues
+    * @since 2.1.1
+    */
+   RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception;
 }

Added: core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java	2008-04-01 10:29:30 UTC (rev 5480)
@@ -0,0 +1,20 @@
+package org.jboss.cache.marshall;
+
+import org.jboss.cache.Fqn;
+
+/**
+ * A regionalized MethodCall object, created when {@link Marshaller#regionalizedMethodCallFromByteBuffer(byte[])} or
+ * {@link org.jboss.cache.marshall.Marshaller#regionalizedMethodCallFromObjectStream(java.io.ObjectInputStream)} is called.
+ * <p/>
+ * Specifically used by the {@link org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher} so that the region used to unmarshall
+ * the method call is known, and can be used to marshall a result to return to the remote caller.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.1
+ */
+class RegionalizedMethodCall
+{
+   MethodCall call;
+   Fqn region;
+}

Added: core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedReturnValue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedReturnValue.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedReturnValue.java	2008-04-01 10:29:30 UTC (rev 5480)
@@ -0,0 +1,28 @@
+package org.jboss.cache.marshall;
+
+import org.jboss.cache.Fqn;
+
+/**
+ * A return value that holds region information, so that the marshaller knows which region to use (and hence which
+ * class loader to use) when marshalling this return value.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.1
+ */
+class RegionalizedReturnValue
+{
+   Object returnValue;
+   Fqn region;
+
+   /**
+    * Creates this value object.
+    *
+    * @param returnValue            return value to marshall
+    * @param regionalizedMethodCall method call that requested this return value.
+    */
+   RegionalizedReturnValue(Object returnValue, RegionalizedMethodCall regionalizedMethodCall)
+   {
+      this.returnValue = returnValue;
+      this.region = regionalizedMethodCall.region;
+   }
+}

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-03-31 17:53:52 UTC (rev 5479)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-04-01 10:29:30 UTC (rev 5480)
@@ -222,6 +222,37 @@
       }
    }
 
+   @Override
+   public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buf) throws Exception
+   {
+      Marshaller marshaller;
+      int versionId;
+      ObjectInputStream in = pool.getInputStream(buf);
+
+      try
+      {
+         try
+         {
+            versionId = in.readShort();
+            if (trace) log.trace("Read version " + versionId);
+         }
+         catch (Exception e)
+         {
+            log.error("Unable to read version id from first two bytes of stream, barfing.");
+            throw e;
+         }
+
+         marshaller = getMarshaller(versionId);
+
+         return marshaller.regionalizedMethodCallFromObjectStream(in);
+      }
+      finally
+      {
+         pool.returnStreamToPool(in);
+      }
+   }
+
+   @Override
    public Object objectFromStream(InputStream is) throws Exception
    {
       if (is instanceof ByteArrayInputStream)
@@ -238,15 +269,6 @@
          short versionId;
          Marshaller marshaller;
 
-//         PushbackInputStream pis = new PushbackInputStream(is);
-//         byte[] first4bytes = new byte[4];
-//         pis.read(first4bytes);
-//         boolean needToWriteHeader = !Arrays.equals(first4bytes, ReusableObjectInputStream.INIT_BYTES);
-//         // first push back the bytes read
-//         pis.unread(first4bytes);
-//         if (needToWriteHeader) pis.unread(ReusableObjectInputStream.INIT_BYTES);
-//
-//         ObjectInputStream in = new MarshalledValueInputStream(pis);
          ObjectInputStream in = new MarshalledValueInputStream(is);
 
          try

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/CacheMarshaller200Test.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/CacheMarshaller200Test.java	2008-03-31 17:53:52 UTC (rev 5479)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/CacheMarshaller200Test.java	2008-04-01 10:29:30 UTC (rev 5480)
@@ -114,10 +114,11 @@
                {
                   try
                   {
-                     cm200.objectFromObjectStream(new ObjectInputStream(new ByteArrayInputStream(stream)));
+                     RegionalizedMethodCall rmc = cm200.regionalizedMethodCallFromObjectStream(new ObjectInputStream(new ByteArrayInputStream(stream)));
                      ByteArrayOutputStream out = new ByteArrayOutputStream();
                      ObjectOutputStream outStream = new ObjectOutputStream(out);
-                     cm200.objectToObjectStream("A result", outStream);
+                     RegionalizedReturnValue rrv = new RegionalizedReturnValue("A result", rmc);
+                     cm200.objectToObjectStream(rrv, outStream);
                      outStream.close();
                      out.close();
                      // test that the output stream has got "/hello" as it's region Fqn.




More information about the jbosscache-commits mailing list