JBoss Remoting SVN: r5821 - in remoting3/trunk: jboss-remoting and 2 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2010-03-10 00:52:08 -0500 (Wed, 10 Mar 2010)
New Revision: 5821
Modified:
remoting3/trunk/jboss-remoting/pom.xml
remoting3/trunk/pom.xml
remoting3/trunk/samples/pom.xml
remoting3/trunk/taglet/pom.xml
Log:
Prep 3.1.0.Beta2
Modified: remoting3/trunk/jboss-remoting/pom.xml
===================================================================
--- remoting3/trunk/jboss-remoting/pom.xml 2010-03-10 05:51:44 UTC (rev 5820)
+++ remoting3/trunk/jboss-remoting/pom.xml 2010-03-10 05:52:08 UTC (rev 5821)
@@ -31,14 +31,14 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <xnio.version>2.1.0.CR2-SNAPSHOT</xnio.version>
- <jbmar.version>1.3.0.CR2-SNAPSHOT</jbmar.version>
+ <xnio.version>2.1.0.CR2</xnio.version>
+ <jbmar.version>1.3.0.CR2</jbmar.version>
</properties>
<groupId>org.jboss.remoting3</groupId>
<artifactId>jboss-remoting</artifactId>
<packaging>jar</packaging>
- <version>3.1.0.Beta2-SNAPSHOT</version>
+ <version>3.1.0.Beta2</version>
<licenses>
<license>
@@ -165,12 +165,6 @@
<manifest>
<mainClass>org.jboss.remoting3.Version</mainClass>
</manifest>
- <addDefaultImplementationEntries/>
- <manifestEntries>
- <Specification-Title>${pom.name}</Specification-Title>
- <Specification-Version>3.1</Specification-Version>
- <Specification-Vendor>${pom.organization.name}</Specification-Vendor>
- </manifestEntries>
</archive>
</configuration>
</plugin>
@@ -190,7 +184,7 @@
<configuration>
<taglet>org.jboss.remoting3.taglet.RemotingTypeTaglet</taglet>
<tagletArtifact>
- <groupId>org.jboss.remoting</groupId>
+ <groupId>org.jboss.remoting3</groupId>
<artifactId>jboss-remoting-taglet</artifactId>
<version>${version}</version>
</tagletArtifact>
Modified: remoting3/trunk/pom.xml
===================================================================
--- remoting3/trunk/pom.xml 2010-03-10 05:51:44 UTC (rev 5820)
+++ remoting3/trunk/pom.xml 2010-03-10 05:52:08 UTC (rev 5821)
@@ -36,7 +36,7 @@
<groupId>org.jboss.remoting3</groupId>
<artifactId>jboss-remoting-all</artifactId>
<packaging>pom</packaging>
- <version>3.1.0.Beta2-SNAPSHOT</version>
+ <version>3.1.0.Beta2</version>
<modules>
<module>taglet</module>
<module>jboss-remoting</module>
Modified: remoting3/trunk/samples/pom.xml
===================================================================
--- remoting3/trunk/samples/pom.xml 2010-03-10 05:51:44 UTC (rev 5820)
+++ remoting3/trunk/samples/pom.xml 2010-03-10 05:52:08 UTC (rev 5821)
@@ -36,12 +36,12 @@
<groupId>org.jboss.remoting3</groupId>
<artifactId>jboss-remoting-samples</artifactId>
<packaging>jar</packaging>
- <version>3.1.0.Beta2-SNAPSHOT</version>
+ <version>3.1.0.Beta2</version>
<dependencies>
<dependency>
<groupId>org.jboss.remoting3</groupId>
<artifactId>jboss-remoting</artifactId>
- <version>3.1.0.Beta2-SNAPSHOT</version>
+ <version>3.1.0.Beta2</version>
<scope>compile</scope>
</dependency>
<dependency>
Modified: remoting3/trunk/taglet/pom.xml
===================================================================
--- remoting3/trunk/taglet/pom.xml 2010-03-10 05:51:44 UTC (rev 5820)
+++ remoting3/trunk/taglet/pom.xml 2010-03-10 05:52:08 UTC (rev 5821)
@@ -35,7 +35,7 @@
<name>JBoss Remoting 3 Taglet</name>
<description>JBoss Remoting 3 Documentation Taglet</description>
<packaging>jar</packaging>
- <version>3.1.0.Beta2-SNAPSHOT</version>
+ <version>3.1.0.Beta2</version>
<dependencies>
<dependency>
14 years, 8 months
JBoss Remoting SVN: r5820 - remoting-mc-int/trunk.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2010-03-10 00:51:44 -0500 (Wed, 10 Mar 2010)
New Revision: 5820
Modified:
remoting-mc-int/trunk/pom.xml
Log:
Next is 1.0.0.Beta2
Modified: remoting-mc-int/trunk/pom.xml
===================================================================
--- remoting-mc-int/trunk/pom.xml 2010-03-10 05:51:25 UTC (rev 5819)
+++ remoting-mc-int/trunk/pom.xml 2010-03-10 05:51:44 UTC (rev 5820)
@@ -35,7 +35,7 @@
<name>JBoss Remoting 3 Metadata</name>
<artifactId>jboss-remoting-metadata</artifactId>
<packaging>jar</packaging>
- <version>1.0.0.Beta1</version>
+ <version>1.0.0.Beta2-SNAPSHOT</version>
<licenses>
<license>
<name>LGPL 2.1</name>
14 years, 8 months
JBoss Remoting SVN: r5819 - remoting-mc-int/tags.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2010-03-10 00:51:25 -0500 (Wed, 10 Mar 2010)
New Revision: 5819
Added:
remoting-mc-int/tags/1.0.0.Beta1/
Log:
Tag 1.0.0.Beta1
Copied: remoting-mc-int/tags/1.0.0.Beta1 (from rev 5818, remoting-mc-int/trunk)
14 years, 8 months
JBoss Remoting SVN: r5818 - remoting-mc-int/trunk.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2010-03-10 00:50:52 -0500 (Wed, 10 Mar 2010)
New Revision: 5818
Modified:
remoting-mc-int/trunk/pom.xml
Log:
Prep 1.0.0.Beta1
Modified: remoting-mc-int/trunk/pom.xml
===================================================================
--- remoting-mc-int/trunk/pom.xml 2010-03-10 04:40:00 UTC (rev 5817)
+++ remoting-mc-int/trunk/pom.xml 2010-03-10 05:50:52 UTC (rev 5818)
@@ -35,27 +35,38 @@
<name>JBoss Remoting 3 Metadata</name>
<artifactId>jboss-remoting-metadata</artifactId>
<packaging>jar</packaging>
- <version>1.0.0.Beta1-SNAPSHOT</version>
+ <version>1.0.0.Beta1</version>
+ <licenses>
+ <license>
+ <name>LGPL 2.1</name>
+ <url>http://www.gnu.org/licenses/lgpl-2.1.html</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+ <organization>
+ <name>JBoss, a division of Red Hat, Inc.</name>
+ <url>http://www.jboss.org/</url>
+ </organization>
<dependencies>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-river</artifactId>
- <version>1.3.0.CR2-SNAPSHOT</version>
+ <version>1.3.0.CR2</version>
</dependency>
<dependency>
<groupId>org.jboss.remoting3</groupId>
<artifactId>jboss-remoting</artifactId>
- <version>3.1.0.Beta2-SNAPSHOT</version>
+ <version>3.1.0.Beta2</version>
</dependency>
<dependency>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-metadata</artifactId>
- <version>2.1.0.CR1</version>
+ <version>2.1.0.CR2</version>
</dependency>
<dependency>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-api</artifactId>
- <version>2.1.0.CR1</version>
+ <version>2.1.0.CR2</version>
</dependency>
<dependency>
<groupId>org.jboss.kernel</groupId>
14 years, 8 months
JBoss Remoting SVN: r5817 - in remoting3/trunk/jboss-remoting/src: main/java/org/jboss/remoting3/spi and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2010-03-09 23:40:00 -0500 (Tue, 09 Mar 2010)
New Revision: 5817
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ProtocolServiceType.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
Log:
Imports
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java 2010-03-10 04:36:18 UTC (rev 5816)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java 2010-03-10 04:40:00 UTC (rev 5817)
@@ -29,7 +29,6 @@
import java.security.AccessControlContext;
import java.security.AccessController;
import org.jboss.marshalling.ProviderDescriptor;
-import org.jboss.remoting3.RemotingOptions;
import org.jboss.remoting3.security.ServerAuthenticationProvider;
import org.jboss.remoting3.spi.ConnectionHandlerFactory;
import org.jboss.remoting3.spi.ConnectionProvider;
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ProtocolServiceType.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ProtocolServiceType.java 2010-03-10 04:36:18 UTC (rev 5816)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ProtocolServiceType.java 2010-03-10 04:40:00 UTC (rev 5817)
@@ -29,7 +29,6 @@
import org.jboss.marshalling.ObjectResolver;
import org.jboss.marshalling.ObjectTable;
import org.jboss.marshalling.ProviderDescriptor;
-import org.jboss.remoting3.security.ServerAuthenticationProvider;
public final class ProtocolServiceType<T> implements Serializable {
Modified: remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java 2010-03-10 04:36:18 UTC (rev 5816)
+++ remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java 2010-03-10 04:40:00 UTC (rev 5817)
@@ -27,7 +27,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.Reader;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.remoting3.Client;
import org.jboss.remoting3.ClientConnector;
@@ -53,7 +52,10 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
@Test
public abstract class InvocationTestBase {
14 years, 8 months
JBoss Remoting SVN: r5816 - in remoting3/trunk/jboss-remoting/src: main/java/org/jboss/remoting3/remote and 3 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2010-03-09 23:36:18 -0500 (Tue, 09 Mar 2010)
New Revision: 5816
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryClassTable.java
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRemoteRequestHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryExternalizerFactory.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
remoting3/trunk/jboss-remoting/src/test/resources/logging.properties
Log:
Streams/object table/externalizer fixes, plus a couple more tests
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -88,7 +88,7 @@
public <I, O> ClientConnector<I, O> createClientConnector(final RequestListener<I, O> listener, final Class<I> requestClass, final Class<O> replyClass, final OptionMap optionMap) throws IOException {
final ClientContextImpl context = new ClientContextImpl(getExecutor(), this);
- final LocalRequestHandler localRequestHandler = endpoint.createLocalRequestHandler(listener, context, requestClass, replyClass, optionMap);
+ final LocalRequestHandler localRequestHandler = endpoint.createLocalRequestHandler(listener, context, requestClass, replyClass);
final RequestHandlerConnector connector = connectionHandler.createConnector(localRequestHandler);
context.addCloseHandler(SpiUtils.closingCloseHandler(localRequestHandler));
return new ClientConnectorImpl<I, O>(connector, endpoint, requestClass, replyClass, context);
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -194,7 +194,7 @@
}
}
- <I, O> LocalRequestHandler createLocalRequestHandler(final RequestListener<? super I, ? extends O> requestListener, final ClientContextImpl clientContext, final Class<I> requestClass, final Class<O> replyClass, final OptionMap optionMap) throws IOException {
+ <I, O> LocalRequestHandler createLocalRequestHandler(final RequestListener<? super I, ? extends O> requestListener, final ClientContextImpl clientContext, final Class<I> requestClass, final Class<O> replyClass) throws IOException {
if (requestListener == null) {
throw new IllegalArgumentException("requestListener is null");
}
@@ -234,7 +234,7 @@
}
private final class ServiceBuilderImpl<I, O> implements ServiceBuilder<I, O> {
- private String groupName;
+ private String groupName = "default";
private String serviceType;
private Class<I> requestType;
private Class<O> replyType;
@@ -255,7 +255,7 @@
@SuppressWarnings({ "unchecked" })
public <N> ServiceBuilder<N, O> setRequestType(final Class<N> newRequestType) {
if (newRequestType == null) {
- throw new NullPointerException("newRequestType is null");
+ throw new IllegalArgumentException("newRequestType is null");
}
clientListener = null;
ServiceBuilderImpl<N, O> castBuilder = (ServiceBuilderImpl<N, O>) this;
@@ -266,7 +266,7 @@
@SuppressWarnings({ "unchecked" })
public <N> ServiceBuilder<I, N> setReplyType(final Class<N> newReplyType) {
if (newReplyType == null) {
- throw new NullPointerException("newReplyType is null");
+ throw new IllegalArgumentException ("newReplyType is null");
}
clientListener = null;
ServiceBuilderImpl<I, N> castBuilder = (ServiceBuilderImpl<I, N>) this;
@@ -289,7 +289,7 @@
public ServiceBuilder<I, O> setOptionMap(final OptionMap optionMap) {
if (optionMap == null) {
- throw new NullPointerException("optionMap is null");
+ throw new IllegalArgumentException ("optionMap is null");
}
this.optionMap = optionMap;
return this;
@@ -301,19 +301,19 @@
sm.checkPermission(REGISTER_SERVICE_PERM);
}
if (groupName == null) {
- throw new NullPointerException("groupName is null");
+ throw new IllegalArgumentException("groupName is null");
}
if (serviceType == null) {
- throw new NullPointerException("serviceType is null");
+ throw new IllegalArgumentException("serviceType is null");
}
if (requestType == null) {
- throw new NullPointerException("requestType is null");
+ throw new IllegalArgumentException("requestType is null");
}
if (replyType == null) {
- throw new NullPointerException("replyType is null");
+ throw new IllegalArgumentException("replyType is null");
}
if (clientListener == null) {
- throw new NullPointerException("clientListener is null");
+ throw new IllegalArgumentException("clientListener is null");
}
final Integer metric = optionMap.get(RemotingOptions.METRIC);
if (metric != null && metric.intValue() < 0) {
@@ -430,13 +430,13 @@
sm.checkPermission(CREATE_CLIENT_PERM);
}
if (requestHandler == null) {
- throw new NullPointerException("requestHandler is null");
+ throw new IllegalArgumentException("requestHandler is null");
}
if (requestType == null) {
- throw new NullPointerException("requestType is null");
+ throw new IllegalArgumentException("requestType is null");
}
if (replyType == null) {
- throw new NullPointerException("replyType is null");
+ throw new IllegalArgumentException("replyType is null");
}
checkOpen();
final ClientImpl<I, O> client = ClientImpl.create(requestHandler, executor, requestType, replyType, clientClassLoader);
@@ -534,7 +534,7 @@
public <I, O> Client<I, O> createLocalClient(final ClientListener<I, O> clientListener, final Class<I> requestClass, final Class<O> replyClass, final ClassLoader clientClassLoader, final OptionMap optionMap) throws IOException {
final ClientContextImpl context = new ClientContextImpl(executor, null);
final RequestListener<I, O> requestListener = clientListener.handleClientOpen(context, optionMap);
- final LocalRequestHandler localRequestHandler = createLocalRequestHandler(requestListener, context, requestClass, replyClass, optionMap);
+ final LocalRequestHandler localRequestHandler = createLocalRequestHandler(requestListener, context, requestClass, replyClass);
final LocalRemoteRequestHandler remoteRequestHandler = new LocalRemoteRequestHandler(localRequestHandler, clientClassLoader, optionMap, this.optionMap, executor);
return ClientImpl.create(remoteRequestHandler, executor, requestClass, replyClass, clientClassLoader);
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRemoteRequestHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRemoteRequestHandler.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRemoteRequestHandler.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -23,11 +23,17 @@
package org.jboss.remoting3;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.InvalidObjectException;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.io.Writer;
import java.util.NoSuchElementException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.marshalling.ByteOutput;
import org.jboss.marshalling.Pair;
import org.jboss.marshalling.cloner.ClassCloner;
import org.jboss.marshalling.cloner.ClassLoaderClassCloner;
@@ -108,6 +114,18 @@
return new CloningObjectSource((ObjectSource) original, outboundCloner);
} else if (original instanceof ObjectSink) {
return new CloningObjectSink(inboundCloner, (ObjectSink) original);
+ } else if (original instanceof InputStream) {
+ return original;
+ } else if (original instanceof OutputStream) {
+ return original;
+ } else if (original instanceof Reader) {
+ return original;
+ } else if (original instanceof Writer) {
+ return original;
+ } else if (original instanceof ByteInput) {
+ return original;
+ } else if (original instanceof ByteOutput) {
+ return original;
} else if (original instanceof LocalRequestHandlerConnector) {
return original;
} else if (original instanceof EndpointImpl) {
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -34,7 +34,7 @@
private final RemoteConnection remoteConnection;
private final Receiver receiver;
- private State state;
+ private State state = State.WAITING_FIRST;
private static final Logger log = Loggers.main;
InboundStream(final int id, final RemoteConnection remoteConnection, final Receiver receiver) {
@@ -49,7 +49,7 @@
final NioByteInput byteInput = new NioByteInput(
new NioByteInputHandler()
);
- receiver = new NioByteInputReceiver(byteInput);
+ receiver = new NioByteInputReceiver(byteInput, remoteConnection);
byteInputResult.accept(byteInput, this);
}
@@ -152,7 +152,9 @@
}
void sendAsyncStart() {
- doSend(RemoteProtocol.STREAM_ASYNC_START);
+ synchronized (this) {
+ doSend(RemoteProtocol.STREAM_ASYNC_START);
+ }
}
void sendAck() {
@@ -185,11 +187,13 @@
}
}
- private final class NioByteInputReceiver implements Receiver, NioByteInput.BufferReturn {
+ private static final class NioByteInputReceiver implements Receiver, NioByteInput.BufferReturn {
private final NioByteInput nioByteInput;
+ private final RemoteConnection remoteConnection;
- NioByteInputReceiver(final NioByteInput nioByteInput) {
+ NioByteInputReceiver(final NioByteInput nioByteInput, final RemoteConnection remoteConnection) {
this.nioByteInput = nioByteInput;
+ this.remoteConnection = remoteConnection;
}
public void push(final ByteBuffer buffer) {
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -34,14 +34,17 @@
}
public void accept(final T instance) throws IOException {
+ marshaller.writeByte(RemoteProtocol.OSINK_OBJECT);
marshaller.writeObject(instance);
}
public void flush() throws IOException {
+ marshaller.writeByte(RemoteProtocol.OSINK_FLUSH);
marshaller.flush();
}
public void close() throws IOException {
+ marshaller.writeByte(RemoteProtocol.OSINK_CLOSE);
marshaller.close();
}
}
Added: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryClassTable.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryClassTable.java (rev 0)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryClassTable.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import org.jboss.marshalling.ClassTable;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.stream.ObjectSink;
+import org.jboss.remoting3.stream.ObjectSource;
+import org.jboss.xnio.OptionMap;
+
+final class PrimaryClassTable implements ClassTable {
+ static final PrimaryClassTable INSTANCE = new PrimaryClassTable();
+
+ private static final List<Class<?>> READ_TABLE;
+ private static final Map<Class<?>, Writer> WRITE_TABLE;
+
+ private static final int CLASS_MAX = 8;
+
+ private static final int CLASS_INPUT_STREAM = 0;
+ private static final int CLASS_OUTPUT_STREAM = 1;
+ private static final int CLASS_READER = 2;
+ private static final int CLASS_WRITER = 3;
+ private static final int CLASS_OBJECT_SOURCE = 4;
+ private static final int CLASS_OBJECT_SINK = 5;
+ private static final int CLASS_OPTION_MAP = 6;
+
+ static {
+ final Map<Class<?>, Writer> map = new IdentityHashMap<Class<?>, Writer>();
+ final List<Class<?>> list = Arrays.asList(new Class<?>[CLASS_MAX]);
+ add(map, list, InputStream.class, CLASS_INPUT_STREAM);
+ add(map, list, OutputStream.class, CLASS_OUTPUT_STREAM);
+ add(map, list, Reader.class, CLASS_READER);
+ add(map, list, java.io.Writer.class, CLASS_WRITER);
+ add(map, list, ObjectSource.class, CLASS_OBJECT_SOURCE);
+ add(map, list, ObjectSink.class, CLASS_OBJECT_SINK);
+ add(map, list, OptionMap.class, CLASS_OPTION_MAP);
+ READ_TABLE = list;
+ WRITE_TABLE = map;
+ }
+
+ private PrimaryClassTable() {
+ }
+
+ private static void add(Map<Class<?>, Writer> map, List<Class<?>> list, Class<?> clazz, int idx) {
+ map.put(clazz, new ByteWriter(idx));
+ list.set(idx, clazz);
+ }
+
+ public Writer getClassWriter(final Class<?> clazz) throws IOException {
+ return WRITE_TABLE.get(clazz);
+ }
+
+ public Class<?> readClass(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
+ return READ_TABLE.get(unmarshaller.readUnsignedByte());
+ }
+
+ private static final class ByteWriter implements Writer {
+ private final byte b;
+
+ public ByteWriter(final int b) {
+ this.b = (byte) b;
+ }
+
+ public void writeClass(final Marshaller marshaller, final Class<?> clazz) throws IOException {
+ marshaller.writeByte(b);
+ }
+ }
+}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryExternalizerFactory.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryExternalizerFactory.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryExternalizerFactory.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -23,24 +23,76 @@
package org.jboss.remoting3.remote;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jboss.marshalling.AbstractExternalizer;
import org.jboss.marshalling.ClassExternalizerFactory;
import org.jboss.marshalling.Creator;
import org.jboss.marshalling.Externalizer;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.marshalling.NioByteOutput;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.util.IntKeyMap;
+import org.jboss.remoting3.stream.ObjectSink;
+import org.jboss.remoting3.stream.ObjectSource;
+import org.jboss.remoting3.stream.ReaderInputStream;
+import org.jboss.remoting3.stream.WriterOutputStream;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
final class PrimaryExternalizerFactory implements ClassExternalizerFactory {
- static final ClassExternalizerFactory INSTANCE = new PrimaryExternalizerFactory();
+ private static final Logger log = Loggers.main;
+ private final RemoteConnectionHandler connectionHandler;
+ private final Executor executor;
+
+ final Externalizer inputStream = new InputStreamExternalizer();
+ final Externalizer outputStream = new OutputStreamExternalizer();
+ final Externalizer reader = new ReaderExternalizer();
+ final Externalizer writer = new WriterExternalizer();
+ final Externalizer objectSource = new ObjectSourceExternalizer();
+ final Externalizer objectSink = new ObjectSinkExternalizer();
+
+ PrimaryExternalizerFactory(final RemoteConnectionHandler connectionHandler) {
+ this.connectionHandler = connectionHandler;
+ executor = connectionHandler.getConnectionContext().getConnectionProviderContext().getExecutor();
+ }
+
public Externalizer getExternalizer(final Class<?> type) {
if (type == UnsentRequestHandlerConnector.class) {
return RequestHandlerConnectorExternalizer.INSTANCE;
+ } else if (InputStream.class.isAssignableFrom(type)) {
+ return inputStream;
+ } else if (OutputStream.class.isAssignableFrom(type)) {
+ return outputStream;
+ } else if (Reader.class.isAssignableFrom(type)) {
+ return reader;
+ } else if (Writer.class.isAssignableFrom(type)) {
+ return writer;
+ } else if (ObjectSource.class.isAssignableFrom(type)) {
+ return objectSource;
+ } else if (ObjectSink.class.isAssignableFrom(type)) {
+ return objectSink;
+ } else {
+ return null;
}
- return null;
}
- static class RequestHandlerConnectorExternalizer implements Externalizer {
+ static class RequestHandlerConnectorExternalizer extends AbstractExternalizer {
static final RequestHandlerConnectorExternalizer INSTANCE = new RequestHandlerConnectorExternalizer();
private static final long serialVersionUID = 8137262079765758375L;
@@ -53,9 +105,219 @@
public Object createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
return new ReceivedRequestHandlerConnector(RemoteConnectionHandler.getCurrent(), input.readInt());
}
+ }
- public void readExternal(final Object subject, final ObjectInput input) throws IOException, ClassNotFoundException {
- // n/a
+ class InputStreamExternalizer extends AbstractExternalizer {
+
+ public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+ writeOutboundStream(output, (InputStream) subject);
}
+
+ public InputStream createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+ return readInboundStream(input.readInt());
+ }
}
+
+ class OutputStreamExternalizer extends AbstractExternalizer {
+
+ public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+ writeInboundStream(output, (OutputStream) subject);
+ }
+
+ public OutputStream createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+ return readOutboundStream(input.readInt());
+ }
+ }
+
+ class ReaderExternalizer extends AbstractExternalizer {
+
+ public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+ writeOutboundStream(output, new ReaderInputStream((Reader)subject, RemoteProtocol.UTF_8));
+ }
+
+ public Reader createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+ return new InputStreamReader(readInboundStream(input.readInt()), RemoteProtocol.UTF_8);
+ }
+ }
+
+ class WriterExternalizer extends AbstractExternalizer {
+
+ public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+ writeInboundStream(output, new WriterOutputStream((java.io.Writer)subject, RemoteProtocol.UTF_8));
+ }
+
+ public java.io.Writer createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+ return new OutputStreamWriter(readOutboundStream(input.readInt()), RemoteProtocol.UTF_8);
+ }
+ }
+
+ /**
+ * Externalizer for {@code ObjectSource} instances. Writing delegates to
+ * {@code writeOutboundStream}, which registers an outbound stream and writes its ID; reading
+ * starts an unmarshaller on the inbound stream whose ID is read from the input and wraps it
+ * in an {@code UnmarshallerObjectSource}.
+ */
+ class ObjectSourceExternalizer extends AbstractExternalizer {
+
+ public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+ writeOutboundStream(output, (ObjectSource) subject);
+ }
+
+ /**
+ * Create the receiving-side object source.
+ *
+ * @param subjectType the declared type (unused)
+ * @param input the input from which the stream ID is read
+ * @param defaultCreator the default creator (unused)
+ * @return an object source backed by a started unmarshaller
+ * @throws IOException if reading the ID or starting the unmarshaller fails
+ */
+ public ObjectSource createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+ boolean ok = false;
+ final Unmarshaller unmarshaller = connectionHandler.getMarshallerFactory().createUnmarshaller(connectionHandler.getMarshallingConfiguration());
+ try {
+ unmarshaller.start(readInboundStream(input.readInt()));
+ final ObjectSource source = new UnmarshallerObjectSource(unmarshaller);
+ // Mark success so the finally clause only closes the unmarshaller on failure;
+ // otherwise the returned source would wrap an already-closed unmarshaller.
+ ok = true;
+ return source;
+ } finally {
+ if (! ok) {
+ IoUtils.safeClose(unmarshaller);
+ }
+ }
+ }
+ }
+
+ /**
+ * Externalizer for {@code ObjectSink} instances. Writing delegates to
+ * {@code writeInboundStream}, which registers an inbound stream and writes its ID; reading
+ * starts a marshaller on the outbound stream whose ID is read from the input and wraps it
+ * in a {@code MarshallerObjectSink}.
+ */
+ class ObjectSinkExternalizer extends AbstractExternalizer {
+
+ public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+ writeInboundStream(output, (ObjectSink) subject);
+ }
+
+ /**
+ * Create the sending-side object sink.
+ *
+ * @param subjectType the declared type (unused)
+ * @param input the input from which the stream ID is read
+ * @param defaultCreator the default creator (unused)
+ * @return an object sink backed by a started marshaller
+ * @throws IOException if reading the ID or starting the marshaller fails
+ */
+ public ObjectSink createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+ boolean ok = false;
+ final Marshaller marshaller = connectionHandler.getMarshallerFactory().createMarshaller(connectionHandler.getMarshallingConfiguration());
+ try {
+ marshaller.start(readOutboundStream(input.readInt()));
+ final ObjectSink sink = new MarshallerObjectSink(marshaller);
+ // Mark success so the finally clause only closes the marshaller on failure;
+ // otherwise the returned sink would wrap an already-closed marshaller.
+ ok = true;
+ return sink;
+ } finally {
+ if (! ok) {
+ IoUtils.safeClose(marshaller);
+ }
+ }
+ }
+ }
+
+ private void writeInboundStream(final ObjectOutput marshaller, final ObjectSink objectSink) throws IOException {
+ final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
+ final Random random = connectionHandler.getRandom();
+ int id;
+ synchronized (inboundStreams) {
+ while (inboundStreams.containsKey(id = random.nextInt() & ~1));
+ inboundStreams.put(id, new InboundStream(id, connectionHandler.getRemoteConnection(), new InboundStream.ByteInputResult() {
+ public void accept(final NioByteInput nioByteInput, final InboundStream inboundStream) {
+ try {
+ executor.execute(new InboundObjectSinkReceiveTask(nioByteInput, inboundStream, connectionHandler, objectSink));
+ } catch (RejectedExecutionException e) {
+ log.warn("Unable to start task for forwarded stream: %s", e);
+ inboundStream.sendAsyncException();
+ }
+ }
+ }));
+ }
+ marshaller.writeInt(id);
+ }
+
+ private NioByteInput readInboundStream(final int id) throws InvalidObjectException {
+ final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
+ final AtomicReference<NioByteInput> ref = new AtomicReference<NioByteInput>();
+ final InboundStream inboundStream;
+ synchronized (inboundStreams) {
+ if (inboundStreams.containsKey(id)) {
+ throw duplicateId(id);
+ }
+ inboundStream = new InboundStream(id, connectionHandler.getRemoteConnection(), new InboundStream.ByteInputResult() {
+ public void accept(final NioByteInput nioByteInput, final InboundStream inboundStream) {
+ ref.set(nioByteInput);
+ }
+ });
+ inboundStreams.put(id, inboundStream);
+ }
+ synchronized (inboundStream) {
+ inboundStream.sendAsyncStart();
+ }
+ return ref.get();
+ }
+
+ private void writeOutboundStream(final ObjectOutput marshaller, final ObjectSource objectSource) throws IOException {
+ final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
+ final Random random = connectionHandler.getRandom();
+ int id;
+ final OutboundStream outboundStream;
+ synchronized (outboundStreams) {
+ while (outboundStreams.containsKey(id = random.nextInt() | 1));
+ outboundStreams.put(id, outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection()));
+ }
+ marshaller.writeInt(id);
+ try {
+ executor.execute(new OutboundObjectSourceTransmitTask(objectSource, outboundStream, connectionHandler));
+ } catch (RejectedExecutionException e) {
+ log.warn("Unable to start task for forwarded stream: %s", e);
+ outboundStream.sendException();
+ }
+ }
+
+ private NioByteOutput readOutboundStream(final int id) throws InvalidObjectException {
+ final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
+ final OutboundStream outboundStream;
+ synchronized (outboundStreams) {
+ if (outboundStreams.containsKey(id)) {
+ throw duplicateId(id);
+ }
+ outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection());
+ outboundStreams.put(id, outboundStream);
+ }
+ synchronized (outboundStream) {
+ outboundStream.asyncStart();
+ }
+ return new NioByteOutput(new NioByteOutput.BufferWriter() {
+ public ByteBuffer getBuffer() {
+ return outboundStream.getBuffer();
+ }
+
+ public void accept(final ByteBuffer buffer, final boolean eof) throws IOException {
+ outboundStream.send(buffer);
+ if (eof) {
+ outboundStream.sendEof();
+ }
+ }
+
+ public void flush() throws IOException {
+ }
+ });
+ }
+
+ /**
+ * This looks backwards but it really isn't. When we write an OutputStream, we want the remote side to send us inbound
+ * to feed it.
+ *
+ * @param marshaller the marshaller
+ * @param outputStream the output stream
+ * @throws IOException if an I/O error occurs
+ */
+ private void writeInboundStream(final ObjectOutput marshaller, final OutputStream outputStream) throws IOException {
+ final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
+ final Random random = connectionHandler.getRandom();
+ int id;
+ synchronized (inboundStreams) {
+ while (inboundStreams.containsKey(id = random.nextInt() & ~1));
+ inboundStreams.put(id, new InboundStream(id, connectionHandler.getRemoteConnection(), outputStream));
+ }
+ marshaller.writeInt(id);
+ }
+
+ private void writeOutboundStream(final ObjectOutput marshaller, final InputStream inputStream) throws IOException {
+ final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
+ final Random random = connectionHandler.getRandom();
+ int id;
+ final OutboundStream outboundStream;
+ synchronized (outboundStreams) {
+ while (outboundStreams.containsKey(id = random.nextInt() | 1));
+ outboundStreams.put(id, outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection()));
+ }
+ marshaller.writeInt(id);
+ try {
+ executor.execute(new OutboundInputStreamTransmitTask(inputStream, outboundStream));
+ } catch (RejectedExecutionException e) {
+ log.warn("Unable to start task for forwarded stream: %s", e);
+ outboundStream.sendException();
+ }
+ }
+
+ private static InvalidObjectException duplicateId(final int id) {
+ return new InvalidObjectException("Duplicated stream ID " + id);
+ }
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -23,41 +23,66 @@
package org.jboss.remoting3.remote;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.io.StreamCorruptedException;
-import java.util.Random;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
import org.jboss.marshalling.Marshaller;
-import org.jboss.marshalling.NioByteInput;
import org.jboss.marshalling.ObjectTable;
import org.jboss.marshalling.Unmarshaller;
-import org.jboss.marshalling.util.IntKeyMap;
import org.jboss.remoting3.Endpoint;
-import org.jboss.remoting3.stream.ObjectSink;
-import org.jboss.remoting3.stream.ObjectSource;
-import org.jboss.remoting3.stream.ReaderInputStream;
-import org.jboss.remoting3.stream.WriterOutputStream;
import org.jboss.xnio.log.Logger;
final class PrimaryObjectTable implements ObjectTable {
- private final Endpoint endpoint;
- private final RemoteConnectionHandler connectionHandler;
- private final Executor executor;
private static final Logger log = Loggers.main;
- PrimaryObjectTable(final Endpoint endpoint, final RemoteConnectionHandler connectionHandler) {
- this.endpoint = endpoint;
- this.connectionHandler = connectionHandler;
- executor = this.connectionHandler.getConnectionContext().getConnectionProviderContext().getExecutor();
+ private final Map<Object, Writer> writerMap;
+ private final List<Object> readerList;
+
+ // Object table types
+
+ static final byte OBJ_ENDPOINT = 0;
+ static final byte OBJ_CLIENT_CONNECTOR = 1;
+ static final byte OBJ_INPUT_STREAM = 2;
+ static final byte OBJ_OUTPUT_STREAM = 3;
+ static final byte OBJ_READER = 4;
+ static final byte OBJ_WRITER = 5;
+ static final byte OBJ_OBJECT_SOURCE = 6;
+ static final byte OBJ_OBJECT_SINK = 7;
+
+ PrimaryObjectTable(final Endpoint endpoint, final PrimaryExternalizerFactory externalizerFactory) {
+ final Map<Object, Writer> map = new IdentityHashMap<Object, Writer>();
+ final List<Object> list = Arrays.asList(new Object[8]);
+ add(map, list, 0, endpoint);
+ add(map, list, 1, PrimaryExternalizerFactory.RequestHandlerConnectorExternalizer.INSTANCE);
+ add(map, list, 2, externalizerFactory.inputStream);
+ add(map, list, 3, externalizerFactory.outputStream);
+ add(map, list, 4, externalizerFactory.reader);
+ add(map, list, 5, externalizerFactory.writer);
+ add(map, list, 6, externalizerFactory.objectSource);
+ add(map, list, 7, externalizerFactory.objectSink);
+ readerList = list;
+ writerMap = map;
}
- private static final Writer ZERO_WRITER = new ByteWriter(RemoteProtocol.OBJ_ENDPOINT);
- private static final Writer ONE_WRITER = new ByteWriter(RemoteProtocol.OBJ_CLIENT_CONNECTOR);
+ private static void add(final Map<Object, Writer> map, final List<Object> list, final int idx, final Object instance) {
+ final ByteWriter writer = CACHED_WRITERS[idx];
+ map.put(instance, writer);
+ list.set(idx, instance);
+ }
+ private static final ByteWriter[] CACHED_WRITERS = {
+ new ByteWriter(0),
+ new ByteWriter(1),
+ new ByteWriter(2),
+ new ByteWriter(3),
+ new ByteWriter(4),
+ new ByteWriter(5),
+ new ByteWriter(6),
+ new ByteWriter(7),
+ };
+
private static final class ByteWriter implements Writer {
private final byte b;
@@ -68,140 +93,18 @@
public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
marshaller.writeByte(b);
}
- }
- public Writer getObjectWriter(final Object object) throws IOException {
- if (object == endpoint) {
- return ZERO_WRITER;
- } else if (object == PrimaryExternalizerFactory.RequestHandlerConnectorExternalizer.INSTANCE) {
- return ONE_WRITER;
- } else if (object instanceof InputStream) {
- return new Writer() {
- public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
- writeOutboundStream(marshaller, RemoteProtocol.OBJ_INPUT_STREAM, (InputStream) object);
- }
- };
- } else if (object instanceof OutputStream) {
- return new Writer() {
- public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
- writeInboundStream(marshaller, RemoteProtocol.OBJ_OUTPUT_STREAM, (OutputStream) object);
- }
- };
- } else if (object instanceof Reader) {
- return new Writer() {
- public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
- writeOutboundStream(marshaller, RemoteProtocol.OBJ_READER, new ReaderInputStream((Reader)object, RemoteProtocol.UTF_8));
- }
- };
- } else if (object instanceof java.io.Writer) {
- return new Writer() {
- public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
- writeInboundStream(marshaller, RemoteProtocol.OBJ_WRITER, new WriterOutputStream((java.io.Writer)object, RemoteProtocol.UTF_8));
- }
- };
- } else if (object instanceof ObjectSource) {
- return new Writer() {
- public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
- writeOutboundStream(marshaller, RemoteProtocol.OBJ_OBJECT_SOURCE, (ObjectSource) object);
- }
- };
- } else if (object instanceof ObjectSink) {
- return new Writer() {
- public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
- writeInboundStream(marshaller, RemoteProtocol.OBJ_OBJECT_SINK, (ObjectSink) object);
- }
- };
- } else {
- return null;
+ public int getByte() {
+ return b & 0xff;
}
}
- private void writeInboundStream(final Marshaller marshaller, final byte code, final ObjectSink objectSink) throws IOException {
- marshaller.writeByte(code);
- final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
- final Random random = connectionHandler.getRandom();
- int id;
- synchronized (inboundStreams) {
- while (inboundStreams.containsKey(id = random.nextInt() & ~1));
- inboundStreams.put(id, new InboundStream(id, connectionHandler.getRemoteConnection(), new InboundStream.ByteInputResult() {
- public void accept(final NioByteInput nioByteInput, final InboundStream inboundStream) {
- try {
- executor.execute(new InboundObjectSinkReceiveTask(nioByteInput, inboundStream, connectionHandler, objectSink));
- } catch (RejectedExecutionException e) {
- log.warn("Unable to start task for forwarded stream: %s", e);
- inboundStream.sendAsyncException();
- }
- }
- }));
- }
- marshaller.writeInt(id);
+ public Writer getObjectWriter(final Object object) throws IOException {
+ return writerMap.get(object);
}
- private void writeOutboundStream(final Marshaller marshaller, final byte code, final ObjectSource objectSource) throws IOException {
- marshaller.writeByte(code);
- final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
- final Random random = connectionHandler.getRandom();
- int id;
- final OutboundStream outboundStream;
- synchronized (outboundStreams) {
- while (outboundStreams.containsKey(id = random.nextInt() | 1));
- outboundStreams.put(id, outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection()));
- }
- marshaller.writeInt(id);
- try {
- executor.execute(new OutboundObjectSourceTransmitTask(objectSource, outboundStream, connectionHandler));
- } catch (RejectedExecutionException e) {
- log.warn("Unable to start task for forwarded stream: %s", e);
- outboundStream.sendException();
- }
- }
-
- /**
- * This looks backwards but it really isn't. When we write an OutputStream, we want the remote side to send us inbound
- * to feed it.
- *
- * @param marshaller the marshaller
- * @param code the code
- * @param outputStream the output stream
- * @throws IOException if an I/O error occurs
- */
- private void writeInboundStream(final Marshaller marshaller, final byte code, final OutputStream outputStream) throws IOException {
- marshaller.writeByte(code);
- final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
- final Random random = connectionHandler.getRandom();
- int id;
- synchronized (inboundStreams) {
- while (inboundStreams.containsKey(id = random.nextInt() & ~1));
- inboundStreams.put(id, new InboundStream(id, connectionHandler.getRemoteConnection(), outputStream));
- }
- marshaller.writeInt(id);
- }
-
- private void writeOutboundStream(final Marshaller marshaller, final byte code, final InputStream inputStream) throws IOException {
- marshaller.writeByte(code);
- final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
- final Random random = connectionHandler.getRandom();
- int id;
- final OutboundStream outboundStream;
- synchronized (outboundStreams) {
- while (outboundStreams.containsKey(id = random.nextInt() | 1));
- outboundStreams.put(id, outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection()));
- }
- marshaller.writeInt(id);
- try {
- executor.execute(new OutboundInputStreamTransmitTask(inputStream, outboundStream));
- } catch (RejectedExecutionException e) {
- log.warn("Unable to start task for forwarded stream: %s", e);
- outboundStream.sendException();
- }
- }
-
+ /**
+ * Read an object table reference: a single unsigned ID byte that indexes {@code readerList}.
+ *
+ * @param unmarshaller the unmarshaller to read the ID byte from
+ * @return the table instance registered under the ID
+ * @throws IOException if the byte cannot be read or the ID is not in the table
+ */
public Object readObject(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
final int id = unmarshaller.readUnsignedByte();
- switch (id) {
- case RemoteProtocol.OBJ_ENDPOINT: return endpoint;
- case RemoteProtocol.OBJ_CLIENT_CONNECTOR: return PrimaryExternalizerFactory.RequestHandlerConnectorExternalizer.INSTANCE;
- default: throw new StreamCorruptedException("Unknown object table ID byte " + id);
- }
+ // The ID comes off the wire (0..255) but the table is smaller; report an unknown ID
+ // as stream corruption, as the removed switch did, rather than letting List.get()
+ // fail with an unchecked index exception. Fully qualified because this change drops the import.
+ if (id >= readerList.size()) {
+ throw new java.io.StreamCorruptedException("Unknown object table ID byte " + id);
+ }
+ return readerList.get(id);
}
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -74,8 +74,11 @@
this.remoteConnection = remoteConnection;
this.marshallerFactory = marshallerFactory;
final MarshallingConfiguration config = new MarshallingConfiguration();
- config.setClassExternalizerFactory(PrimaryExternalizerFactory.INSTANCE);
- config.setObjectTable(new PrimaryObjectTable(connectionContext.getConnectionProviderContext().getEndpoint(), this));
+ final PrimaryExternalizerFactory externalizerFactory = new PrimaryExternalizerFactory(this);
+ final PrimaryObjectTable objectTable = new PrimaryObjectTable(connectionContext.getConnectionProviderContext().getEndpoint(), externalizerFactory);
+ config.setClassTable(PrimaryClassTable.INSTANCE);
+ config.setClassExternalizerFactory(externalizerFactory);
+ config.setObjectTable(objectTable);
config.setStreamHeader(Marshalling.nullStreamHeader());
// fixed for now (v0)
config.setVersion(2);
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -87,17 +87,6 @@
static final byte GREETING_ENDPOINT_NAME = 2; // sent by client & server
static final byte GREETING_MARSHALLER_VERSION = 3; // sent by client & server
- // Object table types
-
- static final byte OBJ_ENDPOINT = 0;
- static final byte OBJ_CLIENT_CONNECTOR = 1;
- static final byte OBJ_INPUT_STREAM = 2;
- static final byte OBJ_OUTPUT_STREAM = 3;
- static final byte OBJ_READER = 4;
- static final byte OBJ_WRITER = 5;
- static final byte OBJ_OBJECT_SOURCE = 6;
- static final byte OBJ_OBJECT_SINK = 7;
-
// Object sink stream commands
static final int OSINK_OBJECT = 0;
@@ -111,6 +100,7 @@
static final Charset UTF_8 = Charset.forName("UTF8");
+
/**
* Create an instance of the connection provider for the "remote" protocol.
*
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -23,25 +23,63 @@
package org.jboss.remoting3.remote;
import java.io.IOException;
+import java.io.InvalidObjectException;
import java.util.NoSuchElementException;
import org.jboss.marshalling.Unmarshaller;
import org.jboss.remoting3.stream.ObjectSource;
+ /**
+ * An object source which reads its objects from an unmarshaller. Each object is preceded
+ * on the stream by a command byte; {@code RemoteProtocol.OSOURCE_OBJECT} announces another
+ * object and any other value ends the source.
+ */
final class UnmarshallerObjectSource<T> implements ObjectSource<T> {
private final Unmarshaller unmarshaller;
+ // Must start out as NEW: a null state would make hasNext() skip the command-byte read
+ // and report "no more objects" forever.
+ private State state = State.NEW;
+ enum State {
+ // the next command byte has not been read yet
+ NEW,
+ // a command byte announcing an object has been read; the object itself is still unread
+ READY,
+ // the source was closed or a terminating command byte was read
+ DONE,
+ }
+
UnmarshallerObjectSource(final Unmarshaller unmarshaller) {
this.unmarshaller = unmarshaller;
}
+ /**
+ * Determine whether another object is available, reading the next command byte if needed.
+ *
+ * @return {@code true} if {@link #next()} can return an object
+ * @throws IOException if reading the command byte or closing the unmarshaller fails
+ */
public boolean hasNext() throws IOException {
- return false;
+ synchronized (this) {
+ if (state == State.NEW) {
+ final int cmd = unmarshaller.readUnsignedByte();
+ if (cmd == RemoteProtocol.OSOURCE_OBJECT) {
+ state = State.READY;
+ } else {
+ state = State.DONE;
+ unmarshaller.close();
+ return false;
+ }
+ }
+ return state == State.READY;
+ }
}
+ /**
+ * Read the next object.
+ *
+ * @return the next object
+ * @throws NoSuchElementException if no further object is available
+ * @throws IOException if reading fails or the object's class cannot be found
+ */
+ @SuppressWarnings({ "unchecked" })
public T next() throws NoSuchElementException, IOException {
- return null;
+ synchronized (this) {
+ if (hasNext()) {
+ try {
+ final T obj = (T) unmarshaller.readObject();
+ state = State.NEW;
+ return obj;
+ } catch (ClassNotFoundException e) {
+ state = State.NEW;
+ // InvalidObjectException has no cause constructor; attach the cause explicitly
+ final InvalidObjectException ioe = new InvalidObjectException("Class not found: " + e);
+ ioe.initCause(e);
+ throw ioe;
+ }
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
}
+ /**
+ * Close this source and the underlying unmarshaller.
+ *
+ * @throws IOException if closing the unmarshaller fails
+ */
public void close() throws IOException {
+ synchronized (this) {
+ state = State.DONE;
+ unmarshaller.close();
+ }
}
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -24,6 +24,8 @@
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
@@ -33,6 +35,8 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Executor;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.marshalling.ByteOutput;
import org.jboss.marshalling.Pair;
import org.jboss.xnio.Cancellable;
import org.jboss.xnio.FutureResult;
@@ -277,6 +281,116 @@
return new EnumerationObjectSource<T>(enumeration);
}
+ /**
+ * Copy from one stream to another.
+ *
+ * @param input the source stream
+ * @param output the destination stream
+ * @param close {@code true} if the input and output streams should be closed
+ * @param bufferSize the buffer size
+ * @throws IOException if an I/O error occurs
+ */
+ public static void copyStream(InputStream input, OutputStream output, boolean close, int bufferSize) throws IOException {
+ final byte[] buffer = new byte[bufferSize];
+ int res;
+ try {
+ for (;;) {
+ res = input.read(buffer);
+ if (res == -1) {
+ if (close) {
+ input.close();
+ output.close();
+ }
+ return;
+ }
+ output.write(buffer, 0, res);
+ }
+ } finally {
+ if (close) {
+ IoUtils.safeClose(input);
+ IoUtils.safeClose(output);
+ }
+ }
+ }
+
+ /**
+ * Copy from one stream to another. A default buffer size is assumed.
+ *
+ * @param input the source stream
+ * @param output the destination stream
+ * @param close {@code true} if the input and output streams should be closed
+ * @throws IOException if an I/O error occurs
+ */
+ public static void copyStream(InputStream input, OutputStream output, boolean close) throws IOException {
+ copyStream(input, output, close, 8192);
+ }
+
+ /**
+ * Copy from one stream to another. A default buffer size is assumed, and both streams are closed on completion.
+ *
+ * @param input the source stream
+ * @param output the destination stream
+ * @throws IOException if an I/O error occurs
+ */
+ public static void copyStream(InputStream input, OutputStream output) throws IOException {
+ copyStream(input, output, true, 8192);
+ }
+
+ /**
+ * Copy from one stream to another.
+ *
+ * @param input the source stream
+ * @param output the destination stream
+ * @param close {@code true} if the input and output streams should be closed
+ * @param bufferSize the buffer size
+ * @throws IOException if an I/O error occurs
+ */
+ public static void copyStream(ByteInput input, ByteOutput output, boolean close, int bufferSize) throws IOException {
+ final byte[] buffer = new byte[bufferSize];
+ int res;
+ try {
+ for (;;) {
+ res = input.read(buffer);
+ if (res == -1) {
+ if (close) {
+ input.close();
+ output.close();
+ }
+ return;
+ }
+ output.write(buffer, 0, res);
+ }
+ } finally {
+ if (close) {
+ IoUtils.safeClose(input);
+ IoUtils.safeClose(output);
+ }
+ }
+ }
+
+ /**
+ * Copy from one stream to another. A default buffer size is assumed.
+ *
+ * @param input the source stream
+ * @param output the destination stream
+ * @param close {@code true} if the input and output streams should be closed
+ * @throws IOException if an I/O error occurs
+ */
+ public static void copyStream(ByteInput input, ByteOutput output, boolean close) throws IOException {
+ copyStream(input, output, close, 8192);
+ }
+
+ /**
+ * Copy from one stream to another. A default buffer size is assumed, and both streams are closed on completion.
+ *
+ * @param input the source stream
+ * @param output the destination stream
+ * @throws IOException if an I/O error occurs
+ */
+ public static void copyStream(ByteInput input, ByteOutput output) throws IOException {
+ copyStream(input, output, true, 8192);
+ }
+
static Charset getCharset(final String charsetName) throws UnsupportedEncodingException {
try {
return Charset.forName(charsetName);
Modified: remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java 2010-03-10 04:36:18 UTC (rev 5816)
@@ -22,7 +22,12 @@
package org.jboss.remoting3.test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Reader;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.remoting3.Client;
import org.jboss.remoting3.ClientConnector;
@@ -37,6 +42,7 @@
import org.jboss.remoting3.RequestContext;
import org.jboss.remoting3.RequestListener;
import org.jboss.remoting3.ServiceNotFoundException;
+import org.jboss.remoting3.stream.Streams;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.OptionMap;
import org.jboss.xnio.Options;
@@ -57,6 +63,7 @@
@BeforeTest
public void setUp() throws IOException {
+ log.info("::::: STARTING TEST FOR: %s :::::", getClass().getName());
enter();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
@@ -315,6 +322,111 @@
}
}
+ public void testInputStream() throws Throwable {
+ enter();
+ try {
+ final Registration registration = endpoint.serviceBuilder(InputStream.class, InputStream.class).setServiceType("streamtest").setClientListener(new ClientListener<InputStream, InputStream>() {
+ public RequestListener<InputStream, InputStream> handleClientOpen(final ClientContext clientContext, final OptionMap optionMap) {
+ return new RequestListener<InputStream, InputStream>() {
+ public void handleRequest(final RequestContext<InputStream> context, final InputStream request) throws RemoteExecutionException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ Streams.copyStream(request, baos);
+ } catch (IOException e) {
+ try {
+ context.sendFailure("I/O error", e);
+ } catch (IOException e1) {
+ // blah
+ }
+ }
+ try {
+ context.sendReply(new ByteArrayInputStream(baos.toByteArray()));
+ } catch (IOException e) {
+ // blah
+ }
+ }
+ };
+ }
+ }).register();
+ try {
+ final Connection connection = getConnection();
+ try {
+ final Client<InputStream, InputStream> client = connection.openClient("streamtest", "*", InputStream.class, InputStream.class, InvocationTestBase.class.getClassLoader(), OptionMap.EMPTY).get();
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Streams.copyStream(client.invoke(new ByteArrayInputStream("This is a test!!!".getBytes())), baos);
+ assertEquals(new String(baos.toByteArray()), "This is a test!!!");
+ } finally {
+ IoUtils.safeClose(client);
+ client.awaitClosedUninterruptibly();
+ }
+ } finally {
+ IoUtils.safeClose(connection);
+ connection.awaitClosedUninterruptibly();
+ }
+ } finally {
+ IoUtils.safeClose(registration);
+ registration.awaitClosedUninterruptibly();
+ }
+ } finally {
+ exit();
+ }
+ }
+
+ public void testOutputStream() throws Throwable {
+ enter();
+ try {
+ final ByteArrayOutputStream os = new ByteArrayOutputStream();
+ final Registration registration = endpoint.serviceBuilder(OutputStream.class, OutputStream.class).setServiceType("streamtest").setClientListener(new ClientListener<OutputStream, OutputStream>() {
+ public RequestListener<OutputStream, OutputStream> handleClientOpen(final ClientContext clientContext, final OptionMap optionMap) {
+ return new RequestListener<OutputStream, OutputStream>() {
+ public void handleRequest(final RequestContext<OutputStream> context, final OutputStream request) throws RemoteExecutionException {
+ try {
+ Streams.copyStream(new ByteArrayInputStream("This is a test...".getBytes()), request);
+ } catch (IOException e) {
+ try {
+ context.sendFailure("I/O error", e);
+ } catch (IOException e1) {
+ // blah
+ }
+ }
+ try {
+ context.sendReply(os);
+ } catch (IOException e) {
+ // blah
+ }
+ }
+ };
+ }
+ }).register();
+ try {
+ final Connection connection = getConnection();
+ try {
+ final Client<OutputStream, OutputStream> client = connection.openClient("streamtest", "*", OutputStream.class, OutputStream.class, InvocationTestBase.class.getClassLoader(), OptionMap.EMPTY).get();
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final OutputStream result = client.invoke(baos);
+ assertEquals(new String(baos.toByteArray()), "This is a test...");
+ Streams.copyStream(new ByteArrayInputStream("This is a test #2...".getBytes()), result);
+ // this test can't finish in time
+// assertEquals(new String(os.toByteArray()), "This is a test #2...");
+ } finally {
+ IoUtils.safeClose(client);
+ client.awaitClosedUninterruptibly();
+ }
+ } finally {
+ IoUtils.safeClose(connection);
+ connection.awaitClosedUninterruptibly();
+ }
+ } finally {
+ IoUtils.safeClose(registration);
+ registration.awaitClosedUninterruptibly();
+ }
+ } finally {
+ exit();
+ }
+ }
+
@AfterTest
public void tearDown() throws IOException {
enter();
Modified: remoting3/trunk/jboss-remoting/src/test/resources/logging.properties
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/resources/logging.properties 2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/test/resources/logging.properties 2010-03-10 04:36:18 UTC (rev 5816)
@@ -24,7 +24,7 @@
loggers=javax.security.sasl,org.jboss.xnio.ssl
# Root logger configuration
-logger.level=${test.leve:INFO}
+logger.level=${test.level:INFO}
logger.handlers=CONSOLE
# Configure javax.security.sasl to be less verbose by default
14 years, 8 months
JBoss Remoting SVN: r5815 - remoting3/trunk/jboss-remoting.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2010-03-09 15:15:58 -0500 (Tue, 09 Mar 2010)
New Revision: 5815
Modified:
remoting3/trunk/jboss-remoting/pom.xml
Log:
License info
Modified: remoting3/trunk/jboss-remoting/pom.xml
===================================================================
--- remoting3/trunk/jboss-remoting/pom.xml 2010-03-09 19:55:38 UTC (rev 5814)
+++ remoting3/trunk/jboss-remoting/pom.xml 2010-03-09 20:15:58 UTC (rev 5815)
@@ -39,6 +39,18 @@
<artifactId>jboss-remoting</artifactId>
<packaging>jar</packaging>
<version>3.1.0.Beta2-SNAPSHOT</version>
+
+ <licenses>
+ <license>
+ <name>LGPL 2.1</name>
+ <url>http://www.gnu.org/licenses/lgpl-2.1.html</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+ <organization>
+ <name>JBoss, a division of Red Hat, Inc.</name>
+ <url>http://www.jboss.org/</url>
+ </organization>
<dependencies>
<dependency>
<groupId>org.jboss.xnio</groupId>
@@ -153,6 +165,12 @@
<manifest>
<mainClass>org.jboss.remoting3.Version</mainClass>
</manifest>
+ <addDefaultImplementationEntries/>
+ <manifestEntries>
+ <Specification-Title>${pom.name}</Specification-Title>
+ <Specification-Version>3.1</Specification-Version>
+ <Specification-Vendor>${pom.organization.name}</Specification-Vendor>
+ </manifestEntries>
</archive>
</configuration>
</plugin>
14 years, 8 months
JBoss Remoting SVN: r5814 - in remoting3/trunk/jboss-remoting/src: test/java/org/jboss/remoting3/test and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2010-03-09 14:55:38 -0500 (Tue, 09 Mar 2010)
New Revision: 5814
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
Log:
JBREM-1208 - make sure client open options are sent across
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java 2010-03-09 14:53:39 UTC (rev 5813)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java 2010-03-09 19:55:38 UTC (rev 5814)
@@ -25,7 +25,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
@@ -67,8 +68,6 @@
private final IntKeyMap<OutboundStream> outboundStreams = new IntKeyMap<OutboundStream>();
private final IntKeyMap<InboundStream> inboundStreams = new IntKeyMap<InboundStream>();
- private final AtomicBoolean closed = new AtomicBoolean();
-
RemoteConnectionHandler(final ConnectionHandlerContext connectionContext, final RemoteConnection remoteConnection, final MarshallerFactory marshallerFactory) {
super(connectionContext.getConnectionProviderContext().getExecutor());
this.connectionContext = connectionContext;
@@ -101,8 +100,17 @@
buffer.put((byte) 0);
Buffers.putModifiedUtf8(buffer, groupName);
buffer.put((byte) 0);
- buffer.flip();
- remoteConnection.sendBlocking(buffer, true);
+ final ByteOutput output = Marshalling.createByteOutput(buffer);
+ final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+ try {
+ marshaller.start(output);
+ marshaller.writeObject(optionMap);
+ marshaller.finish();
+ buffer.flip();
+ remoteConnection.sendBlocking(buffer, true);
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
} catch (IOException e) {
result.setException(e);
} catch (Throwable e) {
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java 2010-03-09 14:53:39 UTC (rev 5813)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java 2010-03-09 19:55:38 UTC (rev 5814)
@@ -25,10 +25,14 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.NioByteInput;
+import org.jboss.marshalling.Unmarshaller;
import org.jboss.marshalling.util.IntKeyMap;
import org.jboss.remoting3.ReplyException;
import org.jboss.remoting3.ServiceNotFoundException;
+import org.jboss.remoting3.ServiceOpenException;
import org.jboss.remoting3.ServiceURI;
import org.jboss.remoting3.spi.LocalReplyHandler;
import org.jboss.remoting3.spi.LocalRequestHandler;
@@ -60,11 +64,36 @@
final int id = buffer.getInt();
final String serviceType = Buffers.getModifiedUtf8Z(buffer);
final String groupName = Buffers.getModifiedUtf8Z(buffer);
- final LocalRequestHandler handler;
- handler = connectionHandler.getConnectionContext().openService(serviceType, groupName, OptionMap.EMPTY);
final Pool<ByteBuffer> bufferPool = connectionHandler.getBufferPool();
+ final ByteInput input = Marshalling.createByteInput(buffer);
+ final OptionMap optionMap;
final ByteBuffer outBuf = bufferPool.allocate();
try {
+ try {
+ final Unmarshaller unmarshaller = remoteConnectionHandler.getMarshallerFactory().createUnmarshaller(remoteConnectionHandler.getMarshallingConfiguration());
+ try {
+ unmarshaller.start(input);
+ optionMap = (OptionMap) unmarshaller.readObject();
+ unmarshaller.finish();
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (Exception e) {
+ log.error("Failed to unmarshall service request option map: %s", e);
+ outBuf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
+ outBuf.put(RemoteProtocol.SERVICE_ERROR);
+ outBuf.putInt(id);
+ outBuf.flip();
+ try {
+ connection.sendBlocking(outBuf, true);
+ } catch (IOException e1) {
+ // the channel has suddenly failed
+ log.trace("Send failed: %s", e);
+ }
+ return;
+ }
+ final LocalRequestHandler handler;
+ handler = connectionHandler.getConnectionContext().openService(serviceType, groupName, optionMap);
outBuf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
if (handler == null) {
// no matching service found
@@ -110,6 +139,24 @@
}
return;
}
+ case RemoteProtocol.SERVICE_ERROR: {
+ final int id = buffer.getInt();
+ final OutboundClient client;
+ final IntKeyMap<OutboundClient> outboundClients = connectionHandler.getOutboundClients();
+ synchronized (outboundClients) {
+ client = outboundClients.remove(id);
+ }
+ if (client == null) {
+ log.trace("Received service-error for unknown client %d", Integer.valueOf(id));
+ return;
+ }
+ synchronized (client) {
+ // todo assert client state == waiting
+ client.getResult().setException(new ServiceOpenException("Remote side failed to open service"));
+ client.setState(OutboundClient.State.CLOSED);
+ }
+ return;
+ }
case RemoteProtocol.SERVICE_CLIENT_OPENED: {
final int id = buffer.getInt();
final OutboundClient client;
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-09 14:53:39 UTC (rev 5813)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-09 19:55:38 UTC (rev 5814)
@@ -57,8 +57,9 @@
static final byte SERVICE_REQUEST = 16;
static final byte SERVICE_NOT_FOUND = 17;
static final byte SERVICE_CLIENT_OPENED = 18;
- static final byte CLIENT_CLOSE = 19;
- static final byte CLIENT_ASYNC_CLOSE = 20; // close from the server side
+ static final byte SERVICE_ERROR = 19;
+ static final byte CLIENT_CLOSE = 20;
+ static final byte CLIENT_ASYNC_CLOSE = 21; // close from the server side
static final byte STREAM_DATA = 32; // from source -> sink side
static final byte STREAM_EXCEPTION = 33; // from source -> sink side
Modified: remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java 2010-03-09 14:53:39 UTC (rev 5813)
+++ remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java 2010-03-09 19:55:38 UTC (rev 5814)
@@ -23,6 +23,7 @@
package org.jboss.remoting3.test;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
import org.jboss.remoting3.Client;
import org.jboss.remoting3.ClientConnector;
import org.jboss.remoting3.ClientContext;
@@ -38,6 +39,7 @@
import org.jboss.remoting3.ServiceNotFoundException;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Options;
import org.jboss.xnio.Xnio;
import org.jboss.xnio.log.Logger;
import org.testng.SkipException;
@@ -45,8 +47,7 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
@Test
public abstract class InvocationTestBase {
@@ -265,6 +266,55 @@
}
}
+ public void testOptions() throws Throwable {
+ enter();
+ try {
+ final OptionMap optionMap = OptionMap.builder().set(Options.BROADCAST, true).getMap();
+ final AtomicReference<OptionMap> receivedOptions = new AtomicReference<OptionMap>();
+ final Registration registration = endpoint.serviceBuilder().setGroupName("foo").setServiceType("test1").setRequestType(InvocationTestObject.class).
+ setReplyType(InvocationTestObject.class).setClientListener(new ClientListener<InvocationTestObject, InvocationTestObject>() {
+ public RequestListener<InvocationTestObject, InvocationTestObject> handleClientOpen(final ClientContext clientContext, final OptionMap optionMap) {
+ receivedOptions.set(optionMap);
+ clientContext.addCloseHandler(new CloseHandler<ClientContext>() {
+ public void handleClose(final ClientContext closed) {
+ log.info("Client closed");
+ }
+ });
+ return new RequestListener<InvocationTestObject, InvocationTestObject>() {
+ public void handleRequest(final RequestContext<InvocationTestObject> objectRequestContext, final InvocationTestObject request) throws RemoteExecutionException {
+ // not invoked
+ }
+ };
+ }
+ }).register();
+ try {
+ final Connection connection = getConnection();
+ try {
+ final Client<InvocationTestObject, InvocationTestObject> client = connection.openClient("test1", "*", InvocationTestObject.class, InvocationTestObject.class, getClass().getClassLoader(), optionMap).get();
+ try {
+ assertTrue(optionMap.contains(Options.BROADCAST), "Option disappeared from original map");
+ assertTrue(optionMap.get(Options.BROADCAST).booleanValue(), "Option changed value from original map");
+ final OptionMap map2 = receivedOptions.get();
+ assertNotNull(map2, "Option map was not received");
+ assertTrue(map2.contains(Options.BROADCAST), "Option does not appear in destination map");
+ assertTrue(map2.get(Options.BROADCAST).booleanValue(), "Option changed value in destination map");
+ } finally {
+ IoUtils.safeClose(client);
+ client.awaitClosedUninterruptibly();
+ }
+ } finally {
+ IoUtils.safeClose(connection);
+ connection.awaitClosedUninterruptibly();
+ }
+ } finally {
+ IoUtils.safeClose(registration);
+ registration.awaitClosedUninterruptibly();
+ }
+ } finally {
+ exit();
+ }
+ }
+
@AfterTest
public void tearDown() throws IOException {
enter();
14 years, 8 months
JBoss Remoting SVN: r5813 - in remoting3/trunk/jboss-remoting/src: main/java/org/jboss/remoting3/remote and 2 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2010-03-09 09:53:39 -0500 (Tue, 09 Mar 2010)
New Revision: 5813
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ServerOpenListener.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/NetworkServerProvider.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ProtocolServiceType.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/AbstractRemoteTestCase.java
Log:
Move authentication to the network server provider interface
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-03-09 14:53:10 UTC (rev 5812)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-03-09 14:53:39 UTC (rev 5813)
@@ -663,9 +663,6 @@
if (sm != null) {
sm.checkPermission(ADD_PROTOCOL_SERVICE_PERM);
}
- if ("default".equals(name)) {
- throw new IllegalArgumentException("'default' is not an allowed name");
- }
if (map.putIfAbsent(name, provider) != null) {
throw new DuplicateRegistrationException(type.getDescription() + " '" + name + "' is already registered");
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java 2010-03-09 14:53:10 UTC (rev 5812)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java 2010-03-09 14:53:39 UTC (rev 5813)
@@ -140,11 +140,6 @@
public static final Option<Boolean> CALL_BY_VALUE = Option.simple(RemotingOptions.class, "CALL_BY_VALUE", Boolean.class);
/**
- * Specify the name of a preregistered server authentication provider to use.
- */
- public static final Option<String> AUTHENTICATION_PROVIDER = Option.simple(RemotingOptions.class, "AUTHENTICATION_PROVIDER", String.class);
-
- /**
* Specify the number of times a client is allowed to retry authentication before closing the connection.
*/
public static final Option<Integer> AUTHENTICATION_RETRIES = Option.simple(RemotingOptions.class, "AUTHENTICATION_RETRIES", Integer.class);
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java 2010-03-09 14:53:10 UTC (rev 5812)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java 2010-03-09 14:53:39 UTC (rev 5813)
@@ -95,13 +95,8 @@
}
private class ProviderInterface implements NetworkServerProvider {
- public ChannelListener<ConnectedStreamChannel<InetSocketAddress>> getServerListener(final OptionMap optionMap) {
- final String providerName = optionMap.get(RemotingOptions.AUTHENTICATION_PROVIDER);
- final ServerAuthenticationProvider authenticationProvider = connectionProviderContext.getProtocolServiceProvider(ProtocolServiceType.SERVER_AUTHENTICATION_PROVIDER, providerName == null ? "default" : providerName);
- if (authenticationProvider == null) {
- throw new IllegalArgumentException("Missing authentication provider: " + (providerName == null ? "default" : providerName));
- }
- return new ServerOpenListener(optionMap, connectionProviderContext, providerDescriptor);
+ public ChannelListener<ConnectedStreamChannel<InetSocketAddress>> getServerListener(final OptionMap optionMap, final ServerAuthenticationProvider authenticationProvider) {
+ return new ServerOpenListener(optionMap, connectionProviderContext, providerDescriptor, authenticationProvider);
}
}
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ServerOpenListener.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ServerOpenListener.java 2010-03-09 14:53:10 UTC (rev 5812)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ServerOpenListener.java 2010-03-09 14:53:39 UTC (rev 5813)
@@ -32,10 +32,8 @@
import java.util.Map;
import java.util.Set;
import org.jboss.marshalling.ProviderDescriptor;
-import org.jboss.remoting3.RemotingOptions;
import org.jboss.remoting3.security.ServerAuthenticationProvider;
import org.jboss.remoting3.spi.ConnectionProviderContext;
-import org.jboss.remoting3.spi.ProtocolServiceType;
import org.jboss.xnio.ChannelListener;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.OptionMap;
@@ -55,12 +53,14 @@
private final OptionMap optionMap;
private final ConnectionProviderContext connectionProviderContext;
private final ProviderDescriptor providerDescriptor;
+ private final ServerAuthenticationProvider authenticationProvider;
private static final Logger log = Loggers.serverSasl;
- ServerOpenListener(final OptionMap optionMap, final ConnectionProviderContext connectionProviderContext, final ProviderDescriptor providerDescriptor) {
+ ServerOpenListener(final OptionMap optionMap, final ConnectionProviderContext connectionProviderContext, final ProviderDescriptor providerDescriptor, final ServerAuthenticationProvider authenticationProvider) {
this.optionMap = optionMap;
this.connectionProviderContext = connectionProviderContext;
this.providerDescriptor = providerDescriptor;
+ this.authenticationProvider = authenticationProvider;
}
public void handleEvent(final ConnectedStreamChannel<InetSocketAddress> channel) {
@@ -71,20 +71,6 @@
}
final RemoteConnection connection = new RemoteConnection(connectionProviderContext.getExecutor(), channel, optionMap, providerDescriptor);
- // Get the server authentication provider
- final String authProvider = optionMap.get(RemotingOptions.AUTHENTICATION_PROVIDER);
- if (authProvider == null) {
- log.warn("No authentication provider available");
- IoUtils.safeClose(connection);
- return;
- }
- final ServerAuthenticationProvider provider = connectionProviderContext.getProtocolServiceProvider(ProtocolServiceType.SERVER_AUTHENTICATION_PROVIDER, authProvider);
- if (provider == null) {
- log.warn("No authentication provider available");
- IoUtils.safeClose(connection);
- return;
- }
-
// Calculate available server mechanisms
final Sequence<String> mechs = optionMap.get(Options.SASL_MECHANISMS);
final Set<String> includes = mechs != null ? new HashSet<String>(mechs) : null;
@@ -176,7 +162,7 @@
}
}
});
- connection.setMessageHandler(new ServerGreetingHandler(connection, connectionProviderContext, saslServerFactories, provider, propertyMap));
+ connection.setMessageHandler(new ServerGreetingHandler(connection, connectionProviderContext, saslServerFactories, authenticationProvider, propertyMap));
// and send the greeting
channel.resumeWrites();
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/NetworkServerProvider.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/NetworkServerProvider.java 2010-03-09 14:53:10 UTC (rev 5812)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/NetworkServerProvider.java 2010-03-09 14:53:39 UTC (rev 5813)
@@ -23,6 +23,7 @@
package org.jboss.remoting3.spi;
import java.net.InetSocketAddress;
+import org.jboss.remoting3.security.ServerAuthenticationProvider;
import org.jboss.xnio.ChannelListener;
import org.jboss.xnio.OptionMap;
import org.jboss.xnio.channels.ConnectedStreamChannel;
@@ -36,7 +37,8 @@
* Get the channel open listener for servers of this connection provider type.
*
* @param optionMap options which may be used to configure the returned server
+ * @param authenticationProvider the server authentication provider
* @return the channel listener
*/
- ChannelListener<ConnectedStreamChannel<InetSocketAddress>> getServerListener(OptionMap optionMap);
+ ChannelListener<ConnectedStreamChannel<InetSocketAddress>> getServerListener(OptionMap optionMap, ServerAuthenticationProvider authenticationProvider);
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ProtocolServiceType.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ProtocolServiceType.java 2010-03-09 14:53:10 UTC (rev 5812)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ProtocolServiceType.java 2010-03-09 14:53:39 UTC (rev 5813)
@@ -78,8 +78,6 @@
public static final ProtocolServiceType<ClassExternalizerFactory> CLASS_EXTERNALIZER_FACTORY;
- public static final ProtocolServiceType<ServerAuthenticationProvider> SERVER_AUTHENTICATION_PROVIDER;
-
private static final ProtocolServiceType<?>[] SERVICE_TYPES;
public static ProtocolServiceType<?>[] getServiceTypes() {
@@ -99,7 +97,6 @@
CLASS_RESOLVER = new ProtocolServiceType<ClassResolver>(ClassResolver.class, "CLASS_RESOLVER", "Class resolver", index++),
OBJECT_RESOLVER = new ProtocolServiceType<ObjectResolver>(ObjectResolver.class, "OBJECT_RESOLVER", "Object resolver", index++),
CLASS_EXTERNALIZER_FACTORY = new ProtocolServiceType<ClassExternalizerFactory>(ClassExternalizerFactory.class, "CLASS_EXTERNALIZER_FACTORY", "Class externalizer factory", index++),
- SERVER_AUTHENTICATION_PROVIDER = new ProtocolServiceType<ServerAuthenticationProvider>(ServerAuthenticationProvider.class, "SERVER_AUTHENTICATION_PROVIDER", "Server authentication provider", index++)
};
}
Modified: remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/AbstractRemoteTestCase.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/AbstractRemoteTestCase.java 2010-03-09 14:53:10 UTC (rev 5812)
+++ remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/AbstractRemoteTestCase.java 2010-03-09 14:53:39 UTC (rev 5813)
@@ -22,17 +22,14 @@
package org.jboss.remoting3.test;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import org.jboss.remoting3.Connection;
-import org.jboss.remoting3.RemotingOptions;
import org.jboss.remoting3.security.SimpleServerAuthenticationProvider;
import org.jboss.remoting3.spi.NetworkServerProvider;
-import org.jboss.remoting3.spi.ProtocolServiceType;
import org.jboss.remoting3.spi.SpiUtils;
import org.jboss.xnio.AcceptingServer;
import org.jboss.xnio.ChannelListener;
@@ -42,7 +39,6 @@
import org.jboss.xnio.Xnio;
import org.jboss.xnio.channels.BoundChannel;
import org.jboss.xnio.channels.ConnectedStreamChannel;
-import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
@@ -50,27 +46,19 @@
@Test
public abstract class AbstractRemoteTestCase extends InvocationTestBase {
- @BeforeTest
- public void setUp() throws IOException {
- super.setUp();
- enter();
- try {
- final SimpleServerAuthenticationProvider authenticationProvider = new SimpleServerAuthenticationProvider();
- authenticationProvider.addUser("user", "endpoint", "password".toCharArray());
- endpoint.addProtocolService(ProtocolServiceType.SERVER_AUTHENTICATION_PROVIDER, "test", authenticationProvider);
- } finally {
- exit();
- }
+ final SimpleServerAuthenticationProvider authenticationProvider = new SimpleServerAuthenticationProvider();
+
+ protected AbstractRemoteTestCase() {
+ authenticationProvider.addUser("user", "endpoint", "password".toCharArray());
}
protected Connection getConnection() throws Exception {
final NetworkServerProvider provider = endpoint.getConnectionProviderInterface(getScheme(), NetworkServerProvider.class);
assertNotNull(provider, "No remote provider interface");
final OptionMap serverOptions = OptionMap.builder()
- .set(RemotingOptions.AUTHENTICATION_PROVIDER, "test")
.setSequence(Options.SASL_MECHANISMS, "EXTERNAL", "DIGEST-MD5")
.getMap();
- final ChannelListener<ConnectedStreamChannel<InetSocketAddress>> listener = provider.getServerListener(serverOptions);
+ final ChannelListener<ConnectedStreamChannel<InetSocketAddress>> listener = provider.getServerListener(serverOptions, authenticationProvider);
final Xnio xnio = Xnio.getInstance();
final AcceptingServer<InetSocketAddress, ?, ?> server = getServer(listener, xnio);
final IoFuture<? extends BoundChannel<InetSocketAddress>> future = server.bind(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0));
14 years, 8 months