[jboss-remoting-commits] JBoss Remoting SVN: r3606 - remoting2/branches/2.x/src/main/org/jboss/remoting/transport/rmi.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed Mar 12 23:27:14 EDT 2008


Author: ron.sigal at jboss.com
Date: 2008-03-12 23:27:14 -0400 (Wed, 12 Mar 2008)
New Revision: 3606

Modified:
   remoting2/branches/2.x/src/main/org/jboss/remoting/transport/rmi/RMIClientInvoker.java
Log:
JBREM-167: Added ability to marshal/unmarshal in both directions.

Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/transport/rmi/RMIClientInvoker.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/transport/rmi/RMIClientInvoker.java	2008-03-12 07:54:08 UTC (rev 3605)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/transport/rmi/RMIClientInvoker.java	2008-03-13 03:27:14 UTC (rev 3606)
@@ -32,18 +32,23 @@
 import org.jboss.remoting.RemoteClientInvoker;
 import org.jboss.remoting.Remoting;
 import org.jboss.remoting.ServerInvoker;
-import org.jboss.remoting.Version;
 import org.jboss.remoting.marshal.Marshaller;
 import org.jboss.remoting.marshal.MarshallerDecorator;
 import org.jboss.remoting.marshal.UnMarshaller;
+import org.jboss.remoting.marshal.UnMarshallerDecorator;
 import org.jboss.remoting.marshal.VersionedMarshaller;
+import org.jboss.remoting.marshal.VersionedUnMarshaller;
 import org.jboss.remoting.marshal.rmi.RMIMarshaller;
+import org.jboss.remoting.marshal.rmi.RMIUnMarshaller;
+import org.jboss.remoting.serialization.SerializationManager;
+import org.jboss.remoting.serialization.SerializationStreamFactory;
 import org.jboss.util.threadpool.BasicThreadPool;
 import org.jboss.util.threadpool.BlockingMode;
 import org.jboss.util.threadpool.RunnableTaskWrapper;
 import org.jboss.util.threadpool.Task;
 import org.jboss.util.threadpool.ThreadPool;
 
+import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -92,6 +97,8 @@
    
    private Object timeoutThreadPoolLock = new Object();
    private ThreadPool timeoutThreadPool;
+   
+   protected boolean rmiOnewayMarshalling;
 
    /**
     * Need flag to indicate if have been able to lookup registry and set stub.
@@ -105,14 +112,47 @@
    public RMIClientInvoker(InvokerLocator locator)
    {
       super(locator);
+      configureParameters();
    }
 
    public RMIClientInvoker(InvokerLocator locator, Map configuration)
    {
       super(locator, configuration);
+      configureParameters();
    }
+   
+   protected void configureParameters()
+   {
+      Map params = configuration;
 
+      if (params == null)
+      {
+         return;
+      }
 
+      // look for enableTcpNoDelay param
+      Object val = params.get(RMIServerInvoker.RMI_ONEWAY_MARSHALLING);
+      if (val instanceof String)
+      {
+         try
+         {
+            rmiOnewayMarshalling = Boolean.valueOf((String)val).booleanValue();
+            log.debug(this + " setting rmiOnewayMarshalling to " + rmiOnewayMarshalling);
+         }
+         catch (Exception e)
+         {
+            log.warn(this + " could not convert " + RMIServerInvoker.RMI_ONEWAY_MARSHALLING +
+                     " value of " + val + " to a boolean value.  Defaulting to false");
+         }
+      }
+      else if (val != null)
+      {
+         log.warn(this + " value of " + RMIServerInvoker.RMI_ONEWAY_MARSHALLING +
+                  " (" + val + ") must be a String.  Defaulting to false");
+      }
+   }
+
+
    private int getRegistryPort(InvokerLocator locator)
    {
       int port = RMIServerInvoker.DEFAULT_REGISTRY_PORT;
@@ -303,19 +343,31 @@
                   ((VersionedMarshaller) marshaller).write(payload, byteOut, getVersion());
                else
                   marshaller.write(payload, byteOut);
-               ByteArrayInputStream byteIn = new ByteArrayInputStream(byteOut.toByteArray());
-               ObjectInputStream ois = new ObjectInputStream(byteIn);
+               
+               byteOut.close();
+               
+               if (rmiOnewayMarshalling)
+               {
+                  // Legacy treatment, pre 2.4.0.
+                  ByteArrayInputStream bais = new ByteArrayInputStream(byteOut.toByteArray());
+                  SerializationManager manager = SerializationStreamFactory.getManagerInstance(getSerializationType());
+                  ObjectInputStream ois = manager.createInput(bais, getClassLoader());
 
-               try
-               {
-                  byteOut.close();
-                  payload = ois.readObject();
-                  ois.close();
+                  try
+                  {
+                     byteOut.close();
+                     payload = ois.readObject();
+                     ois.close();
+                  }
+                  catch(ClassNotFoundException e)
+                  {
+                     log.debug("Could not marshall invocation payload object " + payload, e);
+                     throw new IOException(e.getMessage());
+                  }
                }
-               catch(ClassNotFoundException e)
+               else
                {
-                  log.debug("Could not marshall invocation payload object " + payload, e);
-                  throw new IOException(e.getMessage());
+                  payload = byteOut.toByteArray();
                }
             }
          }
@@ -323,7 +375,8 @@
          int simulatedTimeout = getSimulatedTimeout(configuration, metadata);
          if (simulatedTimeout <= 0)
          {
-            return server.transport(payload);
+            Object result = server.transport(payload);
+            return unmarshal(result, unmarshaller, metadata);
          }
          else
          {
@@ -360,7 +413,7 @@
             pool.runTaskWrapper(wrapper);
             if (log.isTraceEnabled()) log.trace("task finished in thread pool");
             
-            Object result = resultHolder.value;
+            Object result = unmarshal(resultHolder.value, unmarshaller, metadata);
             if (result == null)
             {
                if (log.isTraceEnabled()) log.trace("invocation timed out");
@@ -381,38 +434,6 @@
                return result;
             }
          }
-         
-//         Object response = server.transport(payload);
-
-         /* Jira Issue: JBREM-167
-         if(unmarshaller != null && !(unmarshaller instanceof RMIUnMarshaller))
-         {
-            PipedOutputStream pos = new PipedOutputStream();
-            PipedInputStream pis = new PipedInputStream();
-            pos.connect(pis);
-            ObjectOutputStream oos = new ObjectOutputStream(pos);
-            oos.writeObject(response);
-            oos.flush();
-            oos.reset();
-            oos.writeObject(Boolean.TRUE);
-            oos.flush();
-            oos.reset();
-            try
-            {
-               oos.close();
-               response = unmarshaller.read(pis, metadata);
-               pis.close();
-            }
-            catch(ClassNotFoundException e)
-            {
-               log.error("Could not unmarshall invocation response" + response, e);
-               throw new IOException(e.getMessage());
-            }
-
-         }
-         */
-
-//         return response;
       }
       catch(RemoteException e)
       {
@@ -459,6 +480,42 @@
    }
    
    
+   protected Object unmarshal(Object o, UnMarshaller unmarshaller, Map metadata) throws IOException
+   {
+      Object result = o;
+      if(unmarshaller != null && !(unmarshaller instanceof RMIUnMarshaller) && !rmiOnewayMarshalling)
+      {
+         if(unmarshaller instanceof UnMarshallerDecorator)
+         {
+            result = ((UnMarshallerDecorator) unmarshaller).removeDecoration(o);
+         }
+         else
+         {  
+            byte[] byteIn = (byte[]) o;
+            ByteArrayInputStream is = new ByteArrayInputStream(byteIn);
+
+            try
+            {
+               if (unmarshaller instanceof VersionedUnMarshaller)
+               {
+                  result = ((VersionedUnMarshaller) unmarshaller).read(is, metadata, getVersion());
+               }
+               else
+               {
+                  result = unmarshaller.read(is, metadata);
+               }
+            }
+            catch(ClassNotFoundException e)
+            {
+               log.debug("Could not unmarshall invocation response" + o, e);
+               throw new IOException(e.getMessage());
+            }
+         }
+      }
+      
+      return result;
+   }
+   
    /**
     * Gets the thread pool being used for simulating timeouts. If one has
     * not been specifically set via configuration or call to set it, will always return




More information about the jboss-remoting-commits mailing list