Author: david.lloyd(a)jboss.com
Date: 2008-07-15 16:02:34 -0400 (Tue, 15 Jul 2008)
New Revision: 4375
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
Log:
Marshaller implementations; about 95% done (still have to figure a way to defeat the
header reading/writing)
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-14
21:04:18 UTC (rev 4374)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -25,6 +25,7 @@
import org.jboss.cx.remoting.spi.remote.ReplyHandler;
import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
import org.jboss.cx.remoting.RequestCancelHandler;
import org.jboss.cx.remoting.RequestContext;
import org.jboss.cx.remoting.CloseHandler;
@@ -143,6 +144,14 @@
}
}
+ public static void safeAutoClose(final RemoteServiceEndpoint<Object, Object>
remoteServiceEndpoint) {
+ try {
+ remoteServiceEndpoint.autoClose();
+ } catch (Throwable t) {
+ log.error("Failed to set autoClose on %s: %s",
remoteServiceEndpoint, t);
+ }
+ }
+
private static final class BlankRemoteRequestContext implements RemoteRequestContext
{
public void cancel(final boolean mayInterrupt) {
}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.marshal;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class IdentityResolver implements ObjectResolver {
+
+ private static final long serialVersionUID = -6980574391387456877L;
+ private static final IdentityResolver INSTANCE = new IdentityResolver();
+
+ private IdentityResolver() {}
+
+ public Object writeReplace(final Object original) throws IOException {
+ return original;
+ }
+
+ public Object readResolve(final Object original) throws IOException {
+ return original;
+ }
+
+ public static IdentityResolver getInstance() {
+ return INSTANCE;
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.xnio.log.Logger;
+
+/**
+ *
+ */
+public abstract class AbstractSerializationMarshaller implements
Marshaller<ByteBuffer> {
+ private static final Logger log =
Logger.getLogger(AbstractSerializationMarshaller.class);
+
+ private final Executor executor;
+ private final ObjectOutputStream objectOutputStream;
+ private final Object resultLock = new Object();
+
+ protected final ObjectResolver resolver;
+ protected final OneBufferOutputStream outputStream = new
OneBufferOutputStream(resultLock);
+
+ private boolean done = false;
+
+ protected AbstractSerializationMarshaller(final Executor executor, final
ObjectResolver resolver) throws IOException {
+ this.executor = executor;
+ this.resolver = resolver;
+ objectOutputStream = getObjectOutputStream();
+ }
+
+ protected abstract ObjectOutputStream getObjectOutputStream() throws IOException;
+
+ public void start(final Object object) throws IOException, IllegalStateException {
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ log.trace("Beginning serializing object %s", object);
+ synchronized (objectOutputStream) {
+ objectOutputStream.writeObject(object);
+ log.trace("Flushing stream");
+ objectOutputStream.flush();
+ synchronized (resultLock) {
+ outputStream.flush();
+ done = true;
+ resultLock.notify();
+ log.trace("Completed serializing object %s",
object);
+ }
+ }
+ } catch (Throwable t) {
+ log.error(t, "Serialization error");
+ }
+ }
+ });
+ }
+
+ public boolean marshal(final ByteBuffer buffer) throws IOException {
+ log.trace("Marshalling to buffer %s", buffer);
+ outputStream.setBuffer(buffer);
+ synchronized (resultLock) {
+ outputStream.await();
+ return done;
+ }
+ }
+
+ public void clearClassPool() throws IOException {
+ synchronized (objectOutputStream) {
+ objectOutputStream.reset();
+ }
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.InterruptedIOException;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+
+/**
+ *
+ */
+public abstract class AbstractSerializationUnmarshaller implements
Unmarshaller<ByteBuffer> {
+ private final Executor executor;
+ private final ObjectInputStream objectInputStream;
+ private final Object resultLock = new Object();
+
+ protected final ObjectResolver resolver;
+ protected final OneBufferInputStream inputStream = new
OneBufferInputStream(resultLock);
+
+ private boolean done;
+ private Object result;
+ private Throwable cause;
+
+ protected AbstractSerializationUnmarshaller(final Executor executor, final
ObjectResolver resolver) throws IOException {
+ this.executor = executor;
+ this.resolver = resolver;
+ objectInputStream = getObjectInputStream();
+ }
+
+ protected abstract ObjectInputStream getObjectInputStream() throws IOException;
+
+ public boolean unmarshal(final ByteBuffer buffer) throws IOException {
+ executor.execute(new Runnable() {
+ public void run() {
+ synchronized (resultLock) {
+ try {
+ result = objectInputStream.readObject();
+ } catch (Throwable t) {
+ cause = t;
+ }
+ done = true;
+ }
+ }
+ });
+ return false;
+ }
+
+ public Object get() throws IOException, IllegalStateException, ClassNotFoundException
{
+ synchronized (resultLock) {
+ while (! done) {
+ try {
+ resultLock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted while waiting for
marshaling result");
+ }
+ }
+ if (cause != null) {
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else if (cause instanceof ClassNotFoundException) {
+ throw (ClassNotFoundException) cause;
+ } else {
+ throw new RuntimeException("Unmarshalling failed
unexpectedly", cause);
+ }
+ }
+ return result;
+ }
+ }
+}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java 2008-07-14
21:04:18 UTC (rev 4374)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -2,65 +2,27 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
+import java.io.ObjectOutputStream;
import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.spi.marshal.Marshaller;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
import org.jboss.serial.io.JBossObjectOutputStream;
+import org.jboss.xnio.log.Logger;
/**
*
*/
-public class JBossSerializationMarhsaller implements Marshaller<ByteBuffer> {
+public class JBossSerializationMarhsaller extends AbstractSerializationMarshaller {
- private final Executor executor;
+ private static final Logger log =
Logger.getLogger(JBossSerializationMarhsaller.class);
- private final OurObjectOutputStream objectOutputStream;
- private final OneBufferOutputStream outputStream;
-
- private final Object resultLock = new Object();
- private boolean done = false;
-
public JBossSerializationMarhsaller(final Executor executor, final ObjectResolver
resolver) throws IOException {
- this.executor = executor;
- outputStream = new OneBufferOutputStream(new Object());
- objectOutputStream = new OurObjectOutputStream(outputStream, resolver);
+ super(executor, resolver);
}
- public void start(final Object object) throws IOException, IllegalStateException {
- executor.execute(new Runnable() {
- public void run() {
- try {
- synchronized (objectOutputStream) {
- objectOutputStream.writeObject(object);
- objectOutputStream.flush();
- synchronized (resultLock) {
- outputStream.flush();
- done = true;
- resultLock.notify();
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
+ protected ObjectOutputStream getObjectOutputStream() throws IOException {
+ return new OurObjectOutputStream(outputStream, resolver);
}
- public boolean marshal(final ByteBuffer buffer) throws IOException {
- outputStream.setBuffer(buffer);
- synchronized (resultLock) {
- outputStream.await();
- return done;
- }
- }
-
- public void clearClassPool() throws IOException {
- synchronized (objectOutputStream) {
- objectOutputStream.reset();
- }
- }
-
private static final class OurObjectOutputStream extends JBossObjectOutputStream {
private final ObjectResolver resolver;
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java 2008-07-14
21:04:18 UTC (rev 4374)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -24,6 +24,6 @@
}
public Unmarshaller<ByteBuffer> createUnmarshaller(final ObjectResolver
resolver, final ClassLoader classLoader) throws IOException {
- return null;
+ return new JBossSerializationUnmarshaller(executor, resolver, classLoader);
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java 2008-07-14
21:04:18 UTC (rev 4374)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -22,63 +22,32 @@
package org.jboss.cx.remoting.core.marshal;
-import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
import org.jboss.serial.io.JBossObjectInputStream;
-import java.nio.ByteBuffer;
import java.io.IOException;
import java.io.ObjectStreamClass;
import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
*
*/
-public final class JBossSerializationUnmarshaller implements
Unmarshaller<ByteBuffer> {
- private final Executor executor;
- private final OurObjectInputStream objectInputStream;
+public final class JBossSerializationUnmarshaller extends
AbstractSerializationUnmarshaller {
private final ClassLoader classLoader;
- private final ObjectResolver resolver;
- private final AtomicBoolean running = new AtomicBoolean();
-
public JBossSerializationUnmarshaller(final Executor executor, final ObjectResolver
resolver, final ClassLoader classLoader) throws IOException {
- this.executor = executor;
- this.resolver = resolver;
+ super(executor, resolver);
this.classLoader = classLoader;
- objectInputStream = new OurObjectInputStream(new OneBufferInputStream(),
resolver, classLoader);
}
- public boolean unmarshal(final ByteBuffer buffer) throws IOException {
- if (! running.getAndSet(true)) {
- executor.execute(new Runnable() {
- public void run() {
- synchronized (objectInputStream) {
- try {
- final Object object = objectInputStream.readObject();
-
- } catch (IOException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } finally {
- running.set(false);
- }
- }
- }
- });
- }
- return false;
+ protected ObjectInputStream getObjectInputStream() throws IOException {
+ return new OurObjectInputStream(inputStream, resolver, classLoader);
}
- public Object get() throws IOException, ClassNotFoundException, IllegalStateException
{
- return null;
- }
-
private static final class OurObjectInputStream extends JBossObjectInputStream {
private final ClassLoader classLoader;
private final ObjectResolver resolver;
Copied:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java
(from rev 4371,
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java)
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -0,0 +1,43 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.IdentityResolver;
+import org.jboss.xnio.log.Logger;
+
+/**
+ *
+ */
+public class JavaSerializationMarhsaller extends AbstractSerializationMarshaller {
+
+ private static final Logger log =
Logger.getLogger(JBossSerializationMarhsaller.class);
+
+ public JavaSerializationMarhsaller(final Executor executor, final ObjectResolver
resolver) throws IOException {
+ super(executor, resolver);
+ }
+
+ protected ObjectOutputStream getObjectOutputStream() throws IOException {
+ return new OurObjectOutputStream(outputStream, resolver == null ?
IdentityResolver.getInstance() : resolver);
+ }
+
+ private static final class OurObjectOutputStream extends ObjectOutputStream {
+ private final ObjectResolver resolver;
+
+ private OurObjectOutputStream(final OutputStream outputStream, final
ObjectResolver resolver) throws IOException {
+ super(outputStream);
+ enableReplaceObject(true);
+ this.resolver = resolver;
+ }
+
+ protected Object replaceObject(final Object obj) throws IOException {
+ return resolver.writeReplace(obj);
+ }
+
+ protected void writeStreamHeader() throws IOException {
+ // no headers
+ }
+ }
+}
\ No newline at end of file
Copied:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java
(from rev 4371,
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java)
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -0,0 +1,29 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+
+/**
+ *
+ */
+public class JavaSerializationMarshallerFactory implements
MarshallerFactory<ByteBuffer> {
+
+ private final Executor executor;
+
+ public JavaSerializationMarshallerFactory(final Executor executor) {
+ this.executor = executor;
+ }
+
+ public Marshaller<ByteBuffer> createMarshaller(final ObjectResolver resolver)
throws IOException {
+ return new JavaSerializationMarhsaller(executor, resolver);
+ }
+
+ public Unmarshaller<ByteBuffer> createUnmarshaller(final ObjectResolver
resolver, final ClassLoader classLoader) throws IOException {
+ return new JavaSerializationUnmarshaller(executor, resolver, classLoader);
+ }
+}
\ No newline at end of file
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -0,0 +1,101 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.ObjectInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+import java.util.concurrent.Executor;
+import java.util.Map;
+import java.util.HashMap;
+import java.lang.reflect.Proxy;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+
+/**
+ *
+ */
+public final class JavaSerializationUnmarshaller extends
AbstractSerializationUnmarshaller {
+
+ private final ClassLoader classLoader;
+
+ public JavaSerializationUnmarshaller(final Executor executor, final ObjectResolver
resolver, final ClassLoader classLoader) throws IOException {
+ super(executor, resolver);
+ this.classLoader = classLoader;
+ }
+
+ protected ObjectInputStream getObjectInputStream() throws IOException {
+ return new OurObjectInputStream(inputStream, resolver, classLoader);
+ }
+
+ private static final class OurObjectInputStream extends ObjectInputStream {
+ private final ClassLoader classLoader;
+ private final ObjectResolver resolver;
+
+ private OurObjectInputStream(final InputStream inputStream, final ObjectResolver
resolver, final ClassLoader classLoader) throws IOException {
+ super(inputStream);
+ this.classLoader = classLoader;
+ this.resolver = resolver;
+ }
+
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException,
ClassNotFoundException {
+ final String name = desc.getName();
+ if (primitiveTypes.containsKey(name)) {
+ return primitiveTypes.get(name);
+ } else {
+ return Class.forName(name, false, classLoader);
+ }
+ }
+
+ protected Class<?> resolveProxyClass(final String[] interfaceNames) throws
IOException, ClassNotFoundException {
+ final int length = interfaceNames.length;
+ final Class<?>[] interfaces = new Class[length];
+ for (int i = 0; i < length; i ++) {
+ interfaces[i] = Class.forName(interfaceNames[i], false, classLoader);
+ }
+ return Proxy.getProxyClass(classLoader, interfaces);
+ }
+
+ protected Object resolveObject(final Object obj) throws IOException {
+ return resolver.readResolve(obj);
+ }
+
+ private static final Map<String, Class<?>> primitiveTypes = new
HashMap<String, Class<?>>();
+
+ private static <T> void add(Class<T> type) {
+ primitiveTypes.put(type.getName(), type);
+ }
+
+ static {
+ add(void.class);
+ add(boolean.class);
+ add(byte.class);
+ add(short.class);
+ add(int.class);
+ add(long.class);
+ add(float.class);
+ add(double.class);
+ add(char.class);
+ }
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public interface MarshallingAction {
+ boolean marshal(ByteBuffer buffer) throws IOException;
+}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java 2008-07-14
21:04:18 UTC (rev 4374)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -32,15 +32,23 @@
*/
public final class OneBufferInputStream extends InputStream {
- private final Object lock = new Object();
+ private final Object lock;
private ByteBuffer buffer;
+ private boolean eof;
+ public OneBufferInputStream(final Object lock) {
+ this.lock = lock;
+ }
+
private ByteBuffer getBuffer() throws InterruptedIOException {
synchronized (lock) {
for (;;) {
final ByteBuffer buffer = this.buffer;
if (buffer != null) {
if (! buffer.hasRemaining()) {
+ if (eof) {
+ return null;
+ }
lock.notify();
this.buffer = null;
} else {
@@ -57,15 +65,38 @@
}
}
+ public void setBuffer(ByteBuffer buffer, boolean eof) {
+ synchronized (lock) {
+ if (this.buffer != null) {
+ throw new IllegalStateException("Buffer already set");
+ }
+ this.buffer = buffer;
+ this.eof = eof;
+ lock.notify();
+ }
+ }
+
public int read() throws IOException {
synchronized (lock) {
- return getBuffer().get() & 0xff;
+ final ByteBuffer buffer = getBuffer();
+ return buffer == null ? -1 : buffer.get() & 0xff;
}
}
public int read(final byte[] b, int off, int len) throws IOException {
+ int c = 0;
synchronized (lock) {
- return 0;
+ while (len > 0) {
+ final ByteBuffer buffer = getBuffer();
+ if (buffer == null) {
+ return c == 0 ? -1 : c;
+ }
+ int rem = Math.min(len, buffer.remaining());
+ buffer.get(b, off, rem);
+ off += rem;
+ len -= rem;
+ }
+ return c;
}
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java 2008-07-14
21:04:18 UTC (rev 4374)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java 2008-07-15
20:02:34 UTC (rev 4375)
@@ -55,8 +55,8 @@
final ByteBuffer buffer = this.buffer;
if (buffer != null) {
if (! buffer.hasRemaining()) {
+ this.buffer = null;
lock.notify();
- this.buffer = null;
} else {
return buffer;
}