Author: david.lloyd(a)jboss.com
Date: 2008-09-11 15:54:37 -0400 (Thu, 11 Sep 2008)
New Revision: 4566
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutor.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutorFactory.java
Removed:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializerFactory.java
remoting3/trunk/build.properties
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
Log:
Initial switch towards new marshalling to replace old
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.stream;
+
+import java.util.concurrent.Executor;
+import org.jboss.marshalling.MarshallerFactory;
+
+/**
+ * A context for stream serialization.
+ */
+public interface StreamContext {
+
+ /**
+ * Get an executor which may be used for various asynchronous tasks.
+ *
+ * @return an executor
+ */
+ Executor getExecutor();
+
+ /**
+ * Get a marshaller factory which is configured compatibly with the channel.
+ *
+ * @return the marshaller factory
+ */
+ MarshallerFactory getMarshallerFactory();
+}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializerFactory.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializerFactory.java 2008-09-09
14:05:02 UTC (rev 4565)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializerFactory.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -2,15 +2,36 @@
import org.jboss.xnio.IoHandler;
import org.jboss.xnio.ChannelSource;
-import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
import java.io.IOException;
import java.io.Serializable;
/**
- *
+ * A factory for stream serializers. Stream serializers are responsible for forwarding
streams across the network
+ * in a manner specific to the stream type.
*/
public interface StreamSerializerFactory extends Serializable {
- IoHandler<? super StreamChannel> getLocalSide(Object localSide) throws
IOException;
- Object getRemoteSide(ChannelSource<StreamChannel> remoteClient) throws
IOException;
+ /**
+ * Get the XNIO handler for the local side of the serializer. This side will access
the local instance. The returned
+ * handler is attached to the channel that is associated with the stream instance.
+ *
+ * @param localSide the instance that is being serialized
+ * @param streamContext the stream context
+ * @return the local handler
+ * @throws IOException if an error occurs while preparing the handler
+ */
+ IoHandler<? super AllocatedMessageChannel> getLocalSide(Object localSide,
StreamContext streamContext) throws IOException;
+
+ /**
+ * Get the remote proxy instance for the remote side of the serializer. This side
will emulate the streaming object
+ * on the remote side. This method is responsible for initiating the communications
channel, which the returned
+ * instance will use to transmit data.
+ *
+ * @param channelSource the channel source which is used to create the stream's
channel
+ * @param streamContext the stream context
+ * @return the remote proxy instance
+ * @throws IOException if an error occurs while preparing the handler
+ */
+ Object getRemoteSide(ChannelSource<AllocatedMessageChannel> channelSource,
StreamContext streamContext) throws IOException;
}
Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties 2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/build.properties 2008-09-11 19:54:37 UTC (rev 4566)
@@ -9,7 +9,7 @@
# Dependencies - keep in alpha order by property name
# ===================================================
-lib.apiviz.version=1.0.5
+lib.apiviz.version=1.2.3.GA
lib.apiviz.name=apiviz-${lib.apiviz.version}.jar
lib.apiviz.license=lgpl
lib.apiviz.dir=apiviz/${lib.apiviz.version}/lib
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/build.xml 2008-09-11 19:54:37 UTC (rev 4566)
@@ -1300,21 +1300,25 @@
<!-- JAVADOCS -->
<!-- ============================================== -->
- <target name="api-javadoc"
depends="api,core,standalone,util,lib.apiviz">
+ <target name="api-javadoc"
depends="api,core,standalone,util,lib.apiviz,lib.jboss-marshalling-api,lib.xnio-api">
<delete dir="api/target/main/docs"/>
<mkdir dir="api/target/main/docs"/>
<javadoc destdir="api/target/main/docs" author="false"
version="false" use="false" windowtitle="JBoss Remoting
API">
<doclet name="${lib.apiviz.doclet}"
path="${lib.apiviz.local}"/>
<packageset dir="api/src/main/java"/>
+ <packageset dir="standalone/src/main/java"/>
<packageset dir="util/src/main/java"/>
- <packageset dir="standalone/src/main/java"/>
<doctitle><![CDATA[<h1>JBoss Remoting
3</h1>]]></doctitle>
<bottom><![CDATA[<i>Copyright © 2008 JBoss, a
division of Red Hat, Inc.</i>]]></bottom>
<link
href="http://java.sun.com/j2se/1.5.0/docs/api/"/>
<link
href="http://docs.jboss.org/xnio/1.0/api/"/>
<classpath>
<path refid="core.classpath"/>
+ <path refid="api.classpath"/>
+ <path refid="standalone.classpath"/>
+ <path refid="util.classpath"/>
<pathelement location="${lib.xnio-api.local}"/>
+ <pathelement
location="${lib.jboss-marshalling-api.local}"/>
</classpath>
</javadoc>
</target>
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-09-09
14:05:02 UTC (rev 4565)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -18,7 +18,7 @@
import org.jboss.cx.remoting.ClientSource;
import org.jboss.cx.remoting.SimpleCloseable;
import org.jboss.cx.remoting.ServiceListener;
-import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
+import org.jboss.cx.remoting.util.OrderedExecutorFactory;
import org.jboss.cx.remoting.spi.remote.RequestHandler;
import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
import org.jboss.cx.remoting.spi.remote.Handle;
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java 2008-09-09
14:05:02 UTC (rev 4565)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -3,14 +3,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
import org.jboss.xnio.log.Logger;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.cx.remoting.core.util.DecodingBuilder;
-import org.jboss.xnio.channels.StreamChannel;
-import org.jboss.xnio.channels.StreamSinkChannel;
-import org.jboss.xnio.channels.CommonOptions;
-import org.jboss.xnio.channels.StreamSourceChannel;
+import org.jboss.cx.remoting.spi.stream.StreamContext;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.WritableMessageChannel;
+import org.jboss.xnio.channels.ReadableAllocatedMessageChannel;
import org.jboss.xnio.IoHandler;
import org.jboss.xnio.ChannelSource;
import org.jboss.xnio.IoUtils;
@@ -21,9 +22,9 @@
/**
* An input stream serializer. The input stream transfer protocol consists of two types
of "chunks": data and error.
- * A data chunk starts with an ASCII {@code 'd'}, followed by a two-byte
(unsigned) length field (a value of
- * {@code 0x0000} indicates a 65536-byte chunk), followed by the actual data. An error
chunk consists of a series of
- * UTF-8 bytes representing a description of the error, followed by the end of the
stream.
+ * A data chunk starts with an ASCII {@code 'd'}, followed by the actual data.
An error chunk starts with an ASCII
+ * {@code 'e'} followed by a series of UTF-8 bytes representing a description of
the error, followed by the end of
+ * the stream.
*
* Normally data chunks are transferred over the stream until the original {@link
InputStream} is exhausted, at which time
* the proxy stream will return a {@code -1} for the EOF condition.
@@ -39,13 +40,13 @@
// no-arg constructor required
}
- public IoHandler<StreamSinkChannel> getLocalSide(final Object localSide) throws
IOException {
- return new LocalHandler((InputStream) localSide, allocator);
+ public IoHandler<? super AllocatedMessageChannel> getLocalSide(final Object
localSide, final StreamContext streamContext) throws IOException {
+ return new LocalHandler((InputStream) localSide, allocator,
streamContext.getExecutor());
}
- public Object getRemoteSide(final ChannelSource<StreamChannel> remoteClient)
throws IOException {
-// return new RemoteInputStream(taskList, futureChannel);
- return null;
+ public Object getRemoteSide(final ChannelSource<AllocatedMessageChannel>
remoteClient, final StreamContext streamContext) throws IOException {
+ final RemoteHandler handler = new RemoteHandler();
+ return new RemoteInputStream(remoteClient.open(handler), allocator, handler);
}
public BufferAllocator<ByteBuffer> getAllocator() {
@@ -56,345 +57,273 @@
this.allocator = allocator;
}
- private static final byte DATA_CHUNK = (byte) 'd';
- private static final byte ERROR = (byte) 'e';
+ private static final int DATA_CHUNK = 'd';
+ private static final int ERROR = 'e';
- public static final class LocalHandler implements IoHandler<StreamSinkChannel>
{
+ public static final class LocalHandler implements
IoHandler<WritableMessageChannel> {
- // @protectedby {@code this}
- private final InputStream inputStream;
+ private final Object lock = new Object();
+ private final Executor executor;
private final BufferAllocator<ByteBuffer> allocator;
- private volatile ByteBuffer current;
- private volatile boolean eof;
+ private final Runnable fillTask = new FillTask();
- private LocalHandler(final InputStream inputStream, final
BufferAllocator<ByteBuffer> allocator) {
+ // @protectedby {@code lock}
+ private WritableMessageChannel channel;
+ // @protectedby {@code lock}
+ private final InputStream inputStream;
+ // @protectedby {@code lock}
+ private ByteBuffer writing;
+ // @protectedby {@code lock}
+ private boolean eof;
+
+ private LocalHandler(final InputStream inputStream, final
BufferAllocator<ByteBuffer> allocator, final Executor executor) {
this.inputStream = inputStream;
this.allocator = allocator;
+ this.executor = executor;
}
- private boolean fillBuffer() throws IOException {
- final ByteBuffer buffer = allocator.allocate();
- buffer.put(DATA_CHUNK);
- buffer.putShort((short) 0);
- final int cnt;
- if (buffer.hasArray()) {
- final byte[] a = buffer.array();
- final int off = buffer.arrayOffset();
- final int rem = Math.min(buffer.remaining(), 65536);
- cnt = inputStream.read(a, off, rem);
- if (cnt == -1) {
- return false;
- }
- skip(current, cnt);
- } else {
- final int rem = Math.min(buffer.remaining(), 65536);
- final byte[] a = new byte[rem];
- cnt = inputStream.read(a);
- if (cnt == -1) {
- return false;
- }
- current.put(a);
- }
- buffer.putShort(1, (short) cnt);
- current = flip(buffer);
- return true;
+ public void handleOpened(final WritableMessageChannel channel) {
+ this.channel = channel;
+ executor.execute(fillTask);
}
- private void prepareChunk(final StreamSinkChannel channel) {
- try {
- eof = fillBuffer();
- } catch (Throwable e) {
- try {
- current = ByteBuffer.wrap(("e" +
e.getMessage()).getBytes("utf-8"));
- } catch (UnsupportedEncodingException e1) {
- current = ByteBuffer.wrap(new byte[] { ERROR });
- }
- eof = true;
- }
- channel.resumeWrites();
+ public void handleReadable(final WritableMessageChannel channel) {
+ // not called on a sink channel
}
- public void handleOpened(final StreamSinkChannel channel) {
- if (channel.getOptions().contains(CommonOptions.TCP_NODELAY)) {
- try {
- channel.setOption(CommonOptions.TCP_NODELAY, Boolean.TRUE);
- } catch (IOException e) {
- // not too big a deal; just skip it
- log.trace(e, "Failed to enable TCP_NODELAY");
+ public void handleWritable(final WritableMessageChannel channel) {
+ synchronized (lock) {
+ final ByteBuffer buffer = writing;
+ if (buffer == null) {
+ if (eof) {
+ IoUtils.safeClose(channel);
+ } else {
+ executor.execute(fillTask);
+ }
+ } else {
+ final boolean sent;
+ try {
+ sent = channel.send(buffer);
+ } catch (IOException e) {
+ log.debug("Channel write failed: %s", e);
+ IoUtils.safeClose(channel);
+ return;
+ }
+ if (sent) {
+ writing = null;
+ allocator.free(buffer);
+ executor.execute(fillTask);
+ } else {
+ channel.resumeWrites();
+ }
}
}
- prepareChunk(channel);
}
- public void handleReadable(final StreamSinkChannel channel) {
- // not called on a sink channel
+ public void handleClosed(final WritableMessageChannel channel) {
+ synchronized (this) {
+ IoUtils.safeClose(inputStream);
+ }
}
- public void handleWritable(final StreamSinkChannel channel) {
- while (current.hasRemaining()) {
+ private final class FillTask implements Runnable {
+ public void run() {
try {
- final int c = channel.write(current);
- if (c == 0) {
- channel.resumeWrites();
- return;
+ final ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) DATA_CHUNK);
+ buffer.putShort((short) 0);
+ final int rem = buffer.remaining();
+ final int cnt;
+ if (buffer.hasArray()) {
+ final byte[] a = buffer.array();
+ final int off = buffer.arrayOffset();
+ cnt = inputStream.read(a, off, rem);
+ if (cnt == -1) {
+ synchronized (lock) {
+ eof = true;
+ return;
+ }
+ }
+ skip(buffer, cnt);
+ } else {
+ final byte[] a = new byte[rem];
+ cnt = inputStream.read(a);
+ if (cnt == -1) {
+ synchronized (lock) {
+ eof = true;
+ return;
+ }
+ }
+ buffer.put(a);
}
+ buffer.putShort(1, (short) cnt);
+ synchronized (lock) {
+ writing = flip(buffer);
+ }
+ channel.resumeWrites();
+ return;
} catch (IOException e) {
- log.debug("Channel write failed: %s", e);
- IoUtils.safeClose(channel);
+ synchronized (lock) {
+ eof = true;
+ try {
+ // this could probably be improved upon
+ writing = ByteBuffer.wrap((Character.toString((char) ERROR) +
e.getMessage()).getBytes("utf-8"));
+ } catch (UnsupportedEncodingException e1) {
+ writing = ByteBuffer.wrap(new byte[] { ERROR });
+ }
+ }
}
}
- if (eof) {
- IoUtils.safeClose(channel);
- } else {
- prepareChunk(channel);
- }
}
-
- public void handleClosed(final StreamSinkChannel channel) {
- synchronized (this) {
- IoUtils.safeClose(inputStream);
- }
- }
}
- public static final class RemoteHandler implements
IoHandler<StreamSourceChannel> {
+ public static final class RemoteHandler implements
IoHandler<ReadableAllocatedMessageChannel> {
- private enum DecoderState {
- NEW_CHUNK,
- IN_ERROR,
- IN_DATA,
- }
+ private final Object lock = new Object();
- private final RemoteInputStream remoteInputStream;
- private final ByteBuffer initialBuffer = ByteBuffer.allocate(5);
+ private ByteBuffer current;
+ private boolean done;
+ private IOException exception;
- private volatile ByteBuffer dataBuffer = null;
+ private RemoteHandler() {
+ }
- private volatile DecodingBuilder exceptionBuilder;
- private volatile DecoderState decoderState = DecoderState.NEW_CHUNK;
-
- private RemoteHandler(final RemoteInputStream remoteInputStream, final
BufferAllocator<ByteBuffer> allocator) {
- this.remoteInputStream = remoteInputStream;
+ public ByteBuffer getBuffer() throws IOException {
+ synchronized (lock) {
+ if (exception != null) {
+ final IOException ex = new IOException("I/O exception from
channel receive");
+ ex.initCause(exception);
+ throw ex;
+ }
+ try {
+ while (current == null && ! done) {
+ lock.wait();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted while reading from
input stream");
+ }
+ try {
+ return current;
+ } finally {
+ current = null;
+ }
+ }
}
- public void handleOpened(final StreamSourceChannel channel) {
+ public void handleOpened(final ReadableAllocatedMessageChannel channel) {
channel.resumeReads();
}
- public void handleReadable(final StreamSourceChannel channel) {
- try {
- for (;;) switch (decoderState) {
- case NEW_CHUNK: {
- int n = channel.read(initialBuffer);
- if (n == -1) {
- IoUtils.safeClose(channel);
- return;
+ public void handleReadable(final ReadableAllocatedMessageChannel channel) {
+ synchronized (lock) {
+ if (current != null) {
+ return;
+ }
+ try {
+ final ByteBuffer buffer = channel.receive();
+ if (buffer == null) {
+ IoUtils.safeClose(channel);
+ return;
+ }
+ if (! buffer.hasRemaining()) {
+ channel.resumeReads();
+ return;
+ }
+ final byte type = buffer.get();
+ switch (type) {
+ case DATA_CHUNK: {
+ current = buffer;
+ // only one waiter would be able to use this anyway
+ lock.notify();
+ break;
}
- if (n == 0) {
- remoteInputStream.scheduleResumeReads(channel);
- return;
- }
- if (initialBuffer.get(0) == DATA_CHUNK) {
- if (initialBuffer.hasRemaining()) {
- handleReadable(channel);
- return;
+ case ERROR: {
+ if (buffer.hasArray()) {
+ IoUtils.safeClose(channel);
+ final byte[] a = buffer.array();
+ final int offs = buffer.arrayOffset();
+ final int rem = buffer.remaining();
+ exception = new IOException(new String(a, offs + 1, rem,
"utf-8"));
}
- initialBuffer.flip();
- initialBuffer.get();
- final int length = (initialBuffer.getShort() - 1) &
0xffff + 1;
- dataBuffer = ByteBuffer.allocate(length);
- decoderState = DecoderState.IN_DATA;
break;
- } else if (initialBuffer.get(0) == ERROR) {
- decoderState = DecoderState.IN_ERROR;
- initialBuffer.flip();
- initialBuffer.get();
- exceptionBuilder.append(initialBuffer);
- initialBuffer.clear();
- break;
- } else {
- remoteInputStream.acceptException("Received garbage from
remote side");
- IoUtils.safeClose(channel);
- return;
}
- }
- case IN_ERROR: {
- ByteBuffer buffer = ByteBuffer.allocate(256);
- int n = channel.read(buffer);
- if (n == -1) {
-
remoteInputStream.acceptException(exceptionBuilder.finish().toString());
- exceptionBuilder = null;
+ default: {
IoUtils.safeClose(channel);
- return;
+ exception = new IOException("Remote data stream was
corrupted");
+ break;
}
- if (n == 0) {
- remoteInputStream.scheduleResumeReads(channel);
- return;
- }
- exceptionBuilder.append(buffer);
- break;
}
- case IN_DATA: {
- if (! dataBuffer.hasRemaining()) {
- dataBuffer.flip();
- remoteInputStream.acceptBuffer(dataBuffer);
- dataBuffer = null;
- decoderState = DecoderState.NEW_CHUNK;
- }
- int n = channel.read(dataBuffer);
- if (n == -1) {
- IoUtils.safeClose(channel);
- return;
- }
- if (n == 0) {
- remoteInputStream.scheduleResumeReads(channel);
- return;
- }
- break;
- }
+ } catch (IOException e) {
+ IoUtils.safeClose(channel);
+ // should only be one waiter, but just in case, notify em all so they
all catch the exception...
+ exception = e;
}
- } catch (IOException e) {
- remoteInputStream.acceptException("Read from remote input stream
failed: " + e.getMessage());
- IoUtils.safeClose(channel);
}
}
- public void handleWritable(final StreamSourceChannel channel) {
+ public void handleWritable(final ReadableAllocatedMessageChannel channel) {
+ // empty
}
- public void handleClosed(final StreamSourceChannel channel) {
- remoteInputStream.acceptEof();
+ public void handleClosed(final ReadableAllocatedMessageChannel channel) {
+ synchronized (lock) {
+ done = true;
+ lock.notifyAll();
+ }
}
}
public static final class RemoteInputStream extends InputStream {
- private enum StreamState {
- RUNNING,
- EOF,
- CLOSED,
- }
-
- private final IoFuture<StreamSourceChannel> futureChannel;
private final BufferAllocator<ByteBuffer> allocator;
private final Object lock = new Object();
// @protectedby lock
- private StreamState state;
private ByteBuffer current;
- private ByteBuffer next;
- private String pendingException;
- private boolean pendingResumeReads = false;
- private RemoteInputStream(final IoFuture<StreamSourceChannel>
futureChannel, final BufferAllocator<ByteBuffer> allocator) {
+ private final IoFuture<? extends ReadableAllocatedMessageChannel>
futureChannel;
+ private final RemoteHandler handler;
+
+ private RemoteInputStream(final IoFuture<? extends
ReadableAllocatedMessageChannel> futureChannel, final BufferAllocator<ByteBuffer>
allocator, final RemoteHandler handler) {
this.futureChannel = futureChannel;
this.allocator = allocator;
+ this.handler = handler;
}
- protected void acceptBuffer(ByteBuffer buffer) {
- synchronized (lock) {
- if (! buffer.hasRemaining()) {
- throw new IllegalArgumentException("empty buffer");
- }
- if (state == StreamState.CLOSED) {
- allocator.free(buffer);
- }
- if (current == null) {
+ // call under {@code lock}
+ private ByteBuffer getCurrent() throws IOException {
+ if (current != null) {
+ return current;
+ } else {
+ final ByteBuffer buffer = handler.getBuffer();
+ if (buffer != null) {
current = buffer;
- lock.notifyAll();
- } else if (next == null) {
- next = buffer;
+ return buffer;
} else {
- throw new IllegalStateException();
+ return null;
}
}
}
- protected void acceptException(String exception) {
- synchronized (lock) {
- pendingException = exception;
- if (current == null) {
- lock.notifyAll();
- }
- }
- }
-
- protected void acceptEof() {
- synchronized (lock) {
- if (state == StreamState.RUNNING) {
- state = StreamState.EOF;
- if (current == null) {
- lock.notifyAll();
- }
- }
- }
- }
-
- protected void scheduleResumeReads(StreamSourceChannel channel) {
- synchronized (lock) {
- if (state == StreamState.CLOSED || state == StreamState.EOF) {
- return;
- }
- if (next == null || current == null) {
- channel.resumeReads();
- } else {
- pendingResumeReads = true;
- }
- }
- }
-
- private ByteBuffer getCurrent() throws IOException {
- boolean intr = false;
- try {
- while (current == null) {
- if (pendingException != null) {
- throw new IOException(pendingException);
- } else if (state == StreamState.EOF) {
- return null;
- }
- try {
- lock.wait();
- } catch (InterruptedException e) {
- intr = true;
- }
- }
- return current;
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
public int read() throws IOException {
synchronized (lock) {
- if (state == StreamState.CLOSED) {
- return -1;
- }
final ByteBuffer buffer = getCurrent();
if (buffer == null) {
return -1;
}
final byte v = buffer.get();
if (! buffer.hasRemaining()) {
- current = next;
- next = null;
+ current = null;
allocator.free(buffer);
- if (pendingResumeReads) {
- futureChannel.get().resumeReads();
- pendingResumeReads = false;
- }
}
return v & 0xff;
}
}
- public int read(final byte b[], final int off, final int len) throws IOException
{
+ public int read(final byte[] b, final int off, final int len) throws IOException
{
synchronized (lock) {
- if (state == StreamState.CLOSED) {
- return -1;
- }
final ByteBuffer buffer = getCurrent();
if (buffer == null) {
return -1;
@@ -402,13 +331,8 @@
final int cnt = Math.min(buffer.remaining(), len);
buffer.get(b, off, cnt);
if (! buffer.hasRemaining()) {
- current = next;
- next = null;
+ current = null;
allocator.free(buffer);
- if (pendingResumeReads) {
- futureChannel.get().resumeReads();
- pendingResumeReads = false;
- }
}
return cnt;
}
@@ -416,24 +340,17 @@
public void close() throws IOException {
synchronized (lock) {
- if (state != StreamState.CLOSED) {
- if (current != null) {
- allocator.free(current);
- current = null;
- }
- if (next != null) {
- allocator.free(next);
- next = null;
- }
- state = StreamState.CLOSED;
- futureChannel.get().close();
+ if (current != null) {
+ allocator.free(current);
+ current = null;
}
+ futureChannel.get().close();
}
}
public int available() throws IOException {
synchronized (lock) {
- return current == null ? 0 : current.remaining() + (next == null ? 0 :
next.remaining());
+ return current == null ? 0 : current.remaining();
}
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java 2008-09-09
14:05:02 UTC (rev 4565)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -1,17 +1,23 @@
package org.jboss.cx.remoting.core.stream;
import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.List;
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.cx.remoting.spi.stream.StreamContext;
import org.jboss.cx.remoting.stream.ObjectSource;
-import org.jboss.xnio.channels.StreamChannel;
-import org.jboss.xnio.channels.StreamSourceChannel;
-import org.jboss.xnio.channels.StreamSinkChannel;
-import org.jboss.xnio.channels.CommonOptions;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
import org.jboss.xnio.IoHandler;
import org.jboss.xnio.ChannelSource;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.BufferAllocator;
+import static org.jboss.xnio.Buffers.flip;
import org.jboss.xnio.log.Logger;
import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.ByteOutput;
/**
*
@@ -22,7 +28,6 @@
private static final Logger log =
Logger.getLogger(ObjectSourceStreamSerializerFactory.class);
-
private MarshallerFactory marshallerFactory;
public MarshallerFactory getMarshallerFactory() {
@@ -33,55 +38,159 @@
this.marshallerFactory = marshallerFactory;
}
- public IoHandler<? super StreamChannel> getLocalSide(final Object localSide)
throws IOException {
-
+ public IoHandler<? super AllocatedMessageChannel> getLocalSide(final Object
localSide, final StreamContext streamContext) throws IOException {
return null;
}
- public Object getRemoteSide(final ChannelSource<StreamChannel> remoteClient)
throws IOException {
+ public Object getRemoteSide(final ChannelSource<AllocatedMessageChannel>
channelSource, final StreamContext streamContext) throws IOException {
return null;
}
- public static class LocalHandler implements IoHandler<StreamSinkChannel> {
+ public static class LocalHandler implements IoHandler<AllocatedMessageChannel>
{
private final ObjectSource objectSource;
+ private final Object lock = new Object();
+ private final Executor executor;
+ private final Marshaller marshaller;
+ private ByteBuffer[] current;
+ private final Runnable fillTask = new FillTask();
+ private final BufferAllocator<ByteBuffer> allocator;
- public LocalHandler(final ObjectSource source) {
+ public LocalHandler(final ObjectSource source, final Executor executor, final
Marshaller marshaller, final BufferAllocator<ByteBuffer> allocator) {
objectSource = source;
+ this.executor = executor;
+ this.marshaller = marshaller;
+ this.allocator = allocator;
}
- public void handleOpened(final StreamSinkChannel channel) {
- if (channel.getOptions().contains(CommonOptions.TCP_NODELAY)) try {
- channel.setOption(CommonOptions.TCP_NODELAY, Boolean.TRUE);
- } catch (Exception e) {
- log.trace("Error setting TCP_NODELAY option: %s",
e.getMessage());
- }
- channel.resumeWrites();
+ public void handleOpened(final AllocatedMessageChannel channel) {
+ executor.execute(fillTask);
}
- public void handleReadable(final StreamSinkChannel channel) {
+ public void handleReadable(final AllocatedMessageChannel channel) {
// not invoked
}
- public void handleWritable(final StreamSinkChannel channel) {
+ public void handleWritable(final AllocatedMessageChannel channel) {
}
- public void handleClosed(final StreamSinkChannel channel) {
+ public void handleClosed(final AllocatedMessageChannel channel) {
IoUtils.safeClose(objectSource);
}
+
+ public class FillTask implements Runnable {
+ public void run() {
+ try {
+ if (objectSource.hasNext()) {
+ final BufferProducingByteOutput output = new
BufferProducingByteOutput(allocator);
+ try {
+ marshaller.start(output);
+ marshaller.writeObject(objectSource.next());
+ marshaller.finish();
+ output.flush();
+ final ByteBuffer[] buffers = output.takeBuffers();
+
+ } finally {
+ IoUtils.safeClose(output);
+ }
+
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
}
- public static class RemoteHandler implements IoHandler<StreamSourceChannel> {
+ public static class BufferProducingByteOutput implements ByteOutput {
- public void handleOpened(final StreamSourceChannel channel) {
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final List<ByteBuffer> buffers = new
ArrayList<ByteBuffer>();
+ private ByteBuffer current;
+
+ public BufferProducingByteOutput(final BufferAllocator<ByteBuffer>
allocator) {
+ this.allocator = allocator;
}
- public void handleReadable(final StreamSourceChannel channel) {
+ public void write(final int i) throws IOException {
+ ByteBuffer buffer = current;
+ if (buffer == null) {
+ buffer = (current = allocator.allocate());
+ }
+ buffer.put((byte) i);
+ if (! buffer.hasRemaining()) {
+ buffers.add(flip(buffer));
+ current = null;
+ }
}
- public void handleWritable(final StreamSourceChannel channel) {
+ public void write(final byte[] bytes) throws IOException {
+ write(bytes, 0, bytes.length);
}
- public void handleClosed(final StreamSourceChannel channel) {
+ public void write(final byte[] bytes, int offs, int len) throws IOException {
+ while (len > 0) {
+ ByteBuffer buffer = current;
+ if (buffer == null) {
+ buffer = (current = allocator.allocate());
+ }
+ final int rem = Math.min(buffer.remaining(), len);
+ buffer.put(bytes, offs, rem);
+ offs += rem;
+ len -= rem;
+ if (! buffer.hasRemaining()) {
+ buffers.add(flip(buffer));
+ current = null;
+ }
+ }
}
+
+ public void close() throws IOException {
+ flush();
+ }
+
+ public void flush() throws IOException {
+ final ByteBuffer buffer = current;
+ if (buffer != null) {
+ buffers.add(buffer);
+ current = null;
+ }
+ }
+
+ public ByteBuffer[] takeBuffers() {
+ try {
+ return buffers.toArray(new ByteBuffer[buffers.size()]);
+ } finally {
+ buffers.clear();
+ }
+ }
}
+
+ public static class RemoteHandler implements IoHandler<AllocatedMessageChannel>
{
+
+ public void handleOpened(final AllocatedMessageChannel channel) {
+ }
+
+ public void handleReadable(final AllocatedMessageChannel channel) {
+ }
+
+ public void handleWritable(final AllocatedMessageChannel channel) {
+ }
+
+ public void handleClosed(final AllocatedMessageChannel channel) {
+ }
+ }
+
+ public static class RemoteObjectSource implements ObjectSource {
+
+ public boolean hasNext() throws IOException {
+ return false;
+ }
+
+ public Object next() throws IOException {
+ return null;
+ }
+
+ public void close() throws IOException {
+ }
+ }
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java 2008-09-09
14:05:02 UTC (rev 4565)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -2,13 +2,16 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.Executor;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.xnio.channels.StreamChannel;
-import org.jboss.xnio.channels.StreamSourceChannel;
-import org.jboss.xnio.channels.StreamSinkChannel;
+import org.jboss.cx.remoting.spi.stream.StreamContext;
+import org.jboss.cx.remoting.util.OrderedExecutor;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.WritableMessageChannel;
import org.jboss.xnio.IoHandler;
import org.jboss.xnio.ChannelSource;
import org.jboss.xnio.BufferAllocator;
@@ -25,123 +28,196 @@
private static final long serialVersionUID = -5934238025840749071L;
- public IoHandler<? super StreamChannel> getLocalSide(final Object localSide)
throws IOException {
- return new LocalHandler((OutputStream) localSide, new
BufferAllocator<ByteBuffer>() {
- public ByteBuffer allocate() {
- return ByteBuffer.allocate(512);
- }
+ private BufferAllocator<ByteBuffer> allocator;
- public void free(final ByteBuffer byteBuffer) {
- }
- });
+ public BufferAllocator<ByteBuffer> getAllocator() {
+ return allocator;
}
- public Object getRemoteSide(final ChannelSource<StreamChannel> remoteClient)
throws IOException {
- final RemoteHandler handler = new RemoteHandler(new
BufferAllocator<ByteBuffer>() {
- public ByteBuffer allocate() {
- return ByteBuffer.allocate(512);
- }
+ public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+ this.allocator = allocator;
+ }
- public void free(final ByteBuffer byteBuffer) {
- }
- });
- final IoFuture<StreamChannel> futureChannel = remoteClient.open(handler);
- return new RemoteOutputStream(handler, futureChannel);
+ public IoHandler<? super AllocatedMessageChannel> getLocalSide(final Object
localSide, final StreamContext streamContext) throws IOException {
+ return new LocalHandler((OutputStream) localSide, allocator, new
OrderedExecutor(streamContext.getExecutor()));
}
- public static final class LocalHandler implements
IoHandler<StreamSourceChannel> {
+ public Object getRemoteSide(final ChannelSource<AllocatedMessageChannel>
remoteClient, final StreamContext streamContext) throws IOException {
+ final RemoteHandler handler = new RemoteHandler(allocator);
+ final IoFuture<AllocatedMessageChannel> futureChannel =
remoteClient.open(handler);
+ return new RemoteOutputStream(handler, futureChannel, allocator);
+ }
+ public static final class LocalHandler implements
IoHandler<AllocatedMessageChannel> {
+
private final OutputStream outputStream;
private final BufferAllocator<ByteBuffer> allocator;
+ private final Executor executor;
+ private volatile String exceptionMessage;
- public LocalHandler(final OutputStream outputStream, final
BufferAllocator<ByteBuffer> allocator) {
+ public LocalHandler(final OutputStream outputStream, final
BufferAllocator<ByteBuffer> allocator, final Executor executor) {
this.outputStream = outputStream;
this.allocator = allocator;
+ this.executor = executor;
}
- public void handleOpened(final StreamSourceChannel channel) {
+ public void handleOpened(final AllocatedMessageChannel channel) {
channel.resumeReads();
}
- public void handleReadable(final StreamSourceChannel channel) {
- ByteBuffer buffer = allocator.allocate();
+ public void handleReadable(final AllocatedMessageChannel channel) {
try {
- for (;; buffer.clear()) {
- final int c = channel.read(buffer);
- if (c == 0) {
+ for (;;) {
+ final ByteBuffer buffer = channel.receive();
+ if (buffer == null) {
+ IoUtils.safeClose(channel);
+ log.trace("Remote output stream closed normally");
+ } else if (! buffer.hasRemaining()) {
channel.resumeReads();
return;
- } else if (c == -1) {
- IoUtils.safeClose(channel);
- log.trace("Remote output stream closed normally");
} else {
buffer.flip();
- if (buffer.hasArray()) {
- outputStream.write(buffer.array(), buffer.arrayOffset(),
buffer.remaining());
- } else {
- final byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- outputStream.write(bytes);
- }
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ if (buffer.hasArray()) {
+ outputStream.write(buffer.array(),
buffer.arrayOffset(), buffer.remaining());
+ } else {
+ final byte[] bytes = new
byte[buffer.remaining()];
+ buffer.get(bytes);
+ outputStream.write(bytes);
+ }
+ channel.resumeReads();
+ } catch (Throwable t) {
+ exceptionMessage = t.getMessage();
+ channel.resumeWrites();
+ try {
+ channel.shutdownReads();
+ } catch (Throwable tt) {
+ log.warn(tt, "Unable to shutdown reads on a
channel");
+ }
+ } finally {
+ allocator.free(buffer);
+ }
+ }
+ });
}
}
} catch (IOException e) {
IoUtils.safeClose(channel);
log.trace("Remote output stream closed due to exception: %s",
e.getMessage());
} finally {
- allocator.free(buffer);
}
}
- public void handleWritable(final StreamSourceChannel channel) {
+ public void handleWritable(final AllocatedMessageChannel channel) {
+ final String msg = exceptionMessage;
+ if (msg == null) {
+ // spurious...
+ return;
+ }
+ try {
+ final ByteBuffer buffer =
ByteBuffer.wrap(msg.getBytes("utf-8"));
+ if (! channel.send(buffer)) {
+ channel.resumeWrites();
+ return;
+ }
+ } catch (UnsupportedEncodingException e) {
+ // should not happen; if it does, just close the channel
+ } catch (IOException e) {
+ // nothing we can do about it
+ }
+ IoUtils.safeClose(channel);
+ exceptionMessage = null;
}
- public void handleClosed(final StreamSourceChannel channel) {
+ public void handleClosed(final AllocatedMessageChannel channel) {
IoUtils.safeClose(outputStream);
}
}
- public static final class RemoteHandler implements IoHandler<StreamSinkChannel>
{
+ public static final class RemoteHandler implements
IoHandler<AllocatedMessageChannel> {
- private final Semaphore semaphore = new Semaphore(0);
+ private final Object lock = new Object();
private final BufferAllocator<ByteBuffer> allocator;
- private volatile boolean closed;
- private volatile ByteBuffer buffer;
+ private boolean closed;
+ private ByteBuffer buffer;
+ private IOException exception;
private RemoteHandler(final BufferAllocator<ByteBuffer> allocator) {
this.allocator = allocator;
}
- public void handleOpened(final StreamSinkChannel channel) {
- // block sends until the channel is up
- semaphore.release();
+ public void pushBuffer(final WritableMessageChannel channel, final ByteBuffer
buffer) throws IOException {
+ synchronized (lock) {
+ final IOException exception = this.exception;
+ if (exception != null) {
+ this.exception = null;
+ IOException ioe = new IOException("Write failed");
+ ioe.initCause(exception);
+ throw ioe;
+ }
+ if (closed) {
+ throw new IOException("Channel closed");
+ }
+ while (this.buffer != null) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Operation
interrupted");
+ }
+ }
+ if (! channel.send(buffer)) {
+ channel.resumeWrites();
+ this.buffer = buffer;
+ }
+ }
}
- public void handleReadable(final StreamSinkChannel channel) {
+ public void handleOpened(final AllocatedMessageChannel channel) {
+ channel.resumeReads();
+ synchronized (lock) {
+ if (buffer != null) {
+ channel.resumeWrites();
+ }
+ }
}
- public void handleWritable(final StreamSinkChannel channel) {
- final ByteBuffer buffer = this.buffer;
- if (buffer != null) {
+ public void handleReadable(final AllocatedMessageChannel channel) {
+ try {
+ final ByteBuffer buffer = channel.receive();
+ if (buffer == null) {
+ // normal close
+ IoUtils.safeClose(channel);
+ }
+ } catch (IOException e) {
+ exception = new IOException("Received unexpected I/O
exception");
+ exception.initCause(e);
+ IoUtils.safeClose(channel);
+ }
+ }
+
+ public void handleWritable(final AllocatedMessageChannel channel) {
+ synchronized (lock) {
+ final ByteBuffer buffer = this.buffer;
+ if (buffer == null) {
+ return;
+ }
try {
- while (buffer.hasRemaining()) {
- if (channel.write(buffer) == 0) {
- channel.resumeWrites();
- return;
- }
+ if (channel.send(buffer)) {
+ allocator.free(buffer);
+ this.buffer = null;
}
} catch (IOException e) {
- log.trace("Send exception: %s", e.getMessage());
+ exception = e;
IoUtils.safeClose(channel);
- semaphore.release();
}
- this.buffer = null;
- allocator.free(buffer);
}
}
- public void handleClosed(final StreamSinkChannel channel) {
+ public void handleClosed(final AllocatedMessageChannel channel) {
closed = true;
final ByteBuffer buffer = this.buffer;
if (buffer != null) {
@@ -149,80 +225,94 @@
allocator.free(buffer);
}
}
-
- private void send(final ByteBuffer buffer) throws IOException {
- if (closed) {
- throw new IOException("Channel closed");
- }
- semaphore.acquireUninterruptibly();
- if (closed) {
- semaphore.release();
- allocator.free(buffer);
- throw new IOException("Channel closed");
- }
- this.buffer = buffer;
- }
}
public static final class RemoteOutputStream extends OutputStream {
private final RemoteHandler handler;
- private final IoFuture<? extends Channel> futureChannel;
+ private final IoFuture<? extends WritableMessageChannel> futureChannel;
+ private final Object lock = new Object();
+ private final BufferAllocator<ByteBuffer> allocator;
private ByteBuffer buffer;
- public RemoteOutputStream(final RemoteHandler handler, final IoFuture<?
extends Channel> futureChannel) {
+ public RemoteOutputStream(final RemoteHandler handler, final IoFuture<?
extends WritableMessageChannel> futureChannel, final BufferAllocator<ByteBuffer>
allocator) {
this.handler = handler;
this.futureChannel = futureChannel;
+ this.allocator = allocator;
+ synchronized (lock) {
+ buffer = allocator.allocate();
+ }
}
public void write(final int b) throws IOException {
- if (handler.closed) {
- throw new IOException("Channel closed");
+ synchronized (lock) {
+ final ByteBuffer buffer = this.buffer;
+ if (buffer == null) {
+ throw new IOException("Channel closed");
+ }
+ buffer.put((byte)b);
+ if (! buffer.hasRemaining()) {
+ flush();
+ }
}
- if (buffer == null) {
- buffer = handler.allocator.allocate();
- }
- buffer.put((byte)b);
- if (! buffer.hasRemaining()) {
- flush();
- }
}
public void write(final byte[] bytes, int offset, int length) throws IOException
{
- if (handler.closed) {
- throw new IOException("Channel closed");
- }
- if (buffer == null) {
- buffer = handler.allocator.allocate();
- }
- while (length > 0) {
- int size = Math.min(buffer.remaining(), length);
- buffer.put(bytes, offset, size);
- length -= size; offset += size;
- if (! buffer.hasRemaining()) {
- flush();
+ synchronized (lock) {
+ while (length > 0) {
+ final ByteBuffer buffer = this.buffer;
+ if (buffer == null) {
+ throw new IOException("Channel closed");
+ }
+ int size = Math.min(buffer.remaining(), length);
+ buffer.put(bytes, offset, size);
+ length -= size; offset += size;
+ if (! buffer.hasRemaining()) {
+ flush();
+ }
}
}
}
public void flush() throws IOException {
- try {
- handler.send(buffer);
- } finally {
- buffer = null;
+ synchronized (lock) {
+ if (doFlush()) {
+ buffer = allocator.allocate();
+ }
}
}
+ private boolean doFlush() throws IOException {
+ final ByteBuffer buffer = this.buffer;
+ if (buffer != null) {
+ handler.pushBuffer(futureChannel.get(), buffer);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
public void close() throws IOException {
final Channel channel;
try {
channel = futureChannel.get();
+ if (channel == null) {
+ return;
+ }
} catch (IOException ex) {
// throwing this exception would cause close() to appear to not be
idempotent
log.trace("No channel to close: %s", ex.getMessage());
return;
}
- channel.close();
+ try {
+ synchronized (lock) {
+ doFlush();
+ buffer = null;
+ }
+ channel.close();
+ } finally {
+ IoUtils.safeClose(channel);
+ }
}
}
}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java 2008-09-09
14:05:02 UTC (rev 4565)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -1,103 +0,0 @@
-package org.jboss.cx.remoting.core.util;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-
-/**
- *
- */
-public class DelegatingObjectInput implements ObjectInput {
- private final ObjectInput delegate;
-
- public DelegatingObjectInput(final ObjectInput delegate) {
- this.delegate = delegate;
- }
-
- public int read() throws IOException {
- return delegate.read();
- }
-
- public int read(final byte[] data) throws IOException {
- return delegate.read(data);
- }
-
- public int read(final byte[] data, final int offs, final int len) throws IOException
{
- return delegate.read(data, offs, len);
- }
-
- public void close() throws IOException {
- delegate.close();
- }
-
- public Object readObject() throws ClassNotFoundException, IOException {
- return delegate.readObject();
- }
-
- public long skip(final long n) throws IOException {
- return delegate.skip(n);
- }
-
- public int available() throws IOException {
- return delegate.available();
- }
-
- public void readFully(final byte[] b) throws IOException {
- delegate.readFully(b);
- }
-
- public void readFully(final byte[] b, final int off, final int len) throws
IOException {
- delegate.readFully(b, off, len);
- }
-
- public int skipBytes(final int n) throws IOException {
- return delegate.skipBytes(n);
- }
-
- public boolean readBoolean() throws IOException {
- return delegate.readBoolean();
- }
-
- public byte readByte() throws IOException {
- return delegate.readByte();
- }
-
- public int readUnsignedByte() throws IOException {
- return delegate.readUnsignedByte();
- }
-
- public short readShort() throws IOException {
- return delegate.readShort();
- }
-
- public int readUnsignedShort() throws IOException {
- return delegate.readUnsignedShort();
- }
-
- public char readChar() throws IOException {
- return delegate.readChar();
- }
-
- public int readInt() throws IOException {
- return delegate.readInt();
- }
-
- public long readLong() throws IOException {
- return delegate.readLong();
- }
-
- public float readFloat() throws IOException {
- return delegate.readFloat();
- }
-
- public double readDouble() throws IOException {
- return delegate.readDouble();
- }
-
- public String readLine() throws IOException {
- return delegate.readLine();
- }
-
- public String readUTF() throws IOException {
- return delegate.readUTF();
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java 2008-09-09
14:05:02 UTC (rev 4565)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -1,50 +0,0 @@
-package org.jboss.cx.remoting.core.util;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public final class OrderedExecutorFactory {
- private final Executor parent;
- private final Set<ChildExecutor> runningChildren =
Collections.synchronizedSet(new HashSet<ChildExecutor>());
-
- public OrderedExecutorFactory(final Executor parent) {
- this.parent = parent;
- }
-
- public Executor getOrderedExecutor() {
- return new ChildExecutor();
- }
-
- private final class ChildExecutor implements Executor, Runnable {
- private final LinkedList<Runnable> tasks = new
LinkedList<Runnable>();
-
- public void execute(Runnable command) {
- synchronized(tasks) {
- tasks.add(command);
- if (tasks.size() == 1 && runningChildren.add(this)) {
- parent.execute(this);
- }
- }
- }
-
- public void run() {
- for (;;) {
- final Runnable task;
- synchronized(tasks) {
- task = tasks.poll();
- if (task == null) {
- runningChildren.remove(this);
- return;
- }
- }
- task.run();
- }
- }
- }
-}
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-09-09
14:05:02 UTC (rev 4565)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -83,9 +83,9 @@
private final ConcurrentIntegerMap<RequestHandler> remoteClients =
concurrentIntegerMap();
// forwarded to remote side (handled on this side)
private final ConcurrentIntegerMap<Handle<RequestHandler>>
forwardedClients = concurrentIntegerMap();
- // sequence for forwarded clients
+ // sequence for forwarded clients (unsigned; shift left one bit, add one)
private final AtomicInteger forwardedClientSequence = new AtomicInteger();
- // sequence for clients created from services forwarded to us
+ // sequence for clients created from services forwarded to us (unsigned; shift left
one bit)
private final AtomicInteger remoteClientSequence = new AtomicInteger();
// services forwarded to us
Added: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutor.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutor.java
(rev 0)
+++
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutor.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.util;
+
+import java.util.concurrent.Executor;
+import java.util.LinkedList;
+
+/**
+ * An executor that always runs all tasks in order, using a delegate executor to run the
tasks.
+ * <p/>
+ * More specifically, any call B to the {@link #execute(Runnable)} method that
happens-after another call A to the
+ * same method, will result in B's task running after A's.
+ */
+public final class OrderedExecutor implements Executor {
+ // @protectedby tasks
+ private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
+ // @protectedby tasks
+ private boolean running;
+ private final Executor parent;
+ private final Runnable runner;
+
+ /**
+ * Construct a new instance.
+ *
+ * @param parent the parent executor
+ */
+ public OrderedExecutor(final Executor parent) {
+ this.parent = parent;
+ runner = new Runnable() {
+ public void run() {
+ for (;;) {
+ final Runnable task;
+ synchronized(tasks) {
+ task = tasks.poll();
+ if (task == null) {
+ running = false;
+ return;
+ }
+ }
+ task.run();
+ }
+ }
+ };
+ }
+
+ /**
+ * Run a task.
+ *
+ * @param command the task to run.
+ */
+ public void execute(Runnable command) {
+ synchronized(tasks) {
+ tasks.add(command);
+ if (! running) {
+ running = true;
+ parent.execute(runner);
+ }
+ }
+ }
+}
Copied:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutorFactory.java
(from rev 4514,
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java)
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutorFactory.java
(rev 0)
+++
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutorFactory.java 2008-09-11
19:54:37 UTC (rev 4566)
@@ -0,0 +1,28 @@
+package org.jboss.cx.remoting.util;
+
+import java.util.concurrent.Executor;
+
+/**
+ * A factory for producing executors that run all tasks in order, which delegate to a
single common executor instance.
+ */
+public final class OrderedExecutorFactory {
+ private final Executor parent;
+
+ /**
+ * Construct a new instance delegating to the given parent executor.
+ *
+ * @param parent the parent executor
+ */
+ public OrderedExecutorFactory(final Executor parent) {
+ this.parent = parent;
+ }
+
+ /**
+ * Get an executor that always executes tasks in order.
+ *
+ * @return an ordered executor
+ */
+ public Executor getOrderedExecutor() {
+ return new OrderedExecutor(parent);
+ }
+}