Author: david.lloyd(a)jboss.com
Date: 2008-07-11 17:21:16 -0400 (Fri, 11 Jul 2008)
New Revision: 4371
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Unmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
Log:
Fix bad name in RemoteServiceEndpoint; add a first blush at re-re-re-re-re-designing
Marshallers (this go-around actually having the benefit of some use testing)
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java 2008-07-10
18:49:25 UTC (rev 4370)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -7,31 +7,16 @@
import org.jboss.cx.remoting.stream.ObjectSource;
/**
- * A marshaller/unmarshaller for transmitting data over a wire protocol of some sort.
Each marshaller instance is
- * guaranteed to be used by only one thread. Marshallers are not pooled or reused in any
way. Any pooling of marshallers
- * must be done by implementations of this class and/or {@link
org.jboss.cx.remoting.spi.marshal.MarshallerFactory}.
+ * A marshaller for transmitting data over a wire protocol of some sort. Each marshaller
instance is
+ * guaranteed to be used by only one thread at a time.
*
* @param <T> the type of buffer that the marshaller uses, typically {@link
java.nio.ByteBuffer} or {@link java.nio.CharBuffer}
*/
-public interface Marshaller<T extends Buffer> extends Serializable {
+public interface Marshaller<T extends Buffer> {
- /**
- * Write objects to buffers. The buffers are allocated from the {@link
org.jboss.xnio.BufferAllocator} that was
- * provided to the {@link org.jboss.cx.remoting.spi.marshal.MarshallerFactory}.
- *
- * @param bufferSink the sink for filled (and flipped) buffers
- * @return a sink for objects
- * @throws IOException if an error occurs while creating the marshaling sink
- */
- ObjectSink<Object> getMarshalingSink(ObjectSink<T> bufferSink) throws
IOException;
+ void start(Object object) throws IOException, IllegalStateException;
- /**
- * Read objects from buffers. The buffers are freed to the {@link
org.jboss.xnio.BufferAllocator} that was
- * provided to the {@link org.jboss.cx.remoting.spi.marshal.MarshallerFactory}.
- *
- * @param bufferSource the source for filled (and flipped) buffers
- * @return a source for objects
- * @throws IOException if an error occurs while creating the unmarshaling source
- */
- ObjectSource<Object> getUnmarshalingSource(ObjectSource<T> bufferSource)
throws IOException;
+ boolean marshal(T buffer) throws IOException;
+
+ void clearClassPool() throws IOException;
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java 2008-07-10
18:49:25 UTC (rev 4370)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -2,7 +2,6 @@
import java.io.IOException;
import java.nio.Buffer;
-import org.jboss.xnio.BufferAllocator;
/**
* A factory to produce marshallers.
@@ -14,11 +13,11 @@
/**
* Create a marshaller instance.
*
- * @param allocator the buffer allocator to use
* @param resolver the object resolver to use
- * @param classLoader the classloader to use
* @return a marshaller
* @throws IOException if an error occurs while creating the marshaller
*/
- Marshaller<T> createMarshaller(BufferAllocator<T> allocator,
ObjectResolver resolver, ClassLoader classLoader) throws IOException;
+ Marshaller<T> createMarshaller(ObjectResolver resolver) throws IOException;
+
+ Unmarshaller<T> createUnmarshaller(ObjectResolver resolver, ClassLoader
classLoader) throws IOException;
}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Unmarshaller.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Unmarshaller.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Unmarshaller.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.marshal;
+
+import java.nio.Buffer;
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface Unmarshaller<T extends Buffer> {
+ boolean unmarshal(T buffer) throws IOException;
+
+ Object get() throws IOException, IllegalStateException, ClassNotFoundException;
+}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-10
18:49:25 UTC (rev 4370)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -41,7 +41,7 @@
* @return a client endpoint
* @throws RemotingException if a client could not be opened
*/
- RemoteClientEndpoint<I, O> openClient() throws RemotingException;
+ RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException;
/**
* Get a handle to this service endpoint. The service endpoint will not auto-close
as long as there is at least
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-10
18:49:25 UTC (rev 4370)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -48,7 +48,7 @@
if (! isOpen()) {
throw new RemotingException("Client source is not open");
}
- final RemoteClientEndpoint<I,O> clientEndpoint =
serviceEndpoint.openClient();
+ final RemoteClientEndpoint<I,O> clientEndpoint =
serviceEndpoint.createClientEndpoint();
final Client<I, O> client = endpoint.createClient(clientEndpoint);
clientEndpoint.autoClose();
return client;
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-10
18:49:25 UTC (rev 4370)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -49,7 +49,7 @@
serviceContext = new ServiceContextImpl(executor);
}
- public RemoteClientEndpoint<I, O> openClient() throws RemotingException {
+ public RemoteClientEndpoint<I, O> createClientEndpoint() throws
RemotingException {
if (isOpen()) {
final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new
RemoteClientEndpointLocalImpl<I, O>(executor, this, requestListener);
clientEndpoint.open();
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java 2008-07-10
18:49:25 UTC (rev 4370)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -1,72 +1,71 @@
package org.jboss.cx.remoting.core.marshal;
import java.io.IOException;
-import java.io.ObjectStreamClass;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.HashMap;
+import java.util.concurrent.Executor;
import org.jboss.cx.remoting.spi.marshal.Marshaller;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.cx.remoting.stream.ObjectSink;
-import org.jboss.cx.remoting.stream.ObjectSource;
-import org.jboss.cx.remoting.stream.ByteBufferOutputStream;
-import org.jboss.cx.remoting.stream.ByteBufferInputStream;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.xnio.BufferAllocator;
import org.jboss.serial.io.JBossObjectOutputStream;
-import org.jboss.serial.io.JBossObjectInputStream;
/**
*
*/
public class JBossSerializationMarhsaller implements Marshaller<ByteBuffer> {
- private static final long serialVersionUID = -8197192536466706414L;
+ private final Executor executor;
- private final BufferAllocator<ByteBuffer> allocator;
- private final ObjectResolver resolver;
- private final ClassLoader classLoader;
+ private final OurObjectOutputStream objectOutputStream;
+ private final OneBufferOutputStream outputStream;
- public JBossSerializationMarhsaller(final BufferAllocator<ByteBuffer>
allocator, final ObjectResolver resolver, final ClassLoader classLoader) {
- this.allocator = allocator;
- this.resolver = resolver;
- this.classLoader = classLoader;
- }
+ private final Object resultLock = new Object();
+ private boolean done = false;
- public ObjectSink<Object> getMarshalingSink(final ObjectSink<ByteBuffer>
bufferSink) throws IOException {
- return new MarshalingSink(bufferSink, allocator, resolver);
+ public JBossSerializationMarhsaller(final Executor executor, final ObjectResolver
resolver) throws IOException {
+ this.executor = executor;
+ outputStream = new OneBufferOutputStream(new Object());
+ objectOutputStream = new OurObjectOutputStream(outputStream, resolver);
}
- public ObjectSource<Object> getUnmarshalingSource(final
ObjectSource<ByteBuffer> bufferSource) throws IOException {
- return new MarshalingSource(bufferSource, allocator, resolver, classLoader);
+ public void start(final Object object) throws IOException, IllegalStateException {
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ synchronized (objectOutputStream) {
+ objectOutputStream.writeObject(object);
+ objectOutputStream.flush();
+ synchronized (resultLock) {
+ outputStream.flush();
+ done = true;
+ resultLock.notify();
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ });
}
- public static final class MarshalingSink implements ObjectSink<Object> {
- private final OurObjectOutputStream stream;
-
- private MarshalingSink(final ObjectSink<ByteBuffer> bufferSink, final
BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver) throws
IOException {
- stream = new OurObjectOutputStream(bufferSink, allocator, resolver);
+ public boolean marshal(final ByteBuffer buffer) throws IOException {
+ outputStream.setBuffer(buffer);
+ synchronized (resultLock) {
+ outputStream.await();
+ return done;
}
+ }
- public void accept(final Object instance) throws IOException {
- stream.writeObject(instance);
+ public void clearClassPool() throws IOException {
+ synchronized (objectOutputStream) {
+ objectOutputStream.reset();
}
-
- public void flush() throws IOException {
- stream.flush();
- }
-
- public void close() throws IOException {
- stream.close();
- }
}
private static final class OurObjectOutputStream extends JBossObjectOutputStream {
private final ObjectResolver resolver;
- private OurObjectOutputStream(final ObjectSink<ByteBuffer> sink, final
BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver) throws
IOException {
- super(new ByteBufferOutputStream(sink, allocator));
+ private OurObjectOutputStream(final OutputStream outputStream, final
ObjectResolver resolver) throws IOException {
+ super(outputStream);
enableReplaceObject(true);
this.resolver = resolver;
}
@@ -75,81 +74,4 @@
return resolver.writeReplace(obj);
}
}
-
- public static final class MarshalingSource implements ObjectSource<Object> {
- private final OurObjectInputStream stream;
-
- private MarshalingSource(final ObjectSource<ByteBuffer> bufferSource, final
BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final
ClassLoader classLoader) throws IOException {
- stream = new OurObjectInputStream(bufferSource, allocator, resolver,
classLoader);
- }
-
- public boolean hasNext() throws IOException {
- return true;
- }
-
- public Object next() throws IOException {
- try {
- return stream.readObject();
- } catch (ClassNotFoundException e) {
- throw new RemotingException("No class found for next object in
stream", e);
- }
- }
-
- public void close() throws IOException {
- stream.close();
- }
- }
-
- private static final class OurObjectInputStream extends JBossObjectInputStream {
- private final ClassLoader classLoader;
- private final ObjectResolver resolver;
-
- private OurObjectInputStream(final ObjectSource<ByteBuffer> bufferSource,
final BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final
ClassLoader classLoader) throws IOException {
- super(new ByteBufferInputStream(bufferSource, allocator), classLoader);
- this.classLoader = classLoader;
- this.resolver = resolver;
- }
-
- protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException,
ClassNotFoundException {
- final String name = desc.getName();
- if (primitiveTypes.containsKey(name)) {
- return primitiveTypes.get(name);
- } else {
- return Class.forName(name, false, classLoader);
- }
- }
-
- protected Class<?> resolveProxyClass(final String[] interfaceNames) throws
IOException, ClassNotFoundException {
- final int length = interfaceNames.length;
- final Class<?>[] interfaces = new Class[length];
- for (int i = 0; i < length; i ++) {
- interfaces[i] = Class.forName(interfaceNames[i], false, classLoader);
- }
- return Proxy.getProxyClass(classLoader, interfaces);
- }
-
- protected Object resolveObject(final Object obj) throws IOException {
- return resolver.readResolve(obj);
- }
-
- private static final Map<String, Class<?>> primitiveTypes = new
HashMap<String, Class<?>>();
-
- private static <T> void add(Class<T> type) {
- primitiveTypes.put(type.getName(), type);
- }
-
- static {
- add(void.class);
- add(boolean.class);
- add(byte.class);
- add(short.class);
- add(int.class);
- add(long.class);
- add(float.class);
- add(double.class);
- add(char.class);
- }
- }
-
-
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java 2008-07-10
18:49:25 UTC (rev 4370)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -2,17 +2,28 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
import org.jboss.cx.remoting.spi.marshal.Marshaller;
import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.xnio.BufferAllocator;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
/**
*
*/
public class JBossSerializationMarshallerFactory implements
MarshallerFactory<ByteBuffer> {
- public Marshaller<ByteBuffer> createMarshaller(final
BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final
ClassLoader classLoader) throws IOException {
- return new JBossSerializationMarhsaller(allocator, resolver, classLoader);
+ private final Executor executor;
+
+ public JBossSerializationMarshallerFactory(final Executor executor) {
+ this.executor = executor;
}
+
+ public Marshaller<ByteBuffer> createMarshaller(final ObjectResolver resolver)
throws IOException {
+ return new JBossSerializationMarhsaller(executor, resolver);
+ }
+
+ public Unmarshaller<ByteBuffer> createUnmarshaller(final ObjectResolver
resolver, final ClassLoader classLoader) throws IOException {
+ return null;
+ }
}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -0,0 +1,132 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.serial.io.JBossObjectInputStream;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.ObjectStreamClass;
+import java.io.InputStream;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public final class JBossSerializationUnmarshaller implements
Unmarshaller<ByteBuffer> {
+ private final Executor executor;
+ private final OurObjectInputStream objectInputStream;
+ private final ClassLoader classLoader;
+ private final ObjectResolver resolver;
+ private final AtomicBoolean running = new AtomicBoolean();
+
+
+ public JBossSerializationUnmarshaller(final Executor executor, final ObjectResolver
resolver, final ClassLoader classLoader) throws IOException {
+ this.executor = executor;
+ this.resolver = resolver;
+ this.classLoader = classLoader;
+ objectInputStream = new OurObjectInputStream(new OneBufferInputStream(),
resolver, classLoader);
+ }
+
+ public boolean unmarshal(final ByteBuffer buffer) throws IOException {
+ if (! running.getAndSet(true)) {
+ executor.execute(new Runnable() {
+ public void run() {
+ synchronized (objectInputStream) {
+ try {
+ final Object object = objectInputStream.readObject();
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ } finally {
+ running.set(false);
+ }
+ }
+ }
+ });
+ }
+ return false;
+ }
+
+ public Object get() throws IOException, ClassNotFoundException, IllegalStateException
{
+ return null;
+ }
+
+ private static final class OurObjectInputStream extends JBossObjectInputStream {
+ private final ClassLoader classLoader;
+ private final ObjectResolver resolver;
+
+ private OurObjectInputStream(final InputStream inputStream, final ObjectResolver
resolver, final ClassLoader classLoader) throws IOException {
+ super(inputStream, classLoader);
+ this.classLoader = classLoader;
+ this.resolver = resolver;
+ }
+
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException,
ClassNotFoundException {
+ final String name = desc.getName();
+ if (primitiveTypes.containsKey(name)) {
+ return primitiveTypes.get(name);
+ } else {
+ return Class.forName(name, false, classLoader);
+ }
+ }
+
+ protected Class<?> resolveProxyClass(final String[] interfaceNames) throws
IOException, ClassNotFoundException {
+ final int length = interfaceNames.length;
+ final Class<?>[] interfaces = new Class[length];
+ for (int i = 0; i < length; i ++) {
+ interfaces[i] = Class.forName(interfaceNames[i], false, classLoader);
+ }
+ return Proxy.getProxyClass(classLoader, interfaces);
+ }
+
+ protected Object resolveObject(final Object obj) throws IOException {
+ return resolver.readResolve(obj);
+ }
+
+ private static final Map<String, Class<?>> primitiveTypes = new
HashMap<String, Class<?>>();
+
+ private static <T> void add(Class<T> type) {
+ primitiveTypes.put(type.getName(), type);
+ }
+
+ static {
+ add(void.class);
+ add(boolean.class);
+ add(byte.class);
+ add(short.class);
+ add(int.class);
+ add(long.class);
+ add(float.class);
+ add(double.class);
+ add(char.class);
+ }
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public final class OneBufferInputStream extends InputStream {
+
+ private final Object lock = new Object();
+ private ByteBuffer buffer;
+
+ private ByteBuffer getBuffer() throws InterruptedIOException {
+ synchronized (lock) {
+ for (;;) {
+ final ByteBuffer buffer = this.buffer;
+ if (buffer != null) {
+ if (! buffer.hasRemaining()) {
+ lock.notify();
+ this.buffer = null;
+ } else {
+ return buffer;
+ }
+ }
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("getBuffer() operation
interrupted!");
+ }
+ }
+ }
+ }
+
+ public int read() throws IOException {
+ synchronized (lock) {
+ return getBuffer().get() & 0xff;
+ }
+ }
+
+ public int read(final byte[] b, int off, int len) throws IOException {
+ synchronized (lock) {
+ return 0;
+ }
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java 2008-07-11
21:21:16 UTC (rev 4371)
@@ -0,0 +1,115 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public final class OneBufferOutputStream extends OutputStream {
+
+ private ByteBuffer buffer;
+ private final Object lock;
+
+ public OneBufferOutputStream(final Object lock) {
+ this.lock = lock;
+ }
+
+ public void setBuffer(ByteBuffer buffer) {
+ synchronized (lock) {
+ if (this.buffer != null) {
+ throw new IllegalStateException("Buffer already set");
+ }
+ this.buffer = buffer;
+ lock.notify();
+ }
+ }
+
+ private ByteBuffer getBuffer() throws InterruptedIOException {
+ synchronized (lock) {
+ for (;;) {
+ final ByteBuffer buffer = this.buffer;
+ if (buffer != null) {
+ if (! buffer.hasRemaining()) {
+ lock.notify();
+ this.buffer = null;
+ } else {
+ return buffer;
+ }
+ }
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("getBuffer() operation
interrupted!");
+ }
+ }
+ }
+ }
+
+ public void write(final int b) throws IOException {
+ synchronized (lock) {
+ final ByteBuffer buffer = getBuffer();
+ buffer.put((byte) b);
+ }
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ synchronized (lock) {
+ while (len > 0) {
+ final ByteBuffer buffer = getBuffer();
+ final int rem = Math.min(buffer.remaining(), len);
+ buffer.put(b, off, rem);
+ len -= rem; off += rem;
+ }
+ }
+ }
+
+ public void flush() throws IOException {
+ synchronized (lock) {
+ buffer = null;
+ lock.notify();
+ }
+ }
+
+ public void close() throws IOException {
+ flush();
+ }
+
+ public void await() {
+ synchronized (lock) {
+ while (buffer != null) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ buffer = null;
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+}