[jboss-remoting-commits] JBoss Remoting SVN: r4371 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/remote and 2 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Fri Jul 11 17:21:16 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-07-11 17:21:16 -0400 (Fri, 11 Jul 2008)
New Revision: 4371

Added:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Unmarshaller.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
Log:
Fix bad name in RemoteServiceEndpoint; add a first blush at re-re-re-re-re-designing Marshallers (this go-around actually having the benefit of some use testing)

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java	2008-07-10 18:49:25 UTC (rev 4370)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -7,31 +7,16 @@
 import org.jboss.cx.remoting.stream.ObjectSource;
 
 /**
- * A marshaller/unmarshaller for transmitting data over a wire protocol of some sort.  Each marshaller instance is
- * guaranteed to be used by only one thread.  Marshallers are not pooled or reused in any way.  Any pooling of marshallers
- * must be done by implementations of this class and/or {@link org.jboss.cx.remoting.spi.marshal.MarshallerFactory}.
+ * A marshaller for transmitting data over a wire protocol of some sort.  Each marshaller instance is
+ * guaranteed to be used by only one thread at a time.
  *
  * @param <T> the type of buffer that the marshaller uses, typically {@link java.nio.ByteBuffer} or {@link java.nio.CharBuffer}
  */
-public interface Marshaller<T extends Buffer> extends Serializable {
+public interface Marshaller<T extends Buffer> {
 
-    /**
-     * Write objects to buffers.  The buffers are allocated from the {@link org.jboss.xnio.BufferAllocator} that was
-     * provided to the {@link org.jboss.cx.remoting.spi.marshal.MarshallerFactory}.
-     *
-     * @param bufferSink the sink for filled (and flipped) buffers
-     * @return a sink for objects
-     * @throws IOException if an error occurs while creating the marshaling sink
-     */
-    ObjectSink<Object> getMarshalingSink(ObjectSink<T> bufferSink) throws IOException;
+    void start(Object object) throws IOException, IllegalStateException;
 
-    /**
-     * Read objects from buffers.  The buffers are freed to the {@link org.jboss.xnio.BufferAllocator} that was
-     * provided to the {@link org.jboss.cx.remoting.spi.marshal.MarshallerFactory}.
-     *
-     * @param bufferSource the source for filled (and flipped) buffers
-     * @return a source for objects
-     * @throws IOException if an error occurs while creating the unmarshaling source
-     */
-    ObjectSource<Object> getUnmarshalingSource(ObjectSource<T> bufferSource) throws IOException;
+    boolean marshal(T buffer) throws IOException;
+
+    void clearClassPool() throws IOException;
 }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java	2008-07-10 18:49:25 UTC (rev 4370)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -2,7 +2,6 @@
 
 import java.io.IOException;
 import java.nio.Buffer;
-import org.jboss.xnio.BufferAllocator;
 
 /**
  * A factory to produce marshallers.
@@ -14,11 +13,11 @@
     /**
      * Create a marshaller instance.
      *
-     * @param allocator the buffer allocator to use
      * @param resolver the object resolver to use
-     * @param classLoader the classloader to use
      * @return a marshaller
      * @throws IOException if an error occurs while creating the marshaller
      */
-    Marshaller<T> createMarshaller(BufferAllocator<T> allocator, ObjectResolver resolver, ClassLoader classLoader) throws IOException;
+    Marshaller<T> createMarshaller(ObjectResolver resolver) throws IOException;
+
+    Unmarshaller<T> createUnmarshaller(ObjectResolver resolver, ClassLoader classLoader) throws IOException;
 }

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Unmarshaller.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Unmarshaller.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Unmarshaller.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.marshal;
+
+import java.nio.Buffer;
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface Unmarshaller<T extends Buffer>  {
+    boolean unmarshal(T buffer) throws IOException;
+
+    Object get() throws IOException, IllegalStateException, ClassNotFoundException;
+}

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java	2008-07-10 18:49:25 UTC (rev 4370)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -41,7 +41,7 @@
      * @return a client endpoint
      * @throws RemotingException if a client could not be opened
      */
-    RemoteClientEndpoint<I, O> openClient() throws RemotingException;
+    RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException;
 
     /**
      * Get a handle to this service endpoint.  The service endpoint will not auto-close as long as there is at least

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java	2008-07-10 18:49:25 UTC (rev 4370)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -48,7 +48,7 @@
         if (! isOpen()) {
             throw new RemotingException("Client source is not open");
         }
-        final RemoteClientEndpoint<I,O> clientEndpoint = serviceEndpoint.openClient();
+        final RemoteClientEndpoint<I,O> clientEndpoint = serviceEndpoint.createClientEndpoint();
         final Client<I, O> client = endpoint.createClient(clientEndpoint);
         clientEndpoint.autoClose();
         return client;

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java	2008-07-10 18:49:25 UTC (rev 4370)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -49,7 +49,7 @@
         serviceContext = new ServiceContextImpl(executor);
     }
 
-    public RemoteClientEndpoint<I, O> openClient() throws RemotingException {
+    public RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException {
         if (isOpen()) {
             final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new RemoteClientEndpointLocalImpl<I, O>(executor, this, requestListener);
             clientEndpoint.open();

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java	2008-07-10 18:49:25 UTC (rev 4370)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -1,72 +1,71 @@
 package org.jboss.cx.remoting.core.marshal;
 
 import java.io.IOException;
-import java.io.ObjectStreamClass;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.HashMap;
+import java.util.concurrent.Executor;
 import org.jboss.cx.remoting.spi.marshal.Marshaller;
 import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.cx.remoting.stream.ObjectSink;
-import org.jboss.cx.remoting.stream.ObjectSource;
-import org.jboss.cx.remoting.stream.ByteBufferOutputStream;
-import org.jboss.cx.remoting.stream.ByteBufferInputStream;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.xnio.BufferAllocator;
 import org.jboss.serial.io.JBossObjectOutputStream;
-import org.jboss.serial.io.JBossObjectInputStream;
 
 /**
  *
  */
 public class JBossSerializationMarhsaller implements Marshaller<ByteBuffer> {
 
-    private static final long serialVersionUID = -8197192536466706414L;
+    private final Executor executor;
 
-    private final BufferAllocator<ByteBuffer> allocator;
-    private final ObjectResolver resolver;
-    private final ClassLoader classLoader;
+    private final OurObjectOutputStream objectOutputStream;
+    private final OneBufferOutputStream outputStream;
 
-    public JBossSerializationMarhsaller(final BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final ClassLoader classLoader) {
-        this.allocator = allocator;
-        this.resolver = resolver;
-        this.classLoader = classLoader;
-    }
+    private final Object resultLock = new Object();
+    private boolean done = false;
 
-    public ObjectSink<Object> getMarshalingSink(final ObjectSink<ByteBuffer> bufferSink) throws IOException {
-        return new MarshalingSink(bufferSink, allocator, resolver);
+    public JBossSerializationMarhsaller(final Executor executor, final ObjectResolver resolver) throws IOException {
+        this.executor = executor;
+        outputStream = new OneBufferOutputStream(new Object());
+        objectOutputStream = new OurObjectOutputStream(outputStream, resolver);
     }
 
-    public ObjectSource<Object> getUnmarshalingSource(final ObjectSource<ByteBuffer> bufferSource) throws IOException {
-        return new MarshalingSource(bufferSource, allocator, resolver, classLoader);
+    public void start(final Object object) throws IOException, IllegalStateException {
+        executor.execute(new Runnable() {
+            public void run() {
+                try {
+                    synchronized (objectOutputStream) {
+                        objectOutputStream.writeObject(object);
+                        objectOutputStream.flush();
+                        synchronized (resultLock) {
+                            outputStream.flush();
+                            done = true;
+                            resultLock.notify();
+                        }
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
     }
 
-    public static final class MarshalingSink implements ObjectSink<Object> {
-        private final OurObjectOutputStream stream;
-
-        private MarshalingSink(final ObjectSink<ByteBuffer> bufferSink, final BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver) throws IOException {
-            stream = new OurObjectOutputStream(bufferSink, allocator, resolver);
+    public boolean marshal(final ByteBuffer buffer) throws IOException {
+        outputStream.setBuffer(buffer);
+        synchronized (resultLock) {
+            outputStream.await();
+            return done;
         }
+    }
 
-        public void accept(final Object instance) throws IOException {
-            stream.writeObject(instance);
+    public void clearClassPool() throws IOException {
+        synchronized (objectOutputStream) {
+            objectOutputStream.reset();
         }
-
-        public void flush() throws IOException {
-            stream.flush();
-        }
-
-        public void close() throws IOException {
-            stream.close();
-        }
     }
 
     private static final class OurObjectOutputStream extends JBossObjectOutputStream {
         private final ObjectResolver resolver;
 
-        private OurObjectOutputStream(final ObjectSink<ByteBuffer> sink, final BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver) throws IOException {
-            super(new ByteBufferOutputStream(sink, allocator));
+        private OurObjectOutputStream(final OutputStream outputStream, final ObjectResolver resolver) throws IOException {
+            super(outputStream);
             enableReplaceObject(true);
             this.resolver = resolver;
         }
@@ -75,81 +74,4 @@
             return resolver.writeReplace(obj);
         }
     }
-
-    public static final class MarshalingSource implements ObjectSource<Object> {
-        private final OurObjectInputStream stream;
-
-        private MarshalingSource(final ObjectSource<ByteBuffer> bufferSource, final BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
-            stream = new OurObjectInputStream(bufferSource, allocator, resolver, classLoader);
-        }
-
-        public boolean hasNext() throws IOException {
-            return true;
-        }
-
-        public Object next() throws IOException {
-            try {
-                return stream.readObject();
-            } catch (ClassNotFoundException e) {
-                throw new RemotingException("No class found for next object in stream", e);
-            }
-        }
-
-        public void close() throws IOException {
-            stream.close();
-        }
-    }
-
-    private static final class OurObjectInputStream extends JBossObjectInputStream {
-        private final ClassLoader classLoader;
-        private final ObjectResolver resolver;
-
-        private OurObjectInputStream(final ObjectSource<ByteBuffer> bufferSource, final BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
-            super(new ByteBufferInputStream(bufferSource, allocator), classLoader);
-            this.classLoader = classLoader;
-            this.resolver = resolver;
-        }
-
-        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
-            final String name = desc.getName();
-            if (primitiveTypes.containsKey(name)) {
-                return primitiveTypes.get(name);
-            } else {
-                return Class.forName(name, false, classLoader);
-            }
-        }
-
-        protected Class<?> resolveProxyClass(final String[] interfaceNames) throws IOException, ClassNotFoundException {
-            final int length = interfaceNames.length;
-            final Class<?>[] interfaces = new Class[length];
-            for (int i = 0; i < length; i ++) {
-                interfaces[i] = Class.forName(interfaceNames[i], false, classLoader);
-            }
-            return Proxy.getProxyClass(classLoader, interfaces);
-        }
-
-        protected Object resolveObject(final Object obj) throws IOException {
-            return resolver.readResolve(obj);
-        }
-
-        private static final Map<String, Class<?>> primitiveTypes = new HashMap<String, Class<?>>();
-
-        private static <T> void add(Class<T> type) {
-            primitiveTypes.put(type.getName(), type);
-        }
-
-        static {
-            add(void.class);
-            add(boolean.class);
-            add(byte.class);
-            add(short.class);
-            add(int.class);
-            add(long.class);
-            add(float.class);
-            add(double.class);
-            add(char.class);
-        }
-    }
-
-
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java	2008-07-10 18:49:25 UTC (rev 4370)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -2,17 +2,28 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
 import org.jboss.cx.remoting.spi.marshal.Marshaller;
 import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
 import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.xnio.BufferAllocator;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
 
 /**
  *
  */
 public class JBossSerializationMarshallerFactory implements MarshallerFactory<ByteBuffer> {
 
-    public Marshaller<ByteBuffer> createMarshaller(final BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
-        return new JBossSerializationMarhsaller(allocator, resolver, classLoader);
+    private final Executor executor;
+
+    public JBossSerializationMarshallerFactory(final Executor executor) {
+        this.executor = executor;
     }
+
+    public Marshaller<ByteBuffer> createMarshaller(final ObjectResolver resolver) throws IOException {
+        return new JBossSerializationMarhsaller(executor, resolver);
+    }
+
+    public Unmarshaller<ByteBuffer> createUnmarshaller(final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
+        return null;
+    }
 }

Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -0,0 +1,132 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.serial.io.JBossObjectInputStream;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.ObjectStreamClass;
+import java.io.InputStream;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public final class JBossSerializationUnmarshaller implements Unmarshaller<ByteBuffer> {
+    private final Executor executor;
+    private final OurObjectInputStream objectInputStream;
+    private final ClassLoader classLoader;
+    private final ObjectResolver resolver;
+    private final AtomicBoolean running = new AtomicBoolean();
+    
+
+    public JBossSerializationUnmarshaller(final Executor executor, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
+        this.executor = executor;
+        this.resolver = resolver;
+        this.classLoader = classLoader;
+        objectInputStream = new OurObjectInputStream(new OneBufferInputStream(), resolver, classLoader);
+    }
+
+    public boolean unmarshal(final ByteBuffer buffer) throws IOException {
+        if (! running.getAndSet(true)) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    synchronized (objectInputStream) {
+                        try {
+                            final Object object = objectInputStream.readObject();
+
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        } catch (ClassNotFoundException e) {
+                            e.printStackTrace();
+                        } finally {
+                            running.set(false);
+                        }
+                    }
+                }
+            });
+        }
+        return false;
+    }
+
+    public Object get() throws IOException, ClassNotFoundException, IllegalStateException {
+        return null;
+    }
+
+    private static final class OurObjectInputStream extends JBossObjectInputStream {
+        private final ClassLoader classLoader;
+        private final ObjectResolver resolver;
+
+        private OurObjectInputStream(final InputStream inputStream, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
+            super(inputStream, classLoader);
+            this.classLoader = classLoader;
+            this.resolver = resolver;
+        }
+
+        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+            final String name = desc.getName();
+            if (primitiveTypes.containsKey(name)) {
+                return primitiveTypes.get(name);
+            } else {
+                return Class.forName(name, false, classLoader);
+            }
+        }
+
+        protected Class<?> resolveProxyClass(final String[] interfaceNames) throws IOException, ClassNotFoundException {
+            final int length = interfaceNames.length;
+            final Class<?>[] interfaces = new Class[length];
+            for (int i = 0; i < length; i ++) {
+                interfaces[i] = Class.forName(interfaceNames[i], false, classLoader);
+            }
+            return Proxy.getProxyClass(classLoader, interfaces);
+        }
+
+        protected Object resolveObject(final Object obj) throws IOException {
+            return resolver.readResolve(obj);
+        }
+
+        private static final Map<String, Class<?>> primitiveTypes = new HashMap<String, Class<?>>();
+
+        private static <T> void add(Class<T> type) {
+            primitiveTypes.put(type.getName(), type);
+        }
+
+        static {
+            add(void.class);
+            add(boolean.class);
+            add(byte.class);
+            add(short.class);
+            add(int.class);
+            add(long.class);
+            add(float.class);
+            add(double.class);
+            add(char.class);
+        }
+    }
+}

Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public final class OneBufferInputStream extends InputStream {
+
+    private final Object lock = new Object();
+    private ByteBuffer buffer;
+
+    private ByteBuffer getBuffer() throws InterruptedIOException {
+        synchronized (lock) {
+            for (;;) {
+                final ByteBuffer buffer = this.buffer;
+                if (buffer != null) {
+                    if (! buffer.hasRemaining()) {
+                        lock.notify();
+                        this.buffer = null;
+                    } else {
+                        return buffer;
+                    }
+                }
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new InterruptedIOException("getBuffer() operation interrupted!");
+                }
+            }
+        }
+    }
+
+    public int read() throws IOException {
+        synchronized (lock) {
+            return getBuffer().get() & 0xff;
+        }
+    }
+
+    public int read(final byte[] b, int off, int len) throws IOException {
+        synchronized (lock) {
+            return 0;
+        }
+    }
+}

Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java	2008-07-11 21:21:16 UTC (rev 4371)
@@ -0,0 +1,115 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public final class OneBufferOutputStream extends OutputStream {
+
+    private ByteBuffer buffer;
+    private final Object lock;
+
+    public OneBufferOutputStream(final Object lock) {
+        this.lock = lock;
+    }
+
+    public void setBuffer(ByteBuffer buffer) {
+        synchronized (lock) {
+            if (this.buffer != null) {
+                throw new IllegalStateException("Buffer already set");
+            }
+            this.buffer = buffer;
+            lock.notify();
+        }
+    }
+
+    private ByteBuffer getBuffer() throws InterruptedIOException {
+        synchronized (lock) {
+            for (;;) {
+                final ByteBuffer buffer = this.buffer;
+                if (buffer != null) {
+                    if (! buffer.hasRemaining()) {
+                        lock.notify();
+                        this.buffer = null;
+                    } else {
+                        return buffer;
+                    }
+                }
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new InterruptedIOException("getBuffer() operation interrupted!");
+                }
+            }
+        }
+    }
+
+    public void write(final int b) throws IOException {
+        synchronized (lock) {
+            final ByteBuffer buffer = getBuffer();
+            buffer.put((byte) b);
+        }
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        synchronized (lock) {
+            while (len > 0) {
+                final ByteBuffer buffer = getBuffer();
+                final int rem = Math.min(buffer.remaining(), len);
+                buffer.put(b, off, rem);
+                len -= rem; off += rem;
+            }
+        }
+    }
+
+    public void flush() throws IOException {
+        synchronized (lock) {
+            buffer = null;
+            lock.notify();
+        }
+    }
+
+    public void close() throws IOException {
+        flush();
+    }
+
+    public void await() {
+        synchronized (lock) {
+            while (buffer != null) {
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    buffer = null;
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+}




More information about the jboss-remoting-commits mailing list