[jboss-remoting-commits] JBoss Remoting SVN: r4375 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/marshal and 1 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Tue Jul 15 16:02:34 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-07-15 16:02:34 -0400 (Tue, 15 Jul 2008)
New Revision: 4375

Added:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
Log:
Marshaller implementations; about 95% done (still have to figure out a way to defeat the header reading/writing)

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java	2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -25,6 +25,7 @@
 import org.jboss.cx.remoting.spi.remote.ReplyHandler;
 import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
 import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
 import org.jboss.cx.remoting.RequestCancelHandler;
 import org.jboss.cx.remoting.RequestContext;
 import org.jboss.cx.remoting.CloseHandler;
@@ -143,6 +144,14 @@
         }
     }
 
+    public static void safeAutoClose(final RemoteServiceEndpoint<Object, Object> remoteServiceEndpoint) {
+        try {
+            remoteServiceEndpoint.autoClose();
+        } catch (Throwable t) { // best-effort: any failure of autoClose() is logged below and never propagated to the caller
+            log.error("Failed to set autoClose on %s: %s", remoteServiceEndpoint, t);
+        }
+    }
+
     private static final class BlankRemoteRequestContext implements RemoteRequestContext {
         public void cancel(final boolean mayInterrupt) {
         }

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.marshal;
+
+import java.io.IOException;
+
+/**
+ * An {@link ObjectResolver} singleton that performs no substitution: both resolve methods return their argument unchanged.
+ */
+public final class IdentityResolver implements ObjectResolver {
+
+    private static final long serialVersionUID = -6980574391387456877L;
+    private static final IdentityResolver INSTANCE = new IdentityResolver();
+
+    private IdentityResolver() {} // not instantiable from outside; obtain the shared instance via getInstance()
+
+    public Object writeReplace(final Object original) throws IOException {
+        return original; // identity: nothing is replaced on write
+    }
+
+    public Object readResolve(final Object original) throws IOException {
+        return original; // identity: nothing is resolved on read
+    }
+
+    public static IdentityResolver getInstance() {
+        return INSTANCE;
+    }
+}

Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Base {@link Marshaller} that writes objects to a subclass-supplied {@link ObjectOutputStream} on an executor task and hands caller buffers to the underlying {@link OneBufferOutputStream}.
+ */
+public abstract class AbstractSerializationMarshaller implements Marshaller<ByteBuffer> {
+    private static final Logger log = Logger.getLogger(AbstractSerializationMarshaller.class);
+
+    private final Executor executor;
+    private final ObjectOutputStream objectOutputStream; // created once by the subclass via getObjectOutputStream(); also used as a monitor
+    private final Object resultLock = new Object(); // guards "done"; the same object is handed to outputStream
+
+    protected final ObjectResolver resolver;
+    protected final OneBufferOutputStream outputStream = new OneBufferOutputStream(resultLock);
+
+    private boolean done = false; // set to true after an object has been written and both streams flushed; never reset in this class
+
+    protected AbstractSerializationMarshaller(final Executor executor, final ObjectResolver resolver) throws IOException {
+        this.executor = executor;
+        this.resolver = resolver;
+        objectOutputStream = getObjectOutputStream(); // NOTE(review): overridable call from a constructor; fields assigned by subclass constructors are not yet set here
+    }
+
+    protected abstract ObjectOutputStream getObjectOutputStream() throws IOException; // factory hook; called exactly once, from the constructor
+
+    public void start(final Object object) throws IOException, IllegalStateException { // queues serialization of "object" on the executor and returns
+        executor.execute(new Runnable() {
+            public void run() {
+                try {
+                    log.trace("Beginning serializing object %s", object);
+                    synchronized (objectOutputStream) {
+                        objectOutputStream.writeObject(object);
+                        log.trace("Flushing stream");
+                        objectOutputStream.flush();
+                        synchronized (resultLock) {
+                            outputStream.flush();
+                            done = true;
+                            resultLock.notify(); // wakes one thread waiting on resultLock
+                            log.trace("Completed serializing object %s", object);
+                        }
+                    }
+                } catch (Throwable t) {
+                    log.error(t, "Serialization error"); // NOTE(review): the failure is only logged; "done" stays false and resultLock is not notified on this path
+                }
+            }
+        });
+    }
+
+    public boolean marshal(final ByteBuffer buffer) throws IOException { // installs "buffer" as the stream's target, awaits the stream, then reports "done"
+        log.trace("Marshalling to buffer %s", buffer);
+        outputStream.setBuffer(buffer);
+        synchronized (resultLock) {
+            outputStream.await(); // NOTE(review): presumably blocks until the buffer is filled or flushed — confirm in OneBufferOutputStream
+            return done;
+        }
+    }
+
+    public void clearClassPool() throws IOException { // resets the object stream under the same monitor the writer task uses
+        synchronized (objectOutputStream) {
+            objectOutputStream.reset();
+        }
+    }
+}

Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.InterruptedIOException;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+
+/**
+ * Base {@link Unmarshaller} that reads one object from a subclass-supplied {@link ObjectInputStream} on an executor task; {@link #get()} blocks until that task has finished.
+ */
+public abstract class AbstractSerializationUnmarshaller implements Unmarshaller<ByteBuffer> {
+    private final Executor executor;
+    private final ObjectInputStream objectInputStream;
+    private final Object resultLock = new Object(); // guards done/result/cause; the same object is handed to inputStream
+    protected final ObjectResolver resolver;
+    protected final OneBufferInputStream inputStream = new OneBufferInputStream(resultLock);
+
+    private boolean done;
+    private Object result;
+    private Throwable cause;
+
+    protected AbstractSerializationUnmarshaller(final Executor executor, final ObjectResolver resolver) throws IOException {
+        this.executor = executor;
+        this.resolver = resolver;
+        objectInputStream = getObjectInputStream(); // NOTE(review): overridable call from a constructor; fields assigned by subclass constructors are not yet set here
+    }
+
+    protected abstract ObjectInputStream getObjectInputStream() throws IOException; // factory hook; called exactly once, from the constructor
+
+    public boolean unmarshal(final ByteBuffer buffer) throws IOException { // NOTE(review): "buffer" is never handed to inputStream here, and every call queues another reader task — confirm intent
+        executor.execute(new Runnable() {
+            public void run() {
+                synchronized (resultLock) {
+                    try {
+                        result = objectInputStream.readObject();
+                    } catch (Throwable t) {
+                        cause = t; // rethrown to the caller by get()
+                    }
+                    done = true;
+                    resultLock.notifyAll(); // wake every thread blocked in get(); without this a waiter sleeps forever
+                }
+            }
+        });
+        return false;
+    }
+
+    public Object get() throws IOException, IllegalStateException, ClassNotFoundException { // blocks until the reader task finishes, then returns its result or rethrows its failure
+        synchronized (resultLock) {
+            while (! done) {
+                try {
+                    resultLock.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+                    throw new InterruptedIOException("Interrupted while waiting for unmarshalling result");
+                }
+            }
+            if (cause != null) {
+                if (cause instanceof IOException) {
+                    throw (IOException) cause;
+                } else if (cause instanceof ClassNotFoundException) {
+                    throw (ClassNotFoundException) cause;
+                } else {
+                    throw new RuntimeException("Unmarshalling failed unexpectedly", cause);
+                }
+            }
+            return result;
+        }
+    }
+}

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java	2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -2,65 +2,27 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
+import java.io.ObjectOutputStream;
 import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.spi.marshal.Marshaller;
 import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
 import org.jboss.serial.io.JBossObjectOutputStream;
+import org.jboss.xnio.log.Logger;
 
 /**
  *
  */
-public class JBossSerializationMarhsaller implements Marshaller<ByteBuffer> {
+public class JBossSerializationMarhsaller extends AbstractSerializationMarshaller {
 
-    private final Executor executor;
+    private static final Logger log = Logger.getLogger(JBossSerializationMarhsaller.class);
 
-    private final OurObjectOutputStream objectOutputStream;
-    private final OneBufferOutputStream outputStream;
-
-    private final Object resultLock = new Object();
-    private boolean done = false;
-
     public JBossSerializationMarhsaller(final Executor executor, final ObjectResolver resolver) throws IOException {
-        this.executor = executor;
-        outputStream = new OneBufferOutputStream(new Object());
-        objectOutputStream = new OurObjectOutputStream(outputStream, resolver);
+        super(executor, resolver);
     }
 
-    public void start(final Object object) throws IOException, IllegalStateException {
-        executor.execute(new Runnable() {
-            public void run() {
-                try {
-                    synchronized (objectOutputStream) {
-                        objectOutputStream.writeObject(object);
-                        objectOutputStream.flush();
-                        synchronized (resultLock) {
-                            outputStream.flush();
-                            done = true;
-                            resultLock.notify();
-                        }
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        });
+    protected ObjectOutputStream getObjectOutputStream() throws IOException {
+        return new OurObjectOutputStream(outputStream, resolver);
     }
 
-    public boolean marshal(final ByteBuffer buffer) throws IOException {
-        outputStream.setBuffer(buffer);
-        synchronized (resultLock) {
-            outputStream.await();
-            return done;
-        }
-    }
-
-    public void clearClassPool() throws IOException {
-        synchronized (objectOutputStream) {
-            objectOutputStream.reset();
-        }
-    }
-
     private static final class OurObjectOutputStream extends JBossObjectOutputStream {
         private final ObjectResolver resolver;
 

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java	2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -24,6 +24,6 @@
     }
 
     public Unmarshaller<ByteBuffer> createUnmarshaller(final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
-        return null;
+        return new JBossSerializationUnmarshaller(executor, resolver, classLoader);
     }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java	2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -22,63 +22,32 @@
 
 package org.jboss.cx.remoting.core.marshal;
 
-import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
 import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
 import org.jboss.serial.io.JBossObjectInputStream;
-import java.nio.ByteBuffer;
 import java.io.IOException;
 import java.io.ObjectStreamClass;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.lang.reflect.Proxy;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  *
  */
-public final class JBossSerializationUnmarshaller implements Unmarshaller<ByteBuffer> {
-    private final Executor executor;
-    private final OurObjectInputStream objectInputStream;
+public final class JBossSerializationUnmarshaller extends AbstractSerializationUnmarshaller {
     private final ClassLoader classLoader;
-    private final ObjectResolver resolver;
-    private final AtomicBoolean running = new AtomicBoolean();
-    
 
     public JBossSerializationUnmarshaller(final Executor executor, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
-        this.executor = executor;
-        this.resolver = resolver;
+        super(executor, resolver);
         this.classLoader = classLoader;
-        objectInputStream = new OurObjectInputStream(new OneBufferInputStream(), resolver, classLoader);
     }
 
-    public boolean unmarshal(final ByteBuffer buffer) throws IOException {
-        if (! running.getAndSet(true)) {
-            executor.execute(new Runnable() {
-                public void run() {
-                    synchronized (objectInputStream) {
-                        try {
-                            final Object object = objectInputStream.readObject();
-
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        } catch (ClassNotFoundException e) {
-                            e.printStackTrace();
-                        } finally {
-                            running.set(false);
-                        }
-                    }
-                }
-            });
-        }
-        return false;
+    protected ObjectInputStream getObjectInputStream() throws IOException {
+        return new OurObjectInputStream(inputStream, resolver, classLoader);
     }
 
-    public Object get() throws IOException, ClassNotFoundException, IllegalStateException {
-        return null;
-    }
-
     private static final class OurObjectInputStream extends JBossObjectInputStream {
         private final ClassLoader classLoader;
         private final ObjectResolver resolver;

Copied: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java (from rev 4371, remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,43 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.IdentityResolver;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * {@link AbstractSerializationMarshaller} backed by a plain {@link ObjectOutputStream} that writes no stream header and routes every object through an {@link ObjectResolver}.
+ */
+public class JavaSerializationMarhsaller extends AbstractSerializationMarshaller {
+
+    private static final Logger log = Logger.getLogger(JavaSerializationMarhsaller.class);
+
+    public JavaSerializationMarhsaller(final Executor executor, final ObjectResolver resolver) throws IOException {
+        super(executor, resolver);
+    }
+
+    protected ObjectOutputStream getObjectOutputStream() throws IOException {
+        return new OurObjectOutputStream(outputStream, resolver == null ? IdentityResolver.getInstance() : resolver); // a null resolver means "no substitution"
+    }
+
+    private static final class OurObjectOutputStream extends ObjectOutputStream {
+        private final ObjectResolver resolver;
+
+        private OurObjectOutputStream(final OutputStream outputStream, final ObjectResolver resolver) throws IOException {
+            super(outputStream);
+            enableReplaceObject(true); // required, otherwise ObjectOutputStream never calls replaceObject()
+            this.resolver = resolver;
+        }
+
+        protected Object replaceObject(final Object obj) throws IOException {
+            return resolver.writeReplace(obj);
+        }
+
+        protected void writeStreamHeader() throws IOException {
+            // no headers
+        }
+    }
+}
\ No newline at end of file

Copied: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java (from rev 4371, remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,29 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+
+/**
+ * {@link MarshallerFactory} that creates {@link JavaSerializationMarhsaller} and {@link JavaSerializationUnmarshaller} instances sharing one {@link Executor}.
+ */
+public class JavaSerializationMarshallerFactory implements MarshallerFactory<ByteBuffer> {
+
+    private final Executor executor; // passed unchanged to every marshaller and unmarshaller created by this factory
+
+    public JavaSerializationMarshallerFactory(final Executor executor) {
+        this.executor = executor;
+    }
+
+    public Marshaller<ByteBuffer> createMarshaller(final ObjectResolver resolver) throws IOException {
+        return new JavaSerializationMarhsaller(executor, resolver);
+    }
+
+    public Unmarshaller<ByteBuffer> createUnmarshaller(final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
+        return new JavaSerializationUnmarshaller(executor, resolver, classLoader);
+    }
+}
\ No newline at end of file

Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,101 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.ObjectInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+import java.util.concurrent.Executor;
+import java.util.Map;
+import java.util.HashMap;
+import java.lang.reflect.Proxy;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+
+/**
+ * {@link AbstractSerializationUnmarshaller} backed by a plain {@link ObjectInputStream} that expects no stream header and resolves classes against a caller-supplied class loader.
+ */
+public final class JavaSerializationUnmarshaller extends AbstractSerializationUnmarshaller {
+
+    private final ClassLoader classLoader;
+
+    public JavaSerializationUnmarshaller(final Executor executor, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
+        super(executor, resolver);
+        this.classLoader = classLoader;
+    }
+
+    protected ObjectInputStream getObjectInputStream() throws IOException {
+        // Invoked from the superclass constructor, i.e. before classLoader has been assigned. Pass "this" so the
+        // stream looks the class loader up lazily instead of capturing the still-null field value.
+        return new OurObjectInputStream(inputStream, resolver, this);
+    }
+
+    private static final class OurObjectInputStream extends ObjectInputStream {
+        // Primitive type names (and "void") cannot be loaded through Class.forName, so they are mapped explicitly.
+        private static final Map<String, Class<?>> primitiveTypes = new HashMap<String, Class<?>>();
+
+        private final JavaSerializationUnmarshaller owner;
+        private final ObjectResolver resolver;
+
+        private OurObjectInputStream(final InputStream inputStream, final ObjectResolver resolver, final JavaSerializationUnmarshaller owner) throws IOException {
+            super(inputStream);
+            this.owner = owner;
+            this.resolver = resolver;
+            enableResolveObject(true); // required, otherwise ObjectInputStream never calls resolveObject()
+        }
+
+        protected void readStreamHeader() throws IOException {
+            // no headers; mirrors JavaSerializationMarhsaller, whose writeStreamHeader() writes nothing
+        }
+
+        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+            final String name = desc.getName();
+            final Class<?> primitive = primitiveTypes.get(name);
+            return primitive != null ? primitive : Class.forName(name, false, owner.classLoader);
+        }
+
+        protected Class<?> resolveProxyClass(final String[] interfaceNames) throws IOException, ClassNotFoundException {
+            final int length = interfaceNames.length;
+            final Class<?>[] interfaces = new Class[length];
+            for (int i = 0; i < length; i ++) {
+                interfaces[i] = Class.forName(interfaceNames[i], false, owner.classLoader);
+            }
+            return Proxy.getProxyClass(owner.classLoader, interfaces);
+        }
+
+        protected Object resolveObject(final Object obj) throws IOException {
+            // a null resolver means "no substitution", matching how the marshaller treats a null resolver
+            return resolver == null ? obj : resolver.readResolve(obj);
+        }
+
+        static {
+            final Class<?>[] types = {
+                void.class, boolean.class, byte.class, short.class, int.class,
+                long.class, float.class, double.class, char.class
+            };
+            for (Class<?> type : types) {
+                primitiveTypes.put(type.getName(), type);
+            }
+        }
+    }
+}

Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A single marshalling step that operates on a {@link ByteBuffer}. NOTE(review): the boolean result is presumably a completion flag — confirm against callers.
+ */
+public interface MarshallingAction {
+    boolean marshal(ByteBuffer buffer) throws IOException;
+}

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java	2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -32,15 +32,23 @@
  */
 public final class OneBufferInputStream extends InputStream {
 
-    private final Object lock = new Object();
+    private final Object lock;
     private ByteBuffer buffer;
+    private boolean eof;
 
+    public OneBufferInputStream(final Object lock) {
+        this.lock = lock;
+    }
+
     private ByteBuffer getBuffer() throws InterruptedIOException {
         synchronized (lock) {
             for (;;) {
                 final ByteBuffer buffer = this.buffer;
                 if (buffer != null) {
                     if (! buffer.hasRemaining()) {
+                        if (eof) {
+                            return null;
+                        }
                         lock.notify();
                         this.buffer = null;
                     } else {
@@ -57,15 +65,38 @@
         }
     }
 
+    public void setBuffer(ByteBuffer buffer, boolean eof) {
+        synchronized (lock) {
+            if (this.buffer != null) { // getBuffer() clears the field once a buffer is exhausted, so non-null means one is still pending
+                throw new IllegalStateException("Buffer already set");
+            }
+            this.buffer = buffer;
+            this.eof = eof; // when true, getBuffer() returns null once this buffer has no bytes remaining
+            lock.notify(); // wakes one thread waiting on the lock — presumably a reader blocked in getBuffer()
+        }
+    }
+
     public int read() throws IOException {
         synchronized (lock) {
-            return getBuffer().get() & 0xff;
+            final ByteBuffer buffer = getBuffer(); // null once the final (eof) buffer has been exhausted
+            return buffer == null ? -1 : buffer.get() & 0xff; // -1 signals end of stream; otherwise the byte as an unsigned value
         }
     }
 
     public int read(final byte[] b, int off, int len) throws IOException {
+        int c = 0; // number of bytes copied into b so far
         synchronized (lock) {
-            return 0;
+            while (len > 0) {
+                final ByteBuffer buffer = getBuffer();
+                if (buffer == null) {
+                    return c == 0 ? -1 : c; // end of stream: report the bytes already copied, or -1 if there were none
+                }
+                int rem = Math.min(len, buffer.remaining());
+                buffer.get(b, off + c, rem); // write after the bytes copied by earlier iterations
+                c += rem; // count the copied bytes; previously c stayed 0, so the method never reported any data
+                len -= rem;
+            }
+            return c;
         }
     }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java	2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java	2008-07-15 20:02:34 UTC (rev 4375)
@@ -55,8 +55,8 @@
                 final ByteBuffer buffer = this.buffer;
                 if (buffer != null) {
                     if (! buffer.hasRemaining()) {
+                        this.buffer = null;
                         lock.notify();
-                        this.buffer = null;
                     } else {
                         return buffer;
                     }




More information about the jboss-remoting-commits mailing list