Author: david.lloyd(a)jboss.com
Date: 2010-03-04 23:58:25 -0500 (Thu, 04 Mar 2010)
New Revision: 5801
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundObjectSinkReceiveTask.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundInputStreamTransmitTask.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundObjectSourceTransmitTask.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundStream.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/ReaderInputStream.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/WriterOutputStream.java
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ClientGreetingHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ClientOpenListener.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequestHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/AbstractRemoteTestCase.java
Log:
JBREM-1203: Streams (about 95% implemented)
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -92,7 +92,7 @@
private final Class<I> requestClass;
private final Class<O> replyClass;
- private EndpointImpl endpoint;
+ private final EndpointImpl endpoint;
public ClientWrapper(final EndpointImpl endpoint, final
FutureResult<Client<I, O>> futureResult, final Class<I> requestClass,
final Class<O> replyClass) {
super(futureResult);
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -571,15 +571,14 @@
}
final FutureResult<Connection> futureResult = new
FutureResult<Connection>(executor);
// Mark the stack because otherwise debugging connect problems can be incredibly
tough
- final Throwable t = new Throwable();
+ final Throwable mark = new Throwable();
futureResult.addCancelHandler(connectionProvider.connect(destination,
connectOptions, new Result<ConnectionHandlerFactory>() {
public boolean setResult(final ConnectionHandlerFactory result) {
return futureResult.setResult(new ConnectionImpl(EndpointImpl.this,
result, connectionProviderContext, destination.toString()));
}
public boolean setException(final IOException exception) {
- final StackTraceElement[] st0 = t.getStackTrace();
- exception.setStackTrace(Arrays.copyOfRange(st0, 1, st0.length));
+ glueStackTraces(exception, mark, 1);
return futureResult.setException(exception);
}
@@ -590,6 +589,15 @@
return futureResult.getIoFuture();
}
+ static void glueStackTraces(final Throwable exception, final Throwable
markerThrowable, final int trimCount) {
+ final StackTraceElement[] est = exception.getStackTrace();
+ final StackTraceElement[] ust = markerThrowable.getStackTrace();
+ final StackTraceElement[] fst = Arrays.copyOf(est, est.length + ust.length);
+ fst[est.length] = new StackTraceElement("...asynchronous invocation..",
"", null, -1);
+ System.arraycopy(ust, trimCount, fst, est.length + 1, ust.length - trimCount);
+ exception.setStackTrace(fst);
+ }
+
public IoFuture<? extends Connection> connect(final URI destination, final
OptionMap connectOptions, final CallbackHandler callbackHandler) throws IOException {
final Pair<String, String> userRealm = getUserAndRealm(destination);
final String uriUserName = userRealm.getA();
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ClientGreetingHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ClientGreetingHandler.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ClientGreetingHandler.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -24,6 +24,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@@ -44,16 +48,18 @@
private final RemoteConnection connection;
private final Result<ConnectionHandlerFactory> factoryResult;
private final CallbackHandler callbackHandler;
+ private final AccessControlContext accessControlContext;
- ClientGreetingHandler(final RemoteConnection connection, final
Result<ConnectionHandlerFactory> factoryResult, final CallbackHandler
callbackHandler) {
+ ClientGreetingHandler(final RemoteConnection connection, final
Result<ConnectionHandlerFactory> factoryResult, final CallbackHandler
callbackHandler, final AccessControlContext accessControlContext) {
super(connection, factoryResult);
this.connection = connection;
this.factoryResult = factoryResult;
this.callbackHandler = callbackHandler;
+ this.accessControlContext = accessControlContext;
}
public void handleMessage(final ByteBuffer buffer) {
- Set<String> saslMechs = new LinkedHashSet<String>();
+ final Set<String> saslMechs = new LinkedHashSet<String>();
String remoteEndpointName = "endpoint";
final int[] ourVersions =
connection.getProviderDescriptor().getSupportedVersions();
int bestVersion = -1;
@@ -116,10 +122,16 @@
final Map<String, ?> propertyMap =
SaslUtils.createPropertyMap(optionMap);
final SaslClient saslClient;
try {
- saslClient = Sasl.createSaslClient(saslMechs.toArray(new
String[saslMechs.size()]), userName, "remote", remoteEndpointName, propertyMap,
callbackHandler);
- } catch (SaslException e) {
- factoryResult.setException(e);
- RemoteConnectionHandler.log.trace(e, "Client connect
authentication error");
+ final String finalRemoteEndpointName = remoteEndpointName;
+ saslClient = AccessController.doPrivileged(new
PrivilegedExceptionAction<SaslClient>() {
+ public SaslClient run() throws SaslException {
+ return Sasl.createSaslClient(saslMechs.toArray(new
String[saslMechs.size()]), userName, "remote", finalRemoteEndpointName,
propertyMap, callbackHandler);
+ }
+ }, accessControlContext);
+ } catch (PrivilegedActionException e) {
+ final SaslException se = (SaslException) e.getCause();
+ factoryResult.setException(se);
+ RemoteConnectionHandler.log.trace(se, "Client connect
authentication error");
try {
remoteConnection.shutdownWritesBlocking();
} catch (IOException e1) {
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ClientOpenListener.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ClientOpenListener.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ClientOpenListener.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.security.AccessControlContext;
import org.jboss.marshalling.ProviderDescriptor;
import org.jboss.remoting3.spi.ConnectionHandlerFactory;
import org.jboss.remoting3.spi.ConnectionProviderContext;
@@ -44,13 +45,15 @@
private final Result<ConnectionHandlerFactory> factoryResult;
private final CallbackHandler callbackHandler;
private final ProviderDescriptor providerDescriptor;
+ private final AccessControlContext accessControlContext;
- ClientOpenListener(final OptionMap optionMap, final ConnectionProviderContext
connectionProviderContext, final Result<ConnectionHandlerFactory> factoryResult,
final CallbackHandler callbackHandler, final ProviderDescriptor providerDescriptor) {
+ ClientOpenListener(final OptionMap optionMap, final ConnectionProviderContext
connectionProviderContext, final Result<ConnectionHandlerFactory> factoryResult,
final CallbackHandler callbackHandler, final ProviderDescriptor providerDescriptor, final
AccessControlContext accessControlContext) {
this.optionMap = optionMap;
this.connectionProviderContext = connectionProviderContext;
this.factoryResult = factoryResult;
this.callbackHandler = callbackHandler;
this.providerDescriptor = providerDescriptor;
+ this.accessControlContext = accessControlContext;
}
public void handleEvent(final ConnectedStreamChannel<InetSocketAddress>
channel) {
@@ -110,7 +113,7 @@
}
});
- connection.setMessageHandler(new ClientGreetingHandler(connection, factoryResult,
callbackHandler));
+ connection.setMessageHandler(new ClientGreetingHandler(connection, factoryResult,
callbackHandler, accessControlContext));
// and send the greeting
channel.resumeWrites();
}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundObjectSinkReceiveTask.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundObjectSinkReceiveTask.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundObjectSinkReceiveTask.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.stream.ObjectSink;
+import org.jboss.xnio.IoUtils;
+
+final class InboundObjectSinkReceiveTask implements Runnable {
+
+ private final NioByteInput byteInput;
+ private final InboundStream inboundStream;
+ private final RemoteConnectionHandler connectionHandler;
+ private final ObjectSink objectSink;
+
+ InboundObjectSinkReceiveTask(final NioByteInput byteInput, final InboundStream
inboundStream, final RemoteConnectionHandler connectionHandler, final ObjectSink
objectSink) {
+ this.byteInput = byteInput;
+ this.inboundStream = inboundStream;
+ this.connectionHandler = connectionHandler;
+ this.objectSink = objectSink;
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ public void run() {
+ final MarshallerFactory marshallerFactory =
connectionHandler.getMarshallerFactory();
+ final MarshallingConfiguration configuration =
connectionHandler.getMarshallingConfiguration();
+ final ObjectSink objectSink = this.objectSink;
+ boolean ok = false;
+ try {
+ final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller(configuration);
+ try {
+ unmarshaller.start(byteInput);
+ for (;;) {
+ final int cmd = unmarshaller.read();
+ switch (cmd) {
+ case RemoteProtocol.OSINK_OBJECT: {
+ final Object obj = unmarshaller.readObject();
+ objectSink.accept(obj);
+ break;
+ }
+ case RemoteProtocol.OSINK_FLUSH: {
+ objectSink.flush();
+ break;
+ }
+ case RemoteProtocol.OSINK_CLOSE:
+ case -1: {
+ objectSink.close();
+ ok = true;
+ return;
+ }
+ default: {
+ // no idea, just close everything and send an async
exception
+ return;
+ }
+ }
+ }
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (Exception e) {
+ // todo log it
+ } finally {
+ IoUtils.safeClose(objectSink);
+ if (! ok) inboundStream.sendAsyncException();
+ }
+ }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -0,0 +1,248 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.xnio.IoUtils;
+
+final class InboundStream {
+ private final int id;
+ private final RemoteConnection remoteConnection;
+ private final Receiver receiver;
+
+ private State state;
+
+ InboundStream(final int id, final RemoteConnection remoteConnection, final Receiver
receiver) {
+ this.id = id;
+ this.remoteConnection = remoteConnection;
+ this.receiver = receiver;
+ }
+
+ InboundStream(final int id, final RemoteConnection remoteConnection, final
ByteInputResult byteInputResult) {
+ this.id = id;
+ this.remoteConnection = remoteConnection;
+ final NioByteInput byteInput = new NioByteInput(
+ new NioByteInputHandler()
+ );
+ receiver = new NioByteInputReceiver(byteInput);
+ byteInputResult.accept(byteInput, this);
+ }
+
+ InboundStream(final int id, final RemoteConnection remoteConnection, final
OutputStream outputStream) {
+ this.id = id;
+ this.remoteConnection = remoteConnection;
+ receiver = new OutputStreamReceiver(outputStream);
+ }
+
+ RemoteConnection getRemoteConnection() {
+ return remoteConnection;
+ }
+
+ Receiver getReceiver() {
+ return receiver;
+ }
+
+ enum State {
+ WAITING_FIRST,
+ WAITING_FIRST_EXCEPTION,
+ RUNNING,
+ CLOSE_WAIT,
+ CLOSED
+ }
+
+ interface Receiver {
+ void push(ByteBuffer buffer);
+
+ void pushEof();
+
+ void pushException();
+ }
+
+ private void doSend(byte code) {
+ final ByteBuffer buffer = remoteConnection.allocate();
+ buffer.position(4);
+ buffer.put(code);
+ buffer.putInt(id);
+ buffer.flip();
+ try {
+ remoteConnection.sendBlocking(buffer, true);
+ } catch (IOException e) {
+ // irrelevant
+ }
+ }
+
+ void sendAsyncClose() {
+ synchronized (this) {
+ OUT: for (;;) switch (state) {
+ case WAITING_FIRST_EXCEPTION: {
+ return;
+ }
+ case WAITING_FIRST: {
+ try {
+ wait();
+ break;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (state == State.WAITING_FIRST) {
+ state = State.WAITING_FIRST_EXCEPTION;
+ notifyAll();
+ return;
+ }
+ continue;
+ }
+ }
+ case RUNNING: {
+ state = State.CLOSE_WAIT;
+ break OUT;
+ }
+ case CLOSE_WAIT: {
+ state = State.CLOSED;
+ break OUT;
+ }
+ case CLOSED: {
+ return;
+ }
+ }
+ doSend(RemoteProtocol.STREAM_ASYNC_CLOSE);
+ }
+ }
+
+ void sendAsyncException() {
+ synchronized (this) {
+ switch (state) {
+ case RUNNING: {
+ state = State.CLOSE_WAIT;
+ break;
+ }
+ case CLOSE_WAIT: {
+ state = State.CLOSED;
+ break;
+ }
+ case CLOSED: {
+ return;
+ }
+ }
+ doSend(RemoteProtocol.STREAM_ASYNC_EXCEPTION);
+ }
+ }
+
+ void sendAsyncStart() {
+ doSend(RemoteProtocol.STREAM_ASYNC_START);
+ }
+
+ void sendAck() {
+ synchronized (this) {
+ switch (state) {
+ case RUNNING: {
+ state = State.CLOSE_WAIT;
+ break;
+ }
+ case CLOSE_WAIT: {
+ state = State.CLOSED;
+ break;
+ }
+ case CLOSED: {
+ return;
+ }
+ }
+ doSend(RemoteProtocol.STREAM_ASYNC_EXCEPTION);
+ }
+ }
+
+ private final class NioByteInputHandler implements NioByteInput.InputHandler {
+
+ public void acknowledge() {
+ sendAck();
+ }
+
+ public void close() throws IOException {
+ sendAsyncClose();
+ }
+ }
+
+ private final class NioByteInputReceiver implements Receiver,
NioByteInput.BufferReturn {
+ private final NioByteInput nioByteInput;
+
+ NioByteInputReceiver(final NioByteInput nioByteInput) {
+ this.nioByteInput = nioByteInput;
+ }
+
+ public void push(final ByteBuffer buffer) {
+ nioByteInput.push(buffer, this);
+ }
+
+ public void pushEof() {
+ nioByteInput.pushEof();
+ }
+
+ public void pushException() {
+ nioByteInput.pushException(new IOException("Remote stream exception
occurred on forwarded stream"));
+ }
+
+ public void returnBuffer(final ByteBuffer buffer) {
+ remoteConnection.free(buffer);
+ }
+ }
+
+ private class OutputStreamReceiver implements Receiver {
+
+ private final OutputStream outputStream;
+
+ OutputStreamReceiver(final OutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ public void push(final ByteBuffer buffer) {
+ try {
+ if (buffer.hasArray()) {
+ final byte[] array = buffer.array();
+ final int offs = buffer.arrayOffset() + buffer.position();
+ final int len = buffer.remaining();
+ outputStream.write(array, offs, len);
+ } else {
+ final byte[] array = new byte[buffer.remaining()];
+ buffer.get(array);
+ outputStream.write(array);
+ }
+ } catch (IOException e) {
+ RemoteConnectionHandler.log.trace("Output stream write failed:
%s", e);
+ sendAsyncException();
+ }
+ }
+
+ public void pushEof() {
+ IoUtils.safeClose(outputStream);
+ }
+
+ public void pushException() {
+ IoUtils.safeClose(outputStream);
+ }
+ }
+
+ interface ByteInputResult {
+ void accept(NioByteInput nioByteInput, final InboundStream inboundStream);
+ }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.remoting3.stream.ObjectSink;
+
+final class MarshallerObjectSink<T> implements ObjectSink<T> {
+ private final Marshaller marshaller;
+
+ MarshallerObjectSink(final Marshaller marshaller) {
+ this.marshaller = marshaller;
+ }
+
+ public void accept(final T instance) throws IOException {
+ marshaller.writeObject(instance);
+ }
+
+ public void flush() throws IOException {
+ marshaller.flush();
+ }
+
+ public void close() throws IOException {
+ marshaller.close();
+ }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundInputStreamTransmitTask.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundInputStreamTransmitTask.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundInputStreamTransmitTask.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.IoUtils;
+
+final class OutboundInputStreamTransmitTask implements Runnable {
+ private final InputStream inputStream;
+ private final OutboundStream outboundStream;
+
+ OutboundInputStreamTransmitTask(final InputStream inputStream, final OutboundStream
outboundStream) {
+ this.inputStream = inputStream;
+ this.outboundStream = outboundStream;
+ }
+
+ public void run() {
+ final InputStream inputStream = this.inputStream;
+ try {
+ final OutboundStream outboundStream = this.outboundStream;
+ byte[] bytes = new byte[1024];
+ for (;;) {
+ int res = 0;
+ try {
+ res = inputStream.read(bytes);
+ } catch (IOException e) {
+ outboundStream.sendException();
+ return;
+ }
+ if (res == -1) {
+ outboundStream.sendEof();
+ return;
+ }
+ try {
+ while (res > 0) {
+ final ByteBuffer buffer = outboundStream.getBuffer();
+ final int xsz = Math.min(buffer.remaining(), res);
+ res -= xsz;
+ buffer.put(bytes, 0, xsz).flip();
+ outboundStream.send(buffer);
+ }
+ } catch (IOException e) {
+ // async msg. received; stop transmitting, send close.
+ outboundStream.sendEof();
+ return;
+ }
+ }
+ } finally {
+ IoUtils.safeClose(inputStream);
+ }
+ }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundObjectSourceTransmitTask.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundObjectSourceTransmitTask.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundObjectSourceTransmitTask.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.NioByteOutput;
+import org.jboss.remoting3.stream.ObjectSource;
+import org.jboss.xnio.IoUtils;
+
+final class OutboundObjectSourceTransmitTask implements Runnable {
+
+ private final ObjectSource objectSource;
+ private final OutboundStream outboundStream;
+ private final RemoteConnectionHandler connectionHandler;
+
+ OutboundObjectSourceTransmitTask(final ObjectSource objectSource, final
OutboundStream outboundStream, final RemoteConnectionHandler connectionHandler) {
+ this.objectSource = objectSource;
+ this.outboundStream = outboundStream;
+ this.connectionHandler = connectionHandler;
+ }
+
+ public void run() {
+ final ObjectSource objectSource = this.objectSource;
+ try {
+ final MarshallerFactory marshallerFactory =
connectionHandler.getMarshallerFactory();
+ final MarshallingConfiguration configuration =
connectionHandler.getMarshallingConfiguration();
+ try {
+ final Marshaller marshaller =
marshallerFactory.createMarshaller(configuration);
+ try {
+ marshaller.start(new NioByteOutput(new NioByteOutput.BufferWriter()
{
+ public ByteBuffer getBuffer() {
+ return outboundStream.getBuffer();
+ }
+
+ public void accept(final ByteBuffer buffer, final boolean eof)
throws IOException {
+ outboundStream.send(buffer);
+ if (eof) outboundStream.sendEof();
+ }
+
+ public void flush() throws IOException {
+ }
+ }));
+ while (objectSource.hasNext()) {
+ marshaller.writeByte(RemoteProtocol.OSOURCE_OBJECT);
+ marshaller.writeObject(objectSource.next());
+ }
+ marshaller.writeByte(RemoteProtocol.OSOURCE_CLOSE);
+ marshaller.finish();
+ marshaller.close();
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
+ } catch (Exception e) {
+ outboundStream.sendException();
+ }
+ } finally {
+ IoUtils.safeClose(objectSource);
+ }
+ }
+}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequestHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequestHandler.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequestHandler.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -102,7 +102,7 @@
final ByteBuffer buf = bufferPool.allocate();
try {
buf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
- buf.put(RemoteProtocol.CLIENT_CLOSED);
+ buf.put(RemoteProtocol.CLIENT_CLOSE);
buf.putInt(outboundClient.getId());
buf.flip();
connectionHandler.getRemoteConnection().sendBlocking(buf, true);
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundStream.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundStream.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundStream.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -0,0 +1,209 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.util.concurrent.Semaphore;
+
+final class OutboundStream {
+
+ private final int id;
+ private final RemoteConnection remoteConnection;
+ private final Semaphore semaphore = new Semaphore(3);
+
+ private State state = State.WAITING;
+
+ enum State {
+
+ WAITING,
+ WAITING_EXCEPTION,
+ RUNNING,
+ ASYNC_CLOSE,
+ ASYNC_EXCEPTION,
+ CLOSE_WAIT, // close/exception sent, waiting for async close/exception
+ CLOSED,
+ }
+
+ OutboundStream(final int id, final RemoteConnection remoteConnection) {
+ this.id = id;
+ this.remoteConnection = remoteConnection;
+ }
+
+ /**
+ * Get the next buffer.
+ *
+ * @return the next buffer
+ */
+ ByteBuffer getBuffer() {
+ final ByteBuffer buffer = remoteConnection.allocate();
+ buffer.position(4);
+ buffer.put(RemoteProtocol.STREAM_DATA);
+ buffer.putInt(id);
+ return buffer;
+ }
+
+ /**
+ * Send a buffer acquired above.
+ *
+ * @return {@code false} if writing should cease
+ *
+ * @throws java.io.IOException in the event of an async close or exception
+ */
+ void send(ByteBuffer buffer) throws IOException {
+ try {
+ synchronized (this) {
+ OUT: for (;;) switch (state) {
+ case WAITING: {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ }
+ continue;
+ }
+ case ASYNC_CLOSE: {
+ state = State.CLOSED;
+ sendEof();
+ throw new AsynchronousCloseException();
+ }
+ case ASYNC_EXCEPTION: {
+ state = State.CLOSED;
+ throw new AsynchronousCloseException(); // todo pick a better
exception
+ }
+ case CLOSE_WAIT:
+ case CLOSED: {
+ throw new AsynchronousCloseException(); // todo pick a better
exception
+ }
+ case RUNNING: {
+ break OUT;
+ }
+ default: {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ remoteConnection.sendBlocking(buffer, true);
+ } finally {
+ remoteConnection.free(buffer);
+ }
+ }
+
+ void sendEof() {
+ synchronized (this) {
+ switch (state) {
+ case WAITING: {
+ state = State.CLOSE_WAIT;
+ return;
+ }
+ case ASYNC_EXCEPTION:
+ case ASYNC_CLOSE: {
+ state = State.CLOSED;
+ break;
+ }
+
+ }
+ doSend(RemoteProtocol.STREAM_CLOSE);
+ }
+ }
+
+ private void doSend(byte code) {
+ final ByteBuffer buffer = remoteConnection.allocate();
+ buffer.position(4);
+ buffer.put(code);
+ buffer.putInt(id);
+ buffer.flip();
+ try {
+ remoteConnection.sendBlocking(buffer, true);
+ } catch (IOException e) {
+ // irrelevant
+ }
+ }
+
+ void sendException() {
+ synchronized (this) {
+ if (state == State.WAITING) {
+ state = State.WAITING_EXCEPTION;
+ return;
+ } else {
+ state = State.CLOSE_WAIT;
+ }
+ doSend(RemoteProtocol.STREAM_EXCEPTION);
+ }
+ }
+
+ void asyncStart() {
+ synchronized (this) {
+ switch (state) {
+ case WAITING: {
+ state = State.RUNNING;
+ notifyAll();
+ return;
+ }
+ case WAITING_EXCEPTION: {
+ state = State.CLOSE_WAIT;
+ notifyAll();
+ sendException();
+ }
+ case RUNNING:
+ case ASYNC_CLOSE:
+ case ASYNC_EXCEPTION:
+ case CLOSE_WAIT:
+ case CLOSED:
+ }
+ if (state == State.WAITING_EXCEPTION) {
+ state = State.CLOSED;
+ doSend(RemoteProtocol.STREAM_EXCEPTION);
+ return;
+ }
+ }
+ }
+
+ void ack() {
+ semaphore.release();
+ }
+
+ void asyncClose() {
+ synchronized (this) {
+ switch (state) {
+ case WAITING:
+ case RUNNING:
+
+ {
+ doSend(RemoteProtocol.STREAM_CLOSE);
+ state = State.CLOSED;
+ notifyAll();
+ return;
+ }
+ }
+ }
+ }
+
+ void asyncException() {
+
+ }
+
+}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -23,22 +23,38 @@
package org.jboss.remoting3.remote;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Reader;
import java.io.StreamCorruptedException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.NioByteInput;
import org.jboss.marshalling.ObjectTable;
import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.util.IntKeyMap;
import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.stream.ObjectSink;
+import org.jboss.remoting3.stream.ObjectSource;
+import org.jboss.remoting3.stream.ReaderInputStream;
+import org.jboss.remoting3.stream.WriterOutputStream;
final class PrimaryObjectTable implements ObjectTable {
private final Endpoint endpoint;
+ private final RemoteConnectionHandler connectionHandler;
+ private final Executor executor;
- PrimaryObjectTable(final Endpoint endpoint) {
+ PrimaryObjectTable(final Endpoint endpoint, final RemoteConnectionHandler
connectionHandler) {
this.endpoint = endpoint;
+ this.connectionHandler = connectionHandler;
+ executor =
this.connectionHandler.getConnectionContext().getConnectionProviderContext().getExecutor();
}
- private static final Writer ZERO_WRITER = new ByteWriter(0);
- private static final Writer ONE_WRITER = new ByteWriter(1);
+ private static final Writer ZERO_WRITER = new
ByteWriter(RemoteProtocol.OBJ_ENDPOINT);
+ private static final Writer ONE_WRITER = new
ByteWriter(RemoteProtocol.OBJ_CLIENT_CONNECTOR);
private static final class ByteWriter implements Writer {
private final byte b;
@@ -57,15 +73,132 @@
return ZERO_WRITER;
} else if (object ==
PrimaryExternalizerFactory.RequestHandlerConnectorExternalizer.INSTANCE) {
return ONE_WRITER;
+ } else if (object instanceof InputStream) {
+ return new Writer() {
+ public void writeObject(final Marshaller marshaller, final Object object)
throws IOException {
+ writeOutboundStream(marshaller, RemoteProtocol.OBJ_INPUT_STREAM,
(InputStream) object);
+ }
+ };
+ } else if (object instanceof OutputStream) {
+ return new Writer() {
+ public void writeObject(final Marshaller marshaller, final Object object)
throws IOException {
+ writeInboundStream(marshaller, RemoteProtocol.OBJ_OUTPUT_STREAM,
(OutputStream) object);
+ }
+ };
+ } else if (object instanceof Reader) {
+ return new Writer() {
+ public void writeObject(final Marshaller marshaller, final Object object)
throws IOException {
+ writeOutboundStream(marshaller, RemoteProtocol.OBJ_READER, new
ReaderInputStream((Reader)object, RemoteProtocol.UTF_8));
+ }
+ };
+ } else if (object instanceof java.io.Writer) {
+ return new Writer() {
+ public void writeObject(final Marshaller marshaller, final Object object)
throws IOException {
+ writeInboundStream(marshaller, RemoteProtocol.OBJ_WRITER, new
WriterOutputStream((java.io.Writer)object, RemoteProtocol.UTF_8));
+ }
+ };
+ } else if (object instanceof ObjectSource) {
+ return new Writer() {
+ public void writeObject(final Marshaller marshaller, final Object object)
throws IOException {
+ writeOutboundStream(marshaller, RemoteProtocol.OBJ_OBJECT_SOURCE,
(ObjectSource) object);
+ }
+ };
+ } else if (object instanceof ObjectSink) {
+ return new Writer() {
+ public void writeObject(final Marshaller marshaller, final Object object)
throws IOException {
+ writeInboundStream(marshaller, RemoteProtocol.OBJ_OBJECT_SINK,
(ObjectSink) object);
+ }
+ };
+ } else {
+ return null;
}
- return null;
}
+ private void writeInboundStream(final Marshaller marshaller, final byte code, final
ObjectSink objectSink) throws IOException {
+ marshaller.writeByte(code);
+ final IntKeyMap<InboundStream> inboundStreams =
connectionHandler.getInboundStreams();
+ final Random random = connectionHandler.getRandom();
+ int id;
+ synchronized (inboundStreams) {
+ while (inboundStreams.containsKey(id = random.nextInt() & ~1));
+ inboundStreams.put(id, new InboundStream(id,
connectionHandler.getRemoteConnection(), new InboundStream.ByteInputResult() {
+ public void accept(final NioByteInput nioByteInput, final InboundStream
inboundStream) {
+ try {
+ executor.execute(new InboundObjectSinkReceiveTask(nioByteInput,
inboundStream, connectionHandler, objectSink));
+ } catch (RejectedExecutionException e) {
+ RemoteConnectionHandler.log.warn("Unable to start task for
forwarded stream: %s", e);
+ inboundStream.sendAsyncException();
+ }
+ }
+ }));
+ }
+ marshaller.writeInt(id);
+ }
+
+ private void writeOutboundStream(final Marshaller marshaller, final byte code, final
ObjectSource objectSource) throws IOException {
+ marshaller.writeByte(code);
+ final IntKeyMap<OutboundStream> outboundStreams =
connectionHandler.getOutboundStreams();
+ final Random random = connectionHandler.getRandom();
+ int id;
+ final OutboundStream outboundStream;
+ synchronized (outboundStreams) {
+ while (outboundStreams.containsKey(id = random.nextInt() | 1));
+ outboundStreams.put(id, outboundStream = new OutboundStream(id,
connectionHandler.getRemoteConnection()));
+ }
+ marshaller.writeInt(id);
+ try {
+ executor.execute(new OutboundObjectSourceTransmitTask(objectSource,
outboundStream, connectionHandler));
+ } catch (RejectedExecutionException e) {
+ RemoteConnectionHandler.log.warn("Unable to start task for forwarded
stream: %s", e);
+ outboundStream.sendException();
+ }
+ }
+
+ /**
+ * This looks backwards but it really isn't. When we write an OutputStream, we
want the remote side to send us inbound
+ * to feed it.
+ *
+ * @param marshaller the marshaller
+ * @param code the code
+ * @param outputStream the output stream
+ * @throws IOException if an I/O error occurs
+ */
+ private void writeInboundStream(final Marshaller marshaller, final byte code, final
OutputStream outputStream) throws IOException {
+ marshaller.writeByte(code);
+ final IntKeyMap<InboundStream> inboundStreams =
connectionHandler.getInboundStreams();
+ final Random random = connectionHandler.getRandom();
+ int id;
+ synchronized (inboundStreams) {
+ while (inboundStreams.containsKey(id = random.nextInt() & ~1));
+ inboundStreams.put(id, new InboundStream(id,
connectionHandler.getRemoteConnection(), outputStream));
+ }
+ marshaller.writeInt(id);
+ }
+
+ private void writeOutboundStream(final Marshaller marshaller, final byte code, final
InputStream inputStream) throws IOException {
+ marshaller.writeByte(code);
+ final IntKeyMap<OutboundStream> outboundStreams =
connectionHandler.getOutboundStreams();
+ final Random random = connectionHandler.getRandom();
+ int id;
+ final OutboundStream outboundStream;
+ synchronized (outboundStreams) {
+ while (outboundStreams.containsKey(id = random.nextInt() | 1));
+ outboundStreams.put(id, outboundStream = new OutboundStream(id,
connectionHandler.getRemoteConnection()));
+ }
+ marshaller.writeInt(id);
+ try {
+ executor.execute(new OutboundInputStreamTransmitTask(inputStream,
outboundStream));
+ } catch (RejectedExecutionException e) {
+ RemoteConnectionHandler.log.warn("Unable to start task for forwarded
stream: %s", e);
+ outboundStream.sendException();
+ }
+ }
+
public Object readObject(final Unmarshaller unmarshaller) throws IOException,
ClassNotFoundException {
final int id = unmarshaller.readUnsignedByte();
switch (id) {
- case 0: return endpoint;
- case 1: return
PrimaryExternalizerFactory.RequestHandlerConnectorExternalizer.INSTANCE;
+ case RemoteProtocol.OBJ_ENDPOINT: return endpoint;
+ case RemoteProtocol.OBJ_CLIENT_CONNECTOR: return
PrimaryExternalizerFactory.RequestHandlerConnectorExternalizer.INSTANCE;
default: throw new StreamCorruptedException("Unknown object table ID
byte " + id);
}
}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -26,6 +26,8 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
import org.jboss.marshalling.ProviderDescriptor;
import org.jboss.remoting3.RemotingOptions;
import org.jboss.remoting3.security.ServerAuthenticationProvider;
@@ -75,10 +77,12 @@
if (port < 1) {
throw new IllegalArgumentException("Port number must be
specified");
}
+ // Get the caller context so that GSSAPI can work
+ final AccessControlContext acc = AccessController.getContext();
// Open a client channel
final IoFuture<? extends ConnectedStreamChannel<InetSocketAddress>>
futureChannel;
try {
- futureChannel = connector.connectTo(new
InetSocketAddress(InetAddress.getByName(host), port), new
ClientOpenListener(connectOptions, connectionProviderContext, result, callbackHandler,
providerDescriptor), null);
+ futureChannel = connector.connectTo(new
InetSocketAddress(InetAddress.getByName(host), port), new
ClientOpenListener(connectOptions, connectionProviderContext, result, callbackHandler,
providerDescriptor, acc), null);
} catch (UnknownHostException e) {
result.setException(e);
return IoUtils.nullCancellable();
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -125,7 +125,7 @@
}
return;
}
- case RemoteProtocol.CLIENT_CLOSED: {
+ case RemoteProtocol.CLIENT_CLOSE: {
final int id = buffer.getInt();
final InboundClient client;
@@ -286,7 +286,7 @@
outboundRequest = outboundRequests.get(rid);
}
if (outboundRequest == null) {
- RemoteConnectionHandler.log.trace("Received
reply-exception-abort for unknown request ID %d", Integer.valueOf(rid));
+ RemoteConnectionHandler.log.warn("Received reply-exception-abort
for unknown request ID %d", Integer.valueOf(rid));
return;
}
final NioByteInput byteInput;
@@ -304,6 +304,108 @@
}
return;
}
+ case RemoteProtocol.ALIVE: {
+ // todo - mark the time
+ return;
+ }
+ case RemoteProtocol.STREAM_ACK: {
+ final int sid = buffer.getInt();
+ final IntKeyMap<OutboundStream> outboundStreams =
connectionHandler.getOutboundStreams();
+ final OutboundStream outboundStream;
+ synchronized (outboundStreams) {
+ outboundStream = outboundStreams.get(sid);
+ }
+ if (outboundStream == null) {
+ RemoteConnectionHandler.log.warn("Received stream-ack for
unknown stream ID %d", Integer.valueOf(sid));
+ return;
+ }
+ outboundStream.ack();
+ return;
+ }
+ case RemoteProtocol.STREAM_ASYNC_CLOSE: {
+ final int sid = buffer.getInt();
+ final IntKeyMap<OutboundStream> outboundStreams =
connectionHandler.getOutboundStreams();
+ final OutboundStream outboundStream;
+ synchronized (outboundStreams) {
+ outboundStream = outboundStreams.get(sid);
+ }
+ if (outboundStream == null) {
+ RemoteConnectionHandler.log.warn("Received stream-ack for
unknown stream ID %d", Integer.valueOf(sid));
+ return;
+ }
+ outboundStream.asyncClose();
+ return;
+ }
+ case RemoteProtocol.STREAM_ASYNC_EXCEPTION: {
+ final int sid = buffer.getInt();
+ final IntKeyMap<OutboundStream> outboundStreams =
connectionHandler.getOutboundStreams();
+ final OutboundStream outboundStream;
+ synchronized (outboundStreams) {
+ outboundStream = outboundStreams.get(sid);
+ }
+ if (outboundStream == null) {
+ RemoteConnectionHandler.log.warn("Received
stream-async-exception for unknown stream ID %d", Integer.valueOf(sid));
+ return;
+ }
+ outboundStream.asyncException();
+ return;
+ }
+ case RemoteProtocol.STREAM_ASYNC_START: {
+ final int sid = buffer.getInt();
+ final IntKeyMap<OutboundStream> outboundStreams =
connectionHandler.getOutboundStreams();
+ final OutboundStream outboundStream;
+ synchronized (outboundStreams) {
+ outboundStream = outboundStreams.get(sid);
+ }
+ if (outboundStream == null) {
+ RemoteConnectionHandler.log.warn("Received stream-async-start
for unknown stream ID %d", Integer.valueOf(sid));
+ return;
+ }
+ outboundStream.asyncStart();
+ return;
+ }
+ case RemoteProtocol.STREAM_CLOSE: {
+ final int sid = buffer.getInt();
+ final IntKeyMap<InboundStream> inboundStreams =
connectionHandler.getInboundStreams();
+ final InboundStream inboundStream;
+ synchronized (inboundStreams) {
+ inboundStream = inboundStreams.get(sid);
+ }
+ if (inboundStream == null) {
+ RemoteConnectionHandler.log.warn("Received stream-close for
unknown stream ID %d", Integer.valueOf(sid));
+ return;
+ }
+ inboundStream.getReceiver().pushEof();
+ return;
+ }
+ case RemoteProtocol.STREAM_DATA: {
+ final int sid = buffer.getInt();
+ final IntKeyMap<InboundStream> inboundStreams =
connectionHandler.getInboundStreams();
+ final InboundStream inboundStream;
+ synchronized (inboundStreams) {
+ inboundStream = inboundStreams.get(sid);
+ }
+ if (inboundStream == null) {
+ RemoteConnectionHandler.log.warn("Received stream-data for
unknown stream ID %d", Integer.valueOf(sid));
+ return;
+ }
+ inboundStream.getReceiver().push(buffer);
+ return;
+ }
+ case RemoteProtocol.STREAM_EXCEPTION: {
+ final int sid = buffer.getInt();
+ final IntKeyMap<InboundStream> inboundStreams =
connectionHandler.getInboundStreams();
+ final InboundStream inboundStream;
+ synchronized (inboundStreams) {
+ inboundStream = inboundStreams.get(sid);
+ }
+ if (inboundStream == null) {
+ RemoteConnectionHandler.log.warn("Received stream-exception for
unknown stream ID %d", Integer.valueOf(sid));
+ return;
+ }
+ inboundStream.getReceiver().pushException();
+ return;
+ }
default: {
RemoteConnectionHandler.log.error("Received invalid packet type on
%s, closing", connectionHandler);
IoUtils.safeClose(connectionHandler);
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -23,6 +23,7 @@
package org.jboss.remoting3.remote;
import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
import org.jboss.remoting3.spi.ConnectionProvider;
import org.jboss.remoting3.spi.ConnectionProviderContext;
import org.jboss.xnio.Connector;
@@ -38,28 +39,44 @@
*/
public static final byte VERSION = 0;
+ // Message flags
+
static final int MSG_FLAG_FIRST = 1;
static final int MSG_FLAG_LAST = 2;
+ // Message types
+
static final byte GREETING = 0;
- static final byte SERVICE_REQUEST = 1;
- static final byte SERVICE_NOT_FOUND = 2;
- static final byte SERVICE_CLIENT_OPENED = 3;
- static final byte CLIENT_CLOSED = 4;
- static final byte REQUEST = 5;
- static final byte REQUEST_ABORT = 6;
- static final byte REQUEST_ACK_CHUNK = 7;
- static final byte REPLY = 8;
- static final byte REPLY_EXCEPTION = 9;
- static final byte REPLY_ACK_CHUNK = 10;
- static final byte REPLY_EXCEPTION_ABORT = 11;
- static final byte AUTH_REQUEST = 12;
- static final byte AUTH_CHALLENGE = 13;
- static final byte AUTH_RESPONSE = 14;
- static final byte AUTH_COMPLETE = 15;
- static final byte AUTH_REJECTED = 16;
+ static final byte AUTH_REQUEST = 1;
+ static final byte AUTH_CHALLENGE = 2;
+ static final byte AUTH_RESPONSE = 3;
+ static final byte AUTH_COMPLETE = 4;
+ static final byte AUTH_REJECTED = 5;
+ static final byte SERVICE_REQUEST = 16;
+ static final byte SERVICE_NOT_FOUND = 17;
+ static final byte SERVICE_CLIENT_OPENED = 18;
+ static final byte CLIENT_CLOSE = 19;
+ // todo CLIENT_ASYNC_CLOSE!
+
+ static final byte STREAM_DATA = 32; // from source -> sink side
+ static final byte STREAM_EXCEPTION = 33; // from source -> sink side
+ static final byte STREAM_CLOSE = 34; // from source -> sink side
+
+ static final byte STREAM_ACK = 35; // from sink -> source side
+ static final byte STREAM_ASYNC_CLOSE = 36; // from sink -> source side
+ static final byte STREAM_ASYNC_EXCEPTION = 37; // from sink -> source side
+ static final byte STREAM_ASYNC_START = 38; // from sink -> source side when
sending output streams
+
+ static final byte REQUEST = 48;
+ static final byte REQUEST_ABORT = 49;
+ static final byte REQUEST_ACK_CHUNK = 50;
+ static final byte REPLY = 51;
+ static final byte REPLY_EXCEPTION = 52;
+ static final byte REPLY_ACK_CHUNK = 53;
+ static final byte REPLY_EXCEPTION_ABORT = 54;
+
static final byte ALIVE = 99;
// Greeting types
@@ -69,6 +86,30 @@
static final byte GREETING_ENDPOINT_NAME = 2; // sent by client & server
static final byte GREETING_MARSHALLER_VERSION = 3; // sent by client & server
+ // Object table types
+
+ static final byte OBJ_ENDPOINT = 0;
+ static final byte OBJ_CLIENT_CONNECTOR = 1;
+ static final byte OBJ_INPUT_STREAM = 2;
+ static final byte OBJ_OUTPUT_STREAM = 3;
+ static final byte OBJ_READER = 4;
+ static final byte OBJ_WRITER = 5;
+ static final byte OBJ_OBJECT_SOURCE = 6;
+ static final byte OBJ_OBJECT_SINK = 7;
+
+ // Object sink stream commands
+
+ static final int OSINK_OBJECT = 0;
+ static final int OSINK_FLUSH = 1;
+ static final int OSINK_CLOSE = 2;
+
+ // Object source stream commands
+
+ static final int OSOURCE_OBJECT = 0;
+ static final int OSOURCE_CLOSE = 1;
+
+ static final Charset UTF_8 = Charset.forName("UTF8");
+
/**
* Create an instance of the connection provider for the "remote"
protocol.
*
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.stream.ObjectSource;
+
+final class UnmarshallerObjectSource<T> implements ObjectSource<T> {
+ private final Unmarshaller unmarshaller;
+
+ UnmarshallerObjectSource(final Unmarshaller unmarshaller) {
+ this.unmarshaller = unmarshaller;
+ }
+
+ public boolean hasNext() throws IOException {
+ return false;
+ }
+
+ public T next() throws NoSuchElementException, IOException {
+ return null;
+ }
+
+ public void close() throws IOException {
+ }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/ReaderInputStream.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/ReaderInputStream.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/ReaderInputStream.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -0,0 +1,173 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.io.CharConversionException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import org.jboss.xnio.Buffers;
+
+public final class ReaderInputStream extends InputStream {
+
+ private final Reader reader;
+ private final CharsetEncoder encoder;
+ private final CharBuffer charBuffer;
+ private final ByteBuffer byteBuffer;
+
+ public ReaderInputStream(final Reader reader) {
+ this(reader, Charset.defaultCharset());
+ }
+
+ public ReaderInputStream(final Reader reader, final String charsetName) throws
UnsupportedEncodingException {
+ this(reader, Streams.getCharset(charsetName));
+ }
+
+ public ReaderInputStream(final Reader reader, final Charset charset) {
+ this(reader, getEncoder(charset));
+ }
+
+ public ReaderInputStream(final Reader reader, final CharsetEncoder encoder) {
+ this(reader, encoder, 1024);
+ }
+
+ public ReaderInputStream(final Reader reader, final CharsetEncoder encoder, final int
bufferSize) {
+ this.reader = reader;
+ this.encoder = encoder;
+ charBuffer = CharBuffer.wrap(new char[bufferSize]);
+ byteBuffer = ByteBuffer.wrap(new byte[(int) ((float)bufferSize *
encoder.averageBytesPerChar() + 0.5f)]);
+ }
+
+ private static CharsetEncoder getEncoder(final Charset charset) {
+ final CharsetEncoder encoder = charset.newEncoder();
+ encoder.onMalformedInput(CodingErrorAction.REPLACE);
+ encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+ return encoder;
+ }
+
+ public int read() throws IOException {
+ final ByteBuffer byteBuffer = this.byteBuffer;
+ if (! byteBuffer.hasRemaining()) {
+ if (! fill()) {
+ return -1;
+ }
+ }
+ return byteBuffer.get() & 0xff;
+ }
+
+ public int read(final byte[] b, int off, int len) throws IOException {
+ final ByteBuffer byteBuffer = this.byteBuffer;
+ int cnt = 0;
+ while (len > 0) {
+ final int r = byteBuffer.remaining();
+ if (r == 0) {
+ if (! fill()) return cnt == 0 ? -1 : cnt;
+ continue;
+ }
+ final int c = Math.min(r, len);
+ byteBuffer.get(b, off, c);
+ cnt += c;
+ off += c;
+ len -= c;
+ }
+ return cnt;
+ }
+
+ private boolean fill() throws IOException {
+ final CharBuffer charBuffer = this.charBuffer;
+ final ByteBuffer byteBuffer = this.byteBuffer;
+ byteBuffer.compact();
+ try {
+ while (byteBuffer.hasRemaining()) {
+ while (charBuffer.hasRemaining()) {
+ final CoderResult result = encoder.encode(charBuffer, byteBuffer,
false);
+ if (result.isOverflow()) {
+ return true;
+ }
+ if (result.isUnderflow()) {
+ break;
+ }
+ if (result.isError()) {
+ if (result.isMalformed()) {
+ throw new CharConversionException("Malformed
input");
+ }
+ if (result.isUnmappable()) {
+ throw new CharConversionException("Unmappable
character");
+ }
+ throw new CharConversionException("Character decoding
problem");
+ }
+ }
+ charBuffer.compact();
+ try {
+ final int cnt = reader.read(charBuffer);
+ if (cnt == -1) {
+ return false;
+ }
+ } finally {
+ charBuffer.flip();
+ }
+ }
+ return true;
+ } finally {
+ byteBuffer.flip();
+ }
+ }
+
+ public long skip(long n) throws IOException {
+ final ByteBuffer byteBuffer = this.byteBuffer;
+ int cnt = 0;
+ while (n > 0) {
+ final int r = byteBuffer.remaining();
+ if (r == 0) {
+ if (! fill()) return cnt;
+ continue;
+ }
+ final int c = Math.min(r, n > (long) Integer.MAX_VALUE ? Integer.MAX_VALUE
: (int) n);
+ Buffers.skip(byteBuffer, c);
+ cnt += c;
+ n -= c;
+ }
+ return cnt;
+ }
+
+ public int available() throws IOException {
+ return byteBuffer.remaining();
+ }
+
+ public void close() throws IOException {
+ byteBuffer.clear();
+ charBuffer.clear();
+ reader.close();
+ }
+
+ public String toString() {
+ return "ReaderInputStream over " + reader;
+ }
+}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -24,6 +24,9 @@
import java.io.EOFException;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.UnsupportedCharsetException;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Iterator;
@@ -274,6 +277,14 @@
return new EnumerationObjectSource<T>(enumeration);
}
+ static Charset getCharset(final String charsetName) throws
UnsupportedEncodingException {
+ try {
+ return Charset.forName(charsetName);
+ } catch (UnsupportedCharsetException e) {
+ throw new UnsupportedEncodingException(e.getMessage());
+ }
+ }
+
private static final class CollectionObjectSink<T> implements
ObjectSink<T> {
private final Collection<T> target;
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/WriterOutputStream.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/WriterOutputStream.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/WriterOutputStream.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -0,0 +1,152 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.io.CharConversionException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+
+public final class WriterOutputStream extends OutputStream {
+
+ private final Writer writer;
+ private final CharsetDecoder decoder;
+ private final ByteBuffer byteBuffer;
+ private final char[] chars;
+ private volatile boolean closed;
+
+ public WriterOutputStream(final Writer writer) {
+ this(writer, Charset.defaultCharset());
+ }
+
+ public WriterOutputStream(final Writer writer, final CharsetDecoder decoder) {
+ this(writer, decoder, 1024);
+ }
+
+ public WriterOutputStream(final Writer writer, final CharsetDecoder decoder, int
bufferSize) {
+ this.writer = writer;
+ this.decoder = decoder;
+ byteBuffer = ByteBuffer.allocate(bufferSize);
+ chars = new char[(int) ((float)bufferSize * decoder.maxCharsPerByte() + 0.5f)];
+ }
+
+ public WriterOutputStream(final Writer writer, final Charset charset) {
+ this(writer, getDecoder(charset));
+ }
+
+ public WriterOutputStream(final Writer writer, final String charsetName) throws
UnsupportedEncodingException {
+ this(writer, Streams.getCharset(charsetName));
+ }
+
+ private static CharsetDecoder getDecoder(final Charset charset) {
+ final CharsetDecoder decoder = charset.newDecoder();
+ decoder.onMalformedInput(CodingErrorAction.REPLACE);
+ decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+ decoder.replaceWith("?");
+ return decoder;
+ }
+
+ public void write(final int b) throws IOException {
+ if (closed) throw new IOException("Stream closed");
+ final ByteBuffer byteBuffer = this.byteBuffer;
+ if (! byteBuffer.hasRemaining()) {
+ doFlush(false);
+ }
+ byteBuffer.put((byte) b);
+ }
+
+ public void write(final byte[] b, int off, int len) throws IOException {
+ if (closed) throw new IOException("Stream closed");
+ final ByteBuffer byteBuffer = this.byteBuffer;
+ // todo Correct first, fast later
+ while (len > 0) {
+ final int r = byteBuffer.remaining();
+ if (r == 0) {
+ doFlush(false);
+ continue;
+ }
+ final int c = Math.min(len, r);
+ byteBuffer.put(b, off, c);
+ len -= c;
+ off += c;
+ }
+ }
+
+ private void doFlush(final boolean eof) throws IOException {
+ final CharBuffer charBuffer = CharBuffer.wrap(chars);
+ final ByteBuffer byteBuffer = this.byteBuffer;
+ final CharsetDecoder decoder = this.decoder;
+ byteBuffer.flip();
+ try {
+ while (byteBuffer.hasRemaining()) {
+ final CoderResult result = decoder.decode(byteBuffer, charBuffer, eof);
+ if (result.isOverflow()) {
+ writer.write(chars, 0, charBuffer.position());
+ charBuffer.clear();
+ continue;
+ }
+ if (result.isUnderflow()) {
+ final int p = charBuffer.position();
+ if (p > 0) {
+ writer.write(chars, 0, p);
+ }
+ return;
+ }
+ if (result.isError()) {
+ if (result.isMalformed()) {
+ throw new CharConversionException("Malformed input");
+ }
+ if (result.isUnmappable()) {
+ throw new CharConversionException("Unmappable
character");
+ }
+ throw new CharConversionException("Character decoding
problem");
+ }
+ }
+ } finally {
+ byteBuffer.compact();
+ }
+ }
+
+ public void flush() throws IOException {
+ if (closed) throw new IOException("Stream closed");
+ writer.flush();
+ }
+
+ public void close() throws IOException {
+ closed = true;
+ doFlush(true);
+ byteBuffer.clear();
+ writer.close();
+ }
+
+ public String toString() {
+ return "Output stream writing to " + writer;
+ }
+}
Modified:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/AbstractRemoteTestCase.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/AbstractRemoteTestCase.java 2010-03-04
22:19:36 UTC (rev 5800)
+++
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/AbstractRemoteTestCase.java 2010-03-05
04:58:25 UTC (rev 5801)
@@ -44,9 +44,11 @@
import org.jboss.xnio.channels.BoundChannel;
import org.jboss.xnio.channels.ConnectedStreamChannel;
import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
+@Test
public abstract class AbstractRemoteTestCase extends InvocationTestBase {
@BeforeTest
@@ -67,17 +69,14 @@
assertNotNull(provider, "No remote provider interface");
final OptionMap serverOptions = OptionMap.builder()
.set(RemotingOptions.AUTHENTICATION_PROVIDER, "test")
-// .setSequence(Options.SASL_MECHANISMS, "EXTERNAL",
"DIGEST-MD5")
- .setSequence(Options.SASL_MECHANISMS, "DIGEST-MD5")
+ .setSequence(Options.SASL_MECHANISMS, "EXTERNAL",
"DIGEST-MD5")
.getMap();
final ChannelListener<ConnectedStreamChannel<InetSocketAddress>>
listener = provider.getServerListener(serverOptions);
final Xnio xnio = Xnio.getInstance();
final AcceptingServer<InetSocketAddress, ?, ?> server = getServer(listener,
xnio);
final IoFuture<? extends BoundChannel<InetSocketAddress>> future =
server.bind(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0));
final InetSocketAddress localAddress = future.get().getLocalAddress();
- final OptionMap clientOptions = OptionMap.builder()
- .setSequence(Options.SSL_ENABLED_CIPHER_SUITES,
"TLS_RSA_WITH_AES_128_CBC_SHA")
- .getMap();
+ final OptionMap clientOptions = OptionMap.EMPTY;
final Connection connection = endpoint.connect(new URI(getScheme(), null,
localAddress.getAddress().getHostAddress(), localAddress.getPort(), null, null, null),
clientOptions, "user", null, "password".toCharArray()).get();
connection.addCloseHandler(new CloseHandler<Connection>() {
public void handleClose(final Connection closed) {
@@ -87,8 +86,6 @@
return connection;
}
- protected void addClientOptions(OptionMap.Builder optionMapBuilder) {}
-
protected abstract String getScheme();
protected abstract AcceptingServer<InetSocketAddress, ?, ?>
getServer(ChannelListener<ConnectedStreamChannel<InetSocketAddress>> listener,
Xnio xnio) throws NoSuchProviderException, NoSuchAlgorithmException;