Author: david.lloyd(a)jboss.com
Date: 2008-04-23 13:13:47 -0400 (Wed, 23 Apr 2008)
New Revision: 4046
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/CompositeObjectResolver.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/ClientResolver.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/ClientSourceResolver.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageInput.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageOutput.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/StreamResolver.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/ObjectResolver.java
Log:
Separated Marshaller implementation
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/CompositeObjectResolver.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/CompositeObjectResolver.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/CompositeObjectResolver.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -0,0 +1,34 @@
+package org.jboss.cx.remoting.spi.marshal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Collections;
+import org.jboss.cx.remoting.util.CollectionUtil;
+
+/**
+ *
+ */
+public final class CompositeObjectResolver implements ObjectResolver {
+
+ private static final long serialVersionUID = -5506005026832413276L;
+
+ private final List<ObjectResolver> resolvers;
+
+ public CompositeObjectResolver(List<ObjectResolver> resolvers) {
+ this.resolvers =
Collections.unmodifiableList(CollectionUtil.arrayList(resolvers));
+ }
+
+ public Object writeReplace(Object object) throws IOException {
+ for (ObjectResolver resolver : resolvers) {
+ object = resolver.writeReplace(object);
+ }
+ return object;
+ }
+
+ public Object readResolve(Object object) throws IOException {
+ for (ObjectResolver resolver : resolvers) {
+ object = resolver.readResolve(object);
+ }
+ return object;
+ }
+}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java 2008-04-23
04:09:14 UTC (rev 4045)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -2,20 +2,32 @@
import java.io.IOException;
import java.io.Serializable;
-import org.jboss.cx.remoting.util.DataMessageInput;
-import org.jboss.cx.remoting.util.DataMessageOutput;
import org.jboss.cx.remoting.util.ObjectMessageInput;
import org.jboss.cx.remoting.util.ObjectMessageOutput;
+import org.jboss.cx.remoting.util.ByteMessageOutput;
+import org.jboss.cx.remoting.util.ByteMessageInput;
/**
- *
+ * A marshaller/unmarshaller for transmitting data over a wire protocol of some sort.
Each marshaller instance is
+ * guaranteed to be used by only one thread. Marshallers are not pooled or reused in any
way.
*/
public interface Marshaller extends Serializable {
- ObjectMessageOutput getMessageOutput(DataMessageOutput dataMessageOutput) throws
IOException;
- ObjectMessageInput getMessageInput(DataMessageInput dataMessageInput) throws
IOException;
+ /**
+ * Get a message writer that marshals to the given stream.
+ *
+ * @param byteMessageOutput the target stream
+ * @return the message writer
+ * @throws IOException if an error occurs
+ */
+ ObjectMessageOutput getMessageOutput(ByteMessageOutput byteMessageOutput) throws
IOException;
- void addFirstObjectResolver(ObjectResolver resolver);
-
- void addLastObjectResolver(ObjectResolver resolver);
+ /**
+ * Get a message reader that unmarshals from the given stream.
+ *
+ * @param byteMessageInput the source stream
+ * @return the message reader
+ * @throws IOException if an error occurs
+ */
+ ObjectMessageInput getMessageInput(ByteMessageInput byteMessageInput) throws
IOException;
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java 2008-04-23
04:09:14 UTC (rev 4045)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -6,5 +6,5 @@
*
*/
public interface MarshallerFactory {
- Marshaller createRootMarshaller(ClassLoader classLoader) throws IOException;
+ Marshaller createRootMarshaller(ObjectResolver resolver, ClassLoader classLoader)
throws IOException;
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/ObjectResolver.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/ObjectResolver.java 2008-04-23
04:09:14 UTC (rev 4045)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/ObjectResolver.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -4,10 +4,30 @@
import java.io.Serializable;
/**
- *
+ * A resolver for marshallers. Instances of this interface are used to dynamically
substitute marker objects for
+ * objects that behave specially, such as streams. Instances of this interface are used
by multiple threads and may
+ * be reused any number of times.
+ * <p/>
+ * All instances of this interface that are associated with a session will be called in
sequence, each instance getting
+ * the return value of the previous instance.
*/
public interface ObjectResolver extends Serializable {
- Object readResolve(Object original) throws IOException;
+ /**
+ * Substitute a real object with an object for the stream.
+ *
+ * @param original the real object
+ * @return the object for the stream
+ * @throws IOException if an error occurs
+ */
Object writeReplace(Object original) throws IOException;
+
+ /**
+ * Substitute an object from the stream with the real object.
+ *
+ * @param original the object from the stream
+ * @return the real object
+ * @throws IOException if an error occurs
+ */
+ Object readResolve(Object original) throws IOException;
}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/ClientResolver.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/ClientResolver.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/ClientResolver.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -0,0 +1,36 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import org.jboss.cx.remoting.core.AbstractRealClient;
+import org.jboss.cx.remoting.core.ClientMarker;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.protocol.ClientIdentifier;
+
+/**
+ *
+ */
+public final class ClientResolver implements ObjectResolver {
+
+ private static final long serialVersionUID = 7850552704308592325L;
+
+ public Object writeReplace(final Object original) throws IOException {
+ if (original instanceof AbstractRealClient) {
+ AbstractRealClient client = (AbstractRealClient) original;
+
+ return null;
+ } else {
+ return original;
+ }
+ }
+
+ public Object readResolve(final Object original) throws IOException {
+ if (original instanceof ClientMarker) {
+ ClientMarker clientMarker = (ClientMarker) original;
+ ClientIdentifier clientIdentifier = clientMarker.getClientIdentifer();
+
+ return null;
+ } else {
+ return original;
+ }
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/ClientSourceResolver.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/ClientSourceResolver.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/ClientSourceResolver.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -0,0 +1,21 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+
+/**
+ *
+ */
+public final class ClientSourceResolver implements ObjectResolver {
+
+ private static final long serialVersionUID = 7850552704308592325L;
+
+ public Object writeReplace(final Object original) throws IOException {
+ return null;
+ }
+
+ public Object readResolve(final Object original) throws IOException {
+
+ return null;
+ }
+}
\ No newline at end of file
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -0,0 +1,33 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.util.ByteMessageInput;
+import org.jboss.cx.remoting.util.ByteMessageOutput;
+import org.jboss.cx.remoting.util.ObjectMessageInput;
+import org.jboss.cx.remoting.util.ObjectMessageOutput;
+
+/**
+ *
+ */
+public class JBossSerializationMarhsaller implements Marshaller {
+
+ private static final long serialVersionUID = -8197192536466706414L;
+
+ private final ObjectResolver resolver;
+ private final ClassLoader classLoader;
+
+ public JBossSerializationMarhsaller(final ObjectResolver resolver, final ClassLoader
classLoader) {
+ this.resolver = resolver;
+ this.classLoader = classLoader;
+ }
+
+ public ObjectMessageOutput getMessageOutput(final ByteMessageOutput
byteMessageOutput) throws IOException {
+ return new JBossSerializationObjectMessageOutput(resolver, byteMessageOutput);
+ }
+
+ public ObjectMessageInput getMessageInput(final ByteMessageInput byteMessageInput)
throws IOException {
+ return new JBossSerializationObjectMessageInput(resolver, byteMessageInput,
classLoader);
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -0,0 +1,16 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+
+/**
+ *
+ */
+public class JBossSerializationMarshallerFactory implements MarshallerFactory {
+
+ public Marshaller createRootMarshaller(final ObjectResolver resolver, final
ClassLoader classLoader) throws IOException {
+ return new JBossSerializationMarhsaller(resolver, classLoader);
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageInput.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageInput.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageInput.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -0,0 +1,108 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.util.ByteMessageInput;
+import org.jboss.cx.remoting.util.ObjectMessageInput;
+import org.jboss.serial.io.JBossObjectInputStream;
+
+/**
+ *
+ */
+public class JBossSerializationObjectMessageInput extends JBossObjectInputStream
implements ObjectMessageInput {
+
+ private final ObjectResolver resolver;
+ private final ByteMessageInput dataMessageInput;
+
+ public JBossSerializationObjectMessageInput(final ObjectResolver resolver, final
ByteMessageInput dataMessageInput, final ClassLoader classLoader) throws IOException {
+ super(new InputStream() {
+
+ public int read(final byte b[]) throws IOException {
+ return dataMessageInput.read(b);
+ }
+
+ public int read(final byte b[], final int off, final int len) throws
IOException {
+ return dataMessageInput.read(b, off, len);
+ }
+
+ public int available() throws IOException {
+ return dataMessageInput.remaining();
+ }
+
+ public void close() throws IOException {
+ dataMessageInput.close();
+ }
+
+ public boolean markSupported() {
+ return false;
+ }
+
+ public int read() throws IOException {
+ return dataMessageInput.read();
+ }
+ }, classLoader);
+ if (resolver == null) {
+ throw new NullPointerException("resolver is null");
+ }
+ if (dataMessageInput == null) {
+ throw new NullPointerException("dataMessageInput is null");
+ }
+ if (classLoader == null) {
+ throw new NullPointerException("classLoader is null");
+ }
+ enableResolveObject(true);
+ this.resolver = resolver;
+ this.dataMessageInput = dataMessageInput;
+ }
+
+ public int remaining() {
+ return dataMessageInput.remaining();
+ }
+
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException,
ClassNotFoundException {
+ final String name = desc.getName();
+ ClassLoader classLoader = getClassLoader();
+ if (primitiveTypes.containsKey(name)) {
+ return primitiveTypes.get(name);
+ } else {
+ return Class.forName(name, false, classLoader);
+ }
+ }
+
+ protected Class<?> resolveProxyClass(final String[] interfaceNames) throws
IOException, ClassNotFoundException {
+ final ClassLoader classLoader = getClassLoader();
+ final int length = interfaceNames.length;
+ final Class<?>[] interfaces = new Class[length];
+ for (int i = 0; i < length; i ++) {
+ interfaces[i] = Class.forName(interfaceNames[i], false, classLoader);
+ }
+ return Proxy.getProxyClass(classLoader, interfaces);
+ }
+
+ protected Object resolveObject(final Object obj) throws IOException {
+ return resolver.readResolve(obj);
+ }
+
+ private static final Map<String, Class<?>> primitiveTypes = new
HashMap<String, Class<?>>();
+
+ private static <T> void add(Class<T> type) {
+ primitiveTypes.put(type.getName(), type);
+ }
+
+ static {
+ add(void.class);
+ add(boolean.class);
+ add(byte.class);
+ add(short.class);
+ add(int.class);
+ add(long.class);
+ add(float.class);
+ add(double.class);
+ add(char.class);
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageOutput.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageOutput.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageOutput.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -0,0 +1,58 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.util.ByteMessageOutput;
+import org.jboss.cx.remoting.util.ObjectMessageOutput;
+import org.jboss.serial.io.JBossObjectOutputStream;
+
+/**
+ *
+ */
+public class JBossSerializationObjectMessageOutput extends JBossObjectOutputStream
implements ObjectMessageOutput {
+
+ private final ObjectResolver resolver;
+ private final ByteMessageOutput dataMessageOutput;
+
+ public JBossSerializationObjectMessageOutput(final ObjectResolver resolver, final
ByteMessageOutput dataMessageOutput) throws IOException {
+ super(new OutputStream() {
+ public void write(final int b) throws IOException {
+ dataMessageOutput.write(b);
+ }
+
+ public void write(final byte b[]) throws IOException {
+ dataMessageOutput.write(b);
+ }
+
+ public void write(final byte b[], final int off, final int len) throws
IOException {
+ dataMessageOutput.write(b, off, len);
+ }
+
+ public void flush() throws IOException {
+ dataMessageOutput.flush();
+ }
+
+ public void close() throws IOException {
+ dataMessageOutput.close();
+ }
+ });
+ enableReplaceObject(true);
+ this.resolver = resolver;
+ this.dataMessageOutput = dataMessageOutput;
+ }
+
+ public void commit() throws IOException {
+ flush();
+ dataMessageOutput.commit();
+ }
+
+ public int getBytesWritten() throws IOException {
+ flush();
+ return dataMessageOutput.getBytesWritten();
+ }
+
+ protected Object replaceObject(final Object obj) throws IOException {
+ return resolver.writeReplace(obj);
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/StreamResolver.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/StreamResolver.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/StreamResolver.java 2008-04-23
17:13:47 UTC (rev 4046)
@@ -0,0 +1,39 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.core.StreamMarker;
+import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+
+/**
+ *
+ */
+public final class StreamResolver implements ObjectResolver {
+
+ private static final long serialVersionUID = -5060456855964622071L;
+
+ private final Executor streamExecutor;
+
+ public StreamResolver(OrderedExecutorFactory factory) {
+ streamExecutor = factory.getOrderedExecutor();
+ }
+
+ public StreamResolver(final Executor streamExecutor) {
+ this.streamExecutor = streamExecutor;
+ }
+
+ public Object writeReplace(final Object original) throws IOException {
+ // todo - run thru stream detector(s)?
+ return null;
+ }
+
+ public Object readResolve(final Object original) throws IOException {
+ if (original instanceof StreamMarker) {
+ StreamMarker streamMarker = (StreamMarker) original;
+ return null;
+ } else {
+ return original;
+ }
+ }
+}