Author: ron.sigal(a)jboss.com
Date: 2008-03-12 23:27:14 -0400 (Wed, 12 Mar 2008)
New Revision: 3606
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/transport/rmi/RMIClientInvoker.java
Log:
JBREM-167: Added ability to marshal/unmarshal in both directions.
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/transport/rmi/RMIClientInvoker.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/transport/rmi/RMIClientInvoker.java 2008-03-12 07:54:08 UTC (rev 3605)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/transport/rmi/RMIClientInvoker.java 2008-03-13 03:27:14 UTC (rev 3606)
@@ -32,18 +32,23 @@
import org.jboss.remoting.RemoteClientInvoker;
import org.jboss.remoting.Remoting;
import org.jboss.remoting.ServerInvoker;
-import org.jboss.remoting.Version;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.MarshallerDecorator;
import org.jboss.remoting.marshal.UnMarshaller;
+import org.jboss.remoting.marshal.UnMarshallerDecorator;
import org.jboss.remoting.marshal.VersionedMarshaller;
+import org.jboss.remoting.marshal.VersionedUnMarshaller;
import org.jboss.remoting.marshal.rmi.RMIMarshaller;
+import org.jboss.remoting.marshal.rmi.RMIUnMarshaller;
+import org.jboss.remoting.serialization.SerializationManager;
+import org.jboss.remoting.serialization.SerializationStreamFactory;
import org.jboss.util.threadpool.BasicThreadPool;
import org.jboss.util.threadpool.BlockingMode;
import org.jboss.util.threadpool.RunnableTaskWrapper;
import org.jboss.util.threadpool.Task;
import org.jboss.util.threadpool.ThreadPool;
+import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -92,6 +97,8 @@
private Object timeoutThreadPoolLock = new Object();
private ThreadPool timeoutThreadPool;
+
+ protected boolean rmiOnewayMarshalling;
/**
* Need flag to indicate if have been able to lookup registry and set stub.
@@ -105,14 +112,47 @@
public RMIClientInvoker(InvokerLocator locator)
{
super(locator);
+ configureParameters();
}
public RMIClientInvoker(InvokerLocator locator, Map configuration)
{
super(locator, configuration);
+ configureParameters();
}
+
+ protected void configureParameters()
+ {
+ Map params = configuration;
+ if (params == null)
+ {
+ return;
+ }
+ // look for rmiOnewayMarshalling param
+ Object val = params.get(RMIServerInvoker.RMI_ONEWAY_MARSHALLING);
+ if (val instanceof String)
+ {
+ try
+ {
+ rmiOnewayMarshalling = Boolean.valueOf((String)val).booleanValue();
+ log.debug(this + " setting rmiOnewayMarshalling to " + rmiOnewayMarshalling);
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + RMIServerInvoker.RMI_ONEWAY_MARSHALLING +
+ " value of " + val + " to a boolean value. Defaulting to false");
+ }
+ }
+ else if (val != null)
+ {
+ log.warn(this + " value of " + RMIServerInvoker.RMI_ONEWAY_MARSHALLING +
+ " (" + val + ") must be a String. Defaulting to false");
+ }
+ }
+
+
private int getRegistryPort(InvokerLocator locator)
{
int port = RMIServerInvoker.DEFAULT_REGISTRY_PORT;
@@ -303,19 +343,31 @@
((VersionedMarshaller) marshaller).write(payload, byteOut, getVersion());
else
marshaller.write(payload, byteOut);
- ByteArrayInputStream byteIn = new ByteArrayInputStream(byteOut.toByteArray());
- ObjectInputStream ois = new ObjectInputStream(byteIn);
+
+ byteOut.close();
+
+ if (rmiOnewayMarshalling)
+ {
+ // Legacy treatment, pre 2.4.0.
+ ByteArrayInputStream bais = new ByteArrayInputStream(byteOut.toByteArray());
+ SerializationManager manager = SerializationStreamFactory.getManagerInstance(getSerializationType());
+ ObjectInputStream ois = manager.createInput(bais, getClassLoader());
- try
- {
- byteOut.close();
- payload = ois.readObject();
- ois.close();
+ try
+ {
+ byteOut.close();
+ payload = ois.readObject();
+ ois.close();
+ }
+ catch(ClassNotFoundException e)
+ {
+ log.debug("Could not marshall invocation payload object " + payload, e);
+ throw new IOException(e.getMessage());
+ }
}
- catch(ClassNotFoundException e)
+ else
{
- log.debug("Could not marshall invocation payload object " + payload, e);
- throw new IOException(e.getMessage());
+ payload = byteOut.toByteArray();
}
}
}
@@ -323,7 +375,8 @@
int simulatedTimeout = getSimulatedTimeout(configuration, metadata);
if (simulatedTimeout <= 0)
{
- return server.transport(payload);
+ Object result = server.transport(payload);
+ return unmarshal(result, unmarshaller, metadata);
}
else
{
@@ -360,7 +413,7 @@
pool.runTaskWrapper(wrapper);
if (log.isTraceEnabled()) log.trace("task finished in thread pool");
- Object result = resultHolder.value;
+ Object result = unmarshal(resultHolder.value, unmarshaller, metadata);
if (result == null)
{
if (log.isTraceEnabled()) log.trace("invocation timed out");
@@ -381,38 +434,6 @@
return result;
}
}
-
-// Object response = server.transport(payload);
-
- /* Jira Issue: JBREM-167
- if(unmarshaller != null && !(unmarshaller instanceof RMIUnMarshaller))
- {
- PipedOutputStream pos = new PipedOutputStream();
- PipedInputStream pis = new PipedInputStream();
- pos.connect(pis);
- ObjectOutputStream oos = new ObjectOutputStream(pos);
- oos.writeObject(response);
- oos.flush();
- oos.reset();
- oos.writeObject(Boolean.TRUE);
- oos.flush();
- oos.reset();
- try
- {
- oos.close();
- response = unmarshaller.read(pis, metadata);
- pis.close();
- }
- catch(ClassNotFoundException e)
- {
- log.error("Could not unmarshall invocation response" + response, e);
- throw new IOException(e.getMessage());
- }
-
- }
- */
-
-// return response;
}
catch(RemoteException e)
{
@@ -459,6 +480,42 @@
}
+ protected Object unmarshal(Object o, UnMarshaller unmarshaller, Map metadata) throws IOException
+ {
+ Object result = o;
+ if(unmarshaller != null && !(unmarshaller instanceof RMIUnMarshaller) && !rmiOnewayMarshalling)
+ {
+ if(unmarshaller instanceof UnMarshallerDecorator)
+ {
+ result = ((UnMarshallerDecorator) unmarshaller).removeDecoration(o);
+ }
+ else
+ {
+ byte[] byteIn = (byte[]) o;
+ ByteArrayInputStream is = new ByteArrayInputStream(byteIn);
+
+ try
+ {
+ if (unmarshaller instanceof VersionedUnMarshaller)
+ {
+ result = ((VersionedUnMarshaller) unmarshaller).read(is, metadata, getVersion());
+ }
+ else
+ {
+ result = unmarshaller.read(is, metadata);
+ }
+ }
+ catch(ClassNotFoundException e)
+ {
+ log.debug("Could not unmarshall invocation response" + o, e);
+ throw new IOException(e.getMessage());
+ }
+ }
+ }
+
+ return result;
+ }
+
/**
* Gets the thread pool being used for simulating timeouts. If one has
* not been specifically set via configuration or call to set it, will always return