JBoss Remoting SVN: r4684 - remoting3/trunk/version/src/main/java/org/jboss/remoting/version.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-14 15:08:16 -0500 (Fri, 14 Nov 2008)
New Revision: 4684
Modified:
remoting3/trunk/version/src/main/java/org/jboss/remoting/version/Version.java
Log:
Prep beta1!
Modified: remoting3/trunk/version/src/main/java/org/jboss/remoting/version/Version.java
===================================================================
--- remoting3/trunk/version/src/main/java/org/jboss/remoting/version/Version.java 2008-11-14 20:07:55 UTC (rev 4683)
+++ remoting3/trunk/version/src/main/java/org/jboss/remoting/version/Version.java 2008-11-14 20:08:16 UTC (rev 4684)
@@ -11,7 +11,7 @@
/**
* The version.
*/
- public static final String VERSION = "3.0.0-M3";
+ public static final String VERSION = "3.0.0.Beta1";
/**
* Print the version to {@code System.out}.
16 years, 1 month
JBoss Remoting SVN: r4683 - remoting3/trunk.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-14 15:07:55 -0500 (Fri, 14 Nov 2008)
New Revision: 4683
Modified:
remoting3/trunk/build.properties
remoting3/trunk/build.xml
Log:
Fix two minor build issues
Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties 2008-11-14 18:14:57 UTC (rev 4682)
+++ remoting3/trunk/build.properties 2008-11-14 20:07:55 UTC (rev 4683)
@@ -180,7 +180,7 @@
lib.trove.remote=${remote.repository}/${lib.trove.path}
lib.xnio.version=1.2.0.Alpha2008111101
-lib.xnio.version-javadoc=1.2
+lib.xnio.version-javadoc=1.2.0.Alpha
lib.xnio-api.name=xnio-api-${lib.xnio.version}.jar
lib.xnio-api.license=lgpl
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-11-14 18:14:57 UTC (rev 4682)
+++ remoting3/trunk/build.xml 2008-11-14 20:07:55 UTC (rev 4683)
@@ -1144,7 +1144,7 @@
<!-- JAVADOCS -->
<!-- ============================================== -->
- <target name="api-javadoc" depends="api,core,standalone,util,lib.apiviz,lib.marshalling-api,lib.xnio-api">
+ <target name="api-javadoc" depends="api,core,standalone,transporter,util,lib.apiviz,lib.marshalling-api,lib.xnio-api">
<delete dir="api/target/main/docs"/>
<mkdir dir="api/target/main/docs"/>
<javadoc destdir="api/target/main/docs" author="false" version="false" use="false" windowtitle="JBoss Remoting API">
16 years, 1 month
JBoss Remoting SVN: r4682 - remoting3/trunk.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-14 13:14:57 -0500 (Fri, 14 Nov 2008)
New Revision: 4682
Modified:
remoting3/trunk/build.properties
remoting3/trunk/build.xml
Log:
Specify separate javadoc version for XNIO; Marshalling uses 1.0.0.GA
Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties 2008-11-14 17:42:35 UTC (rev 4681)
+++ remoting3/trunk/build.properties 2008-11-14 18:14:57 UTC (rev 4682)
@@ -115,7 +115,7 @@
lib.jboss-managed.local=${local.repository}/${lib.jboss-managed.local-path}
lib.jboss-managed.remote=${remote.repository}/${lib.jboss-managed.remote-path}
-lib.marshalling-api.version=1.0.0.CR1
+lib.marshalling-api.version=1.0.0.GA
lib.marshalling-api.name=marshalling-api.jar
lib.marshalling-api.license=lgpl
lib.marshalling-api.dir=jboss/marshalling/${lib.marshalling-api.version}/lib
@@ -180,6 +180,7 @@
lib.trove.remote=${remote.repository}/${lib.trove.path}
lib.xnio.version=1.2.0.Alpha2008111101
+lib.xnio.version-javadoc=1.2
lib.xnio-api.name=xnio-api-${lib.xnio.version}.jar
lib.xnio-api.license=lgpl
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-11-14 17:42:35 UTC (rev 4681)
+++ remoting3/trunk/build.xml 2008-11-14 18:14:57 UTC (rev 4682)
@@ -1156,7 +1156,7 @@
<doctitle><![CDATA[<h1>JBoss Remoting 3</h1>]]></doctitle>
<bottom><![CDATA[<i>Copyright © 2008 JBoss, a division of Red Hat, Inc.</i>]]></bottom>
<link href="http://java.sun.com/j2se/1.5.0/docs/api/"/>
- <link href="http://docs.jboss.org/xnio/${lib.xnio.version}/api/"/>
+ <link href="http://docs.jboss.org/xnio/${lib.xnio.version-javadoc}/api/"/>
<link href="http://docs.jboss.org/river/${lib.marshalling-api.version}/api/"/>
<classpath>
<path refid="core.classpath"/>
16 years, 1 month
JBoss Remoting SVN: r4681 - in remoting3/trunk: testing-support/src/main/resources and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-14 12:42:35 -0500 (Fri, 14 Nov 2008)
New Revision: 4681
Modified:
remoting3/trunk/build.xml
remoting3/trunk/testing-support/src/main/resources/testing.policy
Log:
Permissions fix for secmgr tests
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-11-14 16:52:00 UTC (rev 4680)
+++ remoting3/trunk/build.xml 2008-11-14 17:42:35 UTC (rev 4681)
@@ -633,6 +633,7 @@
<sysproperty key="ant.library.dir" value="${ant.home}/lib"/>
<sysproperty key="lib.junit.local" value="${lib.junit.local}"/>
<sysproperty key="lib.marshalling-api.local" value="${lib.marshalling-api.local}"/>
+ <sysproperty key="lib.river.local" value="${lib.river.local}"/>
<sysproperty key="lib.xnio-api.local" value="${lib.xnio-api.local}"/>
<sysproperty key="lib.xnio-nio.local" value="${lib.xnio-nio.local}"/>
<jvmarg line="${test.jvmargs}"/>
Modified: remoting3/trunk/testing-support/src/main/resources/testing.policy
===================================================================
--- remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-11-14 16:52:00 UTC (rev 4680)
+++ remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-11-14 17:42:35 UTC (rev 4681)
@@ -25,6 +25,8 @@
{
permission java.lang.RuntimePermission "modifyThread"; // for executor control
permission java.net.SocketPermission "*:*", "accept, connect, resolve";
+ permission org.jboss.remoting.EndpointPermission "createRequestHandler";
+ permission org.jboss.remoting.EndpointPermission "createClient";
};
// Permissions for Remoting itself
16 years, 1 month
JBoss Remoting SVN: r4680 - remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-14 11:52:00 -0500 (Fri, 14 Nov 2008)
New Revision: 4680
Modified:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java
Log:
Logging tweak
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java 2008-11-14 16:52:00 UTC (rev 4680)
@@ -84,6 +84,7 @@
channel.resumeReads();
return;
}
+ log.trace("Received message:\n%s", Buffers.createDumper(buffer, 8, 1));
final MessageType msgType;
try {
msgType = MessageType.getMessageType(buffer.get() & 0xff);
@@ -91,7 +92,7 @@
log.trace("Received invalid message type");
return;
}
- log.trace("Received message type %s; dump:\n%s", msgType, Buffers.createDumper(buffer, 8, 1));
+ log.trace("Decoded message type %s", msgType);
switch (msgType) {
case REQUEST: {
final int clientId = buffer.getInt();
16 years, 1 month
JBoss Remoting SVN: r4679 - in remoting3/trunk: api/src/test/java/org/jboss/remoting/spi and 5 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-14 01:41:47 -0500 (Fri, 14 Nov 2008)
New Revision: 4679
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java
Removed:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java
remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java
Log:
Complete changes for multiplex transport...
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -29,7 +29,6 @@
import java.lang.ref.WeakReference;
import org.jboss.remoting.RemotingException;
import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.HandleableCloseable;
import org.jboss.xnio.log.Logger;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.WeakCloseable;
@@ -40,11 +39,11 @@
*/
public abstract class AbstractAutoCloseable<T> extends AbstractHandleableCloseable<T> implements AutoCloseable<T> {
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.resource");
+
private final AtomicInteger refcount = new AtomicInteger(0);
private final Executor executor;
- private static final Logger log = Logger.getLogger(AbstractAutoCloseable.class);
-
/**
* Basic constructor.
*
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -42,7 +42,7 @@
*/
public abstract class AbstractHandleableCloseable<T> implements HandleableCloseable<T> {
- private static final Logger log = Logger.getLogger(AbstractHandleableCloseable.class);
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.resource");
protected final Executor executor;
private final Object closeLock = new Object();
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -23,6 +23,9 @@
package org.jboss.remoting.spi;
import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collections;
import java.io.IOException;
import org.jboss.remoting.util.QualifiedName;
import org.jboss.remoting.util.CollectionUtil;
@@ -75,6 +78,10 @@
return map.get(path);
}
+ public Set<Map.Entry<QualifiedName, Handle<RequestHandlerSource>>> getEntrySet() {
+ return Collections.unmodifiableSet(map.entrySet());
+ }
+
public String toString() {
return "named service registry <" + Integer.toHexString(hashCode()) + ">";
}
Modified: remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java
===================================================================
--- remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -30,6 +30,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.test.support.LoggingHelper;
@@ -41,6 +42,8 @@
LoggingHelper.init();
}
+ public static final Logger log = Logger.getLogger(CloseableTestCase.class);
+
public void testBasic() throws Throwable {
final ExecutorService executorService = Executors.newCachedThreadPool();
try {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -25,12 +25,15 @@
import java.util.concurrent.Executor;
import org.jboss.remoting.ClientContext;
import org.jboss.remoting.ServiceContext;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public final class ClientContextImpl extends AbstractContextImpl<ClientContext> implements ClientContext {
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.client-context");
+
private final ServiceContextImpl serviceContext;
ClientContextImpl(final Executor executor) {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -53,7 +53,7 @@
}
private <I, O> ClientImpl<I, O> doCreateExternal(Class<I> requestClass, Class<O> replyClass, RequestHandler handler) throws IOException {
- return new ClientImpl<I, O>(handler.getHandle(), endpoint.getExecutor(), requestClass, replyClass);
+ return ClientImpl.create(handler.getHandle(), endpoint.getExecutor(), requestClass, replyClass);
}
public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -27,29 +27,44 @@
import org.jboss.remoting.Client;
import org.jboss.remoting.IndeterminateOutcomeException;
import org.jboss.remoting.RemoteRequestException;
+import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.core.util.QueueExecutor;
import org.jboss.remoting.spi.Handle;
import org.jboss.remoting.spi.RemoteRequestContext;
import org.jboss.remoting.spi.ReplyHandler;
import org.jboss.remoting.spi.RequestHandler;
import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public final class ClientImpl<I, O> extends AbstractContextImpl<Client<I, O>> implements Client<I, O> {
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.client");
+
private final Handle<RequestHandler> handle;
private final Class<I> requestClass;
private final Class<O> replyClass;
- ClientImpl(final Handle<RequestHandler> handle, final Executor executor, final Class<I> requestClass, final Class<O> replyClass) {
+ private ClientImpl(final Handle<RequestHandler> handle, final Executor executor, final Class<I> requestClass, final Class<O> replyClass) {
super(executor);
this.handle = handle;
this.requestClass = requestClass;
this.replyClass = replyClass;
}
+ static <I, O> ClientImpl<I, O> create(final Handle<RequestHandler> handle, final Executor executor, final Class<I> requestClass, final Class<O> replyClass) {
+ final ClientImpl<I, O> ci = new ClientImpl<I, O>(handle, executor, requestClass, replyClass);
+ handle.addCloseHandler(new CloseHandler<Handle<RequestHandler>>() {
+ public void handleClose(final Handle<RequestHandler> closed) {
+ IoUtils.safeClose(ci);
+ }
+ });
+ return ci;
+ }
+
protected void closeAction() throws IOException {
handle.close();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -53,7 +53,7 @@
}
private <I, O> ClientSourceImpl<I, O> doCreateExternal(Class<I> requestClass, Class<O> replyClass, RequestHandlerSource handlerSource) throws IOException {
- return new ClientSourceImpl<I, O>(handlerSource.getHandle(), endpoint, requestClass, replyClass);
+ return ClientSourceImpl.create(handlerSource.getHandle(), endpoint, requestClass, replyClass);
}
public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -26,23 +26,27 @@
import org.jboss.remoting.Client;
import org.jboss.remoting.ClientSource;
import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.spi.AbstractHandleableCloseable;
import org.jboss.remoting.spi.Handle;
import org.jboss.remoting.spi.RequestHandler;
import org.jboss.remoting.spi.RequestHandlerSource;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public final class ClientSourceImpl<I, O> extends AbstractHandleableCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.client-source");
+
private final Handle<RequestHandlerSource> handle;
private final Endpoint endpoint;
private final Class<I> requestClass;
private final Class<O> replyClass;
- ClientSourceImpl(final Handle<RequestHandlerSource> handle, final EndpointImpl endpoint, final Class<I> requestClass, final Class<O> replyClass) {
+ private ClientSourceImpl(final Handle<RequestHandlerSource> handle, final EndpointImpl endpoint, final Class<I> requestClass, final Class<O> replyClass) {
super(endpoint.getExecutor());
this.handle = handle;
this.endpoint = endpoint;
@@ -50,6 +54,16 @@
this.replyClass = replyClass;
}
+ static <I, O> ClientSourceImpl<I, O> create(final Handle<RequestHandlerSource> handle, final EndpointImpl endpoint, final Class<I> requestClass, final Class<O> replyClass) {
+ final ClientSourceImpl<I, O> csi = new ClientSourceImpl<I, O>(handle, endpoint, requestClass, replyClass);
+ handle.addCloseHandler(new CloseHandler<Handle<RequestHandlerSource>>() {
+ public void handleClose(final Handle<RequestHandlerSource> closed) {
+ IoUtils.safeClose(csi);
+ }
+ });
+ return csi;
+ }
+
protected void closeAction() throws IOException {
handle.close();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -46,7 +46,7 @@
Logger.getLogger("org.jboss.remoting").info("JBoss Remoting version %s", Version.VERSION);
}
- private static final Logger log = Logger.getLogger(Endpoint.class);
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.endpoint");
private String name;
@@ -234,12 +234,7 @@
boolean ok = false;
final Handle<RequestHandler> handle = requestHandler.getHandle();
try {
- final ClientImpl<I, O> client = new ClientImpl<I, O>(handle, executor, requestType, replyType);
- client.addCloseHandler(new CloseHandler<Client<I, O>>() {
- public void handleClose(final Client<I, O> closed) {
- IoUtils.safeClose(handle);
- }
- });
+ final ClientImpl<I, O> client = ClientImpl.create(handle, executor, requestType, replyType);
ok = true;
return client;
} finally {
@@ -257,7 +252,7 @@
boolean ok = false;
final Handle<RequestHandlerSource> handle = requestHandlerSource.getHandle();
try {
- final ClientSourceImpl<I, O> clientSource = new ClientSourceImpl<I, O>(handle, this, requestClass, replyClass);
+ final ClientSourceImpl<I, O> clientSource = ClientSourceImpl.create(handle, this, requestClass, replyClass);
ok = true;
return clientSource;
} finally {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
-import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.RequestListener;
import org.jboss.remoting.RemoteRequestException;
@@ -46,7 +45,7 @@
private final Class<I> requestClass;
private final Class<O> replyClass;
- private static final Logger log = Logger.getLogger(LocalRequestHandler.class);
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.listener");
LocalRequestHandler(Config<I, O> config) {
super(config.getExecutor());
@@ -88,18 +87,17 @@
};
}
+ protected void closeAction() throws IOException {
+ try {
+ requestListener.handleClientClose(clientContext);
+ } catch (Throwable t) {
+ log.error(t, "Unexpected exception in request listener client close handler method");
+ }
+ }
+
void open() throws IOException {
try {
requestListener.handleClientOpen(clientContext);
- addCloseHandler(new CloseHandler<RequestHandler>() {
- public void handleClose(final RequestHandler closed) {
- try {
- requestListener.handleClientClose(clientContext);
- } catch (Throwable t) {
- log.error(t, "Unexpected exception in request listener client close handler method");
- }
- }
- });
} catch (Throwable t) {
final IOException ioe = new IOException("Failed to open client context");
ioe.initCause(t);
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -43,7 +43,7 @@
private final Class<I> requestClass;
private final Class<O> replyClass;
- private static final Logger log = Logger.getLogger(LocalRequestHandlerSource.class);
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.listener-source");
LocalRequestHandlerSource(final Config<I, O> config) {
super(config.getExecutor());
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -24,11 +24,14 @@
import java.util.concurrent.Executor;
import org.jboss.remoting.ServiceContext;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public final class ServiceContextImpl extends AbstractContextImpl<ServiceContext> implements ServiceContext {
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.service-context");
+
protected ServiceContextImpl(final Executor executor) {
super(executor);
}
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -42,7 +42,7 @@
*/
final class BasicRequestHandler extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
- private static final Logger log = Logger.getLogger(BasicRequestHandler.class);
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.basic");
private final AtomicInteger requestSequence;
private final Lock reqLock;
@@ -77,6 +77,8 @@
} catch (IOException e) {
log.error(e, "Error writing cancel request");
IoUtils.safeClose(BasicRequestHandler.this);
+ } finally {
+ reqLock.unlock();
}
}
};
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.xnio.BufferAllocator;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+/**
+ *
+ */
+public class BufferByteOutput implements ByteOutput {
+
+ private ByteBuffer current;
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final Collection<ByteBuffer> target;
+
+ public BufferByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
+ this.allocator = allocator;
+ this.target = target;
+ }
+
+ private ByteBuffer getCurrent() {
+ final ByteBuffer buffer = current;
+ return buffer == null ? (current = allocator.allocate()) : buffer;
+ }
+
+ public void write(final int i) {
+ final ByteBuffer buffer = getCurrent();
+ buffer.put((byte) i);
+ if (! buffer.hasRemaining()) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+
+ public void write(final byte[] bytes) {
+ write(bytes, 0, bytes.length);
+ }
+
+ public void write(final byte[] bytes, int offs, int len) {
+ while (len > 0) {
+ final ByteBuffer buffer = getCurrent();
+ final int c = Math.min(len, buffer.remaining());
+ buffer.put(bytes, offs, c);
+ offs += c;
+ len -= c;
+ if (! buffer.hasRemaining()) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+ }
+
+ public void close() {
+ flush();
+ }
+
+ public void flush() {
+ final ByteBuffer buffer = current;
+ if (buffer != null) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import java.io.IOException;
+
+/**
+ * A future for a {@link RequestHandlerSource}, built on {@link AbstractIoFuture}.
+ * <p>
+ * NOTE(review): {@code setException} and {@code setResult} only delegate to the superclass; presumably they are
+ * redeclared here so that other classes in this package can complete the future - confirm against the callers.
+ */
+public final class FutureRemoteRequestHandlerSource extends AbstractIoFuture<RequestHandlerSource> {
+
+ /**
+ * Cancellation is not acted upon: this future is returned unchanged.
+ *
+ * @return this future
+ */
+ public IoFuture<RequestHandlerSource> cancel() {
+ return this;
+ }
+
+ /**
+ * Delegate to the superclass implementation.
+ *
+ * @param exception the exception to pass to the superclass
+ * @return whatever the superclass implementation returns
+ */
+ protected boolean setException(final IOException exception) {
+ return super.setException(exception);
+ }
+
+ /**
+ * Delegate to the superclass implementation.
+ *
+ * @param result the result to pass to the superclass
+ * @return whatever the superclass implementation returns
+ */
+ protected boolean setResult(final RequestHandlerSource result) {
+ return super.setResult(result);
+ }
+}
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -24,6 +24,8 @@
import java.util.HashMap;
import java.util.IdentityHashMap;
+import java.util.Set;
+import java.util.Collections;
/**
*
@@ -59,6 +61,17 @@
leftMap.remove(oldKey1Obj);
}
+ /**
+  * Associate {@code key1} with {@code key2}, but only if {@code key1} has no mapping yet.
+  *
+  * @param key1 the integer key
+  * @param key2 the object key
+  * @return {@code true} if the pair was added, {@code false} if {@code key1} was already mapped
+  */
+ public boolean putIfAbsent(final int key1, final T key2) {
+     final Integer key1Obj = Integer.valueOf(key1);
+     if (leftMap.containsKey(key1Obj)) {
+         return false;
+     }
+     // key1 is known to be absent here, so there is no previous object key to evict from rightMap;
+     // unconditionally removing the (always null) previous value could drop a genuine null-keyed entry
+     leftMap.put(key1Obj, key2);
+     final Integer oldKey1Obj = rightMap.put(key2, key1Obj);
+     if (oldKey1Obj != null) {
+         // key2 was registered under another integer: drop that stale forward mapping, as put() does
+         leftMap.remove(oldKey1Obj);
+     }
+     return true;
+ }
+
public T remove(final int key) {
final T oldRightKey = leftMap.remove(Integer.valueOf(key));
rightMap.remove(oldRightKey);
@@ -69,6 +82,10 @@
leftMap.remove(rightMap.remove(key));
}
+ public Set<T> getKeys() {
+ return Collections.unmodifiableSet(rightMap.keySet());
+ }
+
public static <T> IntegerBiMap<T> create() {
return new IdentityHashIntegerBiMap<T>();
}
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -26,6 +26,8 @@
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Collections;
+import java.util.Collection;
import org.jboss.remoting.spi.AutoCloseable;
import org.jboss.remoting.spi.Handle;
@@ -73,6 +75,10 @@
leftMap.remove(rightMap.remove(key));
}
+ public Collection<Handle<T>> getKeys() {
+ return Collections.unmodifiableCollection(leftMap.values());
+ }
+
public static <T extends AutoCloseable<T>> IntegerResourceBiMap<T> create() {
return new IdentityHashIntegerResourceBiMap<T>();
}
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -22,6 +22,9 @@
package org.jboss.remoting.protocol.multiplex;
+import org.jboss.remoting.util.SynchronizedSet;
+import java.util.Set;
+
/**
*
*/
@@ -32,10 +35,14 @@
void put(int key1, T key2);
+ boolean putIfAbsent(int key1, T key2);
+
T remove(int key);
void remove(T key);
+ Set<T> getKeys();
+
class Util {
private Util() {
@@ -69,6 +76,12 @@
}
}
+ public boolean putIfAbsent(final int key1, final T key2) {
+ synchronized (lock) {
+ return orig.putIfAbsent(key1, key2);
+ }
+ }
+
public T remove(final int key) {
synchronized (lock) {
return orig.remove(key);
@@ -80,6 +93,10 @@
orig.remove(key);
}
}
+
+ public Set<T> getKeys() {
+ return new SynchronizedSet<T>(orig.getKeys(), lock);
+ }
}
public static <T> IntegerBiMap<T> synchronizing(IntegerBiMap<T> orig) {
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -24,7 +24,10 @@
import org.jboss.remoting.spi.AutoCloseable;
import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.util.SynchronizedSet;
+import org.jboss.remoting.util.SynchronizedCollection;
import java.util.Iterator;
+import java.util.Collection;
/**
*
@@ -40,6 +43,8 @@
void remove(T key);
+ Collection<Handle<T>> getKeys();
+
class Util {
private Util() {
@@ -85,6 +90,10 @@
}
}
+ public Collection<Handle<T>> getKeys() {
+ return new SynchronizedCollection<Handle<T>>(orig.getKeys(), lock);
+ }
+
public Iterator<Handle<T>> iterator() {
return null;
}
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -28,40 +28,50 @@
enum MessageType {
/**
+ * Signals that the connection should be closed in an orderly fashion. After this message is sent, no further
+ * requests or service advertisements may be sent.
+ */
+ CONNECTION_CLOSE(0x00),
+ /**
* The request part of a request-response sequence, sent from the Client to the RequestListener.
*/
- REQUEST(2),
+ REQUEST(0x10),
/**
* The reply part of a request-response sequence, sent from the RequestListener to the Client.
*/
- REPLY(3),
+ REPLY(0x11),
/**
* A cancellation request for an outstanding request, sent from the Client to the RequestListener.
*/
- CANCEL_REQUEST(4),
+ CANCEL_REQUEST(0x12),
/**
* Acknowledgement that a request was cancelled, sent from the RequestListener to the Client.
*/
- CANCEL_ACK(5),
+ CANCEL_ACK(0x13),
/**
* Message that the request could not be received on the remote end, sent to the Client from the
- * protocol handler as a
+ * protocol handler.
*/
- REQUEST_RECEIVE_FAILED(6),
+ REQUEST_RECEIVE_FAILED(0x14),
// Request failed due to exception
- REQUEST_FAILED(7),
- // Request completed but no reply or exception was sent
- REQUEST_OUTCOME_UNKNOWN(8),
+ REQUEST_FAILED(0x15),
// Remote side called .close() on a forwarded RequestHandler
- CLIENT_CLOSE(9),
+ CLIENT_CLOSE(0x20),
// Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
- CLIENT_OPEN(10),
- // Remote side called .close() on a forwarded RequestHandlerSource
- SERVICE_CLOSE(11),
- // Remote side brought a new service online
- SERVICE_ADVERTISE(12),
- // Remote side's service is no longer available
- SERVICE_UNADVERTISE(13),
+ CLIENT_OPEN(0x21),
+ // Request to open a service at a path
+ SERVICE_OPEN_REQUEST(0x30),
+ // Reply for a successful service open
+ SERVICE_OPEN_REPLY(0x31),
+ // Reply for a generally failed service open
+ SERVICE_OPEN_FAILED(0x32),
+ SERVICE_OPEN_NOT_FOUND(0x33),
+ SERVICE_OPEN_FORBIDDEN(0x34),
+
+ // Notify the remote side that the service will no longer be used
+ SERVICE_CLOSE_REQUEST(0x3e),
+ // The service channel is closed; no further clients may be opened
+ SERVICE_CLOSE_NOTIFY(0x3f),
;
private final int id;
@@ -81,18 +91,22 @@
*/
public static MessageType getMessageType(final int id) {
switch (id) {
- case 2: return REQUEST;
- case 3: return REPLY;
- case 4: return CANCEL_REQUEST;
- case 5: return CANCEL_ACK;
- case 6: return REQUEST_RECEIVE_FAILED;
- case 7: return REQUEST_FAILED;
- case 8: return REQUEST_OUTCOME_UNKNOWN;
- case 9: return CLIENT_CLOSE;
- case 10: return CLIENT_OPEN;
- case 11: return SERVICE_CLOSE;
- case 12: return SERVICE_ADVERTISE;
- case 13: return SERVICE_UNADVERTISE;
+ case 0x00: return CONNECTION_CLOSE;
+ case 0x10: return REQUEST;
+ case 0x11: return REPLY;
+ case 0x12: return CANCEL_REQUEST;
+ case 0x13: return CANCEL_ACK;
+ case 0x14: return REQUEST_RECEIVE_FAILED;
+ case 0x15: return REQUEST_FAILED;
+ case 0x20: return CLIENT_CLOSE;
+ case 0x21: return CLIENT_OPEN;
+ case 0x30: return SERVICE_OPEN_REQUEST;
+ case 0x31: return SERVICE_OPEN_REPLY;
+ case 0x32: return SERVICE_OPEN_FAILED;
+ case 0x33: return SERVICE_OPEN_NOT_FOUND;
+ case 0x34: return SERVICE_OPEN_FORBIDDEN;
+ case 0x3e: return SERVICE_CLOSE_REQUEST;
+ case 0x3f: return SERVICE_CLOSE_NOTIFY;
default: throw new IllegalArgumentException("Invalid message type ID");
}
}
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -27,6 +27,7 @@
import org.jboss.xnio.BufferAllocator;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.remoting.spi.NamedServiceRegistry;
/**
* A configuration object for the multiplex protocol.
@@ -37,6 +38,7 @@
private int linkMetric;
private Executor executor;
private BufferAllocator<ByteBuffer> allocator;
+ private NamedServiceRegistry namedServiceRegistry;
/**
* Construct a new instance.
@@ -133,4 +135,22 @@
public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
this.allocator = allocator;
}
+
+ /**
+ * Get the named service registry for this connection.
+ *
+ * @return the registry
+ */
+ public NamedServiceRegistry getNamedServiceRegistry() {
+ return namedServiceRegistry;
+ }
+
+ /**
+ * Set the named service registry for this connection.
+ *
+ * @param namedServiceRegistry the registry
+ */
+ public void setNamedServiceRegistry(final NamedServiceRegistry namedServiceRegistry) {
+ this.namedServiceRegistry = namedServiceRegistry;
+ }
}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,444 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.spi.NamedServiceRegistry;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.AbstractHandleableCloseable;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.IndeterminateOutcomeException;
+import org.jboss.remoting.util.QualifiedName;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.Closeable;
+
+/**
+ *
+ */
+public final class MultiplexConnection extends AbstractHandleableCloseable<MultiplexConnection> {
+ private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex");
+
+ //--== Connection configuration items ==--
+ private final MarshallerFactory marshallerFactory;
+ private final MarshallingConfiguration marshallingConfiguration;
+ private final int linkMetric;
+ private final Executor executor;
+ // buffer allocator for outbound message assembly
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ // running on remote node
+ private final IntegerBiMap<ReplyHandler> remoteRequests = IdentityHashIntegerBiMap.createSynchronizing();
+ // running on local node
+ private final IntegerBiMap<RemoteRequestContext> localRequests = IdentityHashIntegerBiMap.createSynchronizing();
+ // sequence for remote requests
+ private final AtomicInteger requestSequence = new AtomicInteger();
+
+ // clients whose requests get forwarded to the remote side
+ // even #s were opened from services forwarded to us (our sequence)
+ // odd #s were forwarded directly to us (remote sequence)
+ private final IntegerBiMap<RequestHandler> remoteClients = IdentityHashIntegerBiMap.createSynchronizing();
+ // forwarded to remote side (handled on this side)
+ private final IntegerResourceBiMap<RequestHandler> forwardedClients = IdentityHashIntegerResourceBiMap.createSynchronizing();
+ // sequence for forwarded clients (shift left one bit, add one, limit is 2^30)
+ private final AtomicInteger forwardedClientSequence = new AtomicInteger();
+ // sequence for clients created from services forwarded to us (shift left one bit, limit is 2^30)
+ private final AtomicInteger remoteClientSequence = new AtomicInteger();
+
+ // services on the remote side
+ private final IntegerBiMap<FutureRemoteRequestHandlerSource> remoteServices = IdentityHashIntegerBiMap.createSynchronizing();
+ // forwarded to remote side (handled on this side)
+ private final IntegerResourceBiMap<RequestHandlerSource> forwardedServices = IdentityHashIntegerResourceBiMap.createSynchronizing();
+ // sequence for remote services
+ private final AtomicInteger remoteServiceSequence = new AtomicInteger();
+
+ // registered services by path
+ private final NamedServiceRegistry namedServiceRegistry;
+
+ private final Endpoint endpoint;
+
+ private final AllocatedMessageChannel channel;
+
+ public MultiplexConnection(final Endpoint endpoint, final AllocatedMessageChannel channel, final MultiplexConfiguration configuration) {
+ super(configuration.getExecutor());
+ this.endpoint = endpoint;
+ this.channel = channel;
+ marshallerFactory = configuration.getMarshallerFactory();
+ if (marshallerFactory == null) {
+ throw new NullPointerException("marshallerFactory is null");
+ }
+ marshallingConfiguration = configuration.getMarshallingConfiguration();
+ if (marshallingConfiguration == null) {
+ throw new NullPointerException("marshallingConfiguration is null");
+ }
+ linkMetric = configuration.getLinkMetric();
+ executor = configuration.getExecutor();
+ if (executor == null) {
+ throw new NullPointerException("executor is null");
+ }
+ allocator = configuration.getAllocator();
+ if (allocator == null) {
+ throw new NullPointerException("allocator is null");
+ }
+ namedServiceRegistry = configuration.getNamedServiceRegistry();
+ if (namedServiceRegistry == null) {
+ throw new NullPointerException("namedServiceRegistry is null");
+ }
+ }
+
+ // sequence methods
+
+ int nextRequest() {
+ return requestSequence.getAndIncrement() & 0x7fffffff;
+ }
+
+ int nextForwardedClient() {
+ return (forwardedClientSequence.getAndIncrement() << 1 | 1) & 0x7fffffff;
+ }
+
+ int nextRemoteClient() {
+ return remoteClientSequence.getAndIncrement() << 1 & 0x7fffffff;
+ }
+
+ int nextRemoteService() {
+ return remoteServiceSequence.getAndIncrement() & 0x7fffffff;
+ }
+
+ void doBlockingWrite(ByteBuffer... buffers) throws IOException {
+ if (buffers.length == 1) doBlockingWrite(buffers[0]); else for (;;) {
+ if (channel.send(buffers)) {
+ return;
+ }
+ channel.awaitWritable();
+ }
+ }
+
+ void doBlockingWrite(ByteBuffer buffer) throws IOException {
+ log.trace("Sending message:\n%s", Buffers.createDumper(buffer, 8, 1));
+ for (;;) {
+ if (channel.send(buffer)) {
+ return;
+ }
+ channel.awaitWritable();
+ }
+ }
+
+ void doBlockingWrite(List<ByteBuffer> buffers) throws IOException {
+ doBlockingWrite(buffers.toArray(new ByteBuffer[buffers.size()]));
+ }
+
+ MarshallerFactory getMarshallerFactory() {
+ return marshallerFactory;
+ }
+
+ MarshallingConfiguration getMarshallingConfiguration() {
+ return marshallingConfiguration;
+ }
+
+ int getLinkMetric() {
+ return linkMetric;
+ }
+
+ protected Executor getExecutor() {
+ return executor;
+ }
+
+ BufferAllocator<ByteBuffer> getAllocator() {
+ return allocator;
+ }
+
+ Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+ AllocatedMessageChannel getChannel() {
+ return channel;
+ }
+
+ void removeRemoteClient(final int identifier) {
+ remoteClients.remove(identifier);
+ }
+
+ void addRemoteRequest(final int id, final ReplyHandler handler) {
+ remoteRequests.put(id, handler);
+ }
+
+ void addRemoteClient(final int id, final RequestHandler handler) {
+ remoteClients.put(id, handler);
+ }
+
+ Handle<RequestHandler> getForwardedClient(final int id) {
+ return forwardedClients.get(id);
+ }
+
+ ReplyHandler removeRemoteRequest(final int id) {
+ return remoteRequests.remove(id);
+ }
+
+ RemoteRequestContext getLocalRequest(final int id) {
+ return localRequests.get(id);
+ }
+
+ ReplyHandler getRemoteRequest(final int id) {
+ return remoteRequests.get(id);
+ }
+
+ Handle<RequestHandler> removeForwardedClient(final int id) {
+ return forwardedClients.remove(id);
+ }
+
+ Handle<RequestHandlerSource> getForwardedService(final int id) {
+ return forwardedServices.get(id);
+ }
+
+ void addForwardedClient(final int id, final Handle<RequestHandler> handle) {
+ forwardedClients.put(id, handle);
+ }
+
+ void addForwadedService(final int id, final Handle<RequestHandlerSource> service) {
+ forwardedServices.put(id, service);
+ }
+
+ Handle<RequestHandlerSource> removeForwardedService(final int id) {
+ return forwardedServices.remove(id);
+ }
+
+ Handle<RequestHandlerSource> getServiceByPath(String path) {
+ return getService(QualifiedName.parse(path));
+ }
+
+ Handle<RequestHandlerSource> getService(final QualifiedName name) {
+ return namedServiceRegistry.lookupService(name);
+ }
+
+ FutureRemoteRequestHandlerSource getFutureRemoteService(final int id) {
+ return remoteServices.get(id);
+ }
+
+ FutureRemoteRequestHandlerSource removeFutureRemoteService(final int id) {
+ return remoteServices.remove(id);
+ }
+
+ public Handle<RequestHandlerSource> openRemoteService(final QualifiedName name) throws IOException {
+ final FutureRemoteRequestHandlerSource future = new FutureRemoteRequestHandlerSource();
+ int id;
+ for (;;) {
+ id = nextRemoteService();
+ if (remoteServices.putIfAbsent(id, future)) {
+ break;
+ }
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(5 + getByteLength(name));
+ buffer.put((byte) MessageType.SERVICE_OPEN_REQUEST.getId());
+ buffer.putInt(id);
+ putQualifiedName(buffer, name);
+ buffer.flip();
+ doBlockingWrite(buffer);
+ try {
+ final Handle<RequestHandlerSource> handle = future.getInterruptibly().getHandle();
+ log.trace("Opened %s to %s", handle, this);
+ return handle;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted while waiting for remote service");
+ }
+ }
+
+ /**
+  * Compute the number of bytes that {@code putQualifiedName} writes for the given name.
+  *
+  * @param name the qualified name to measure
+  * @return the encoded size in bytes
+  */
+ static int getByteLength(QualifiedName name) {
+     // two bytes for the segment count which precedes the segments
+     int total = 2;
+     for (String segment : name) {
+         total += getByteLength(segment);
+     }
+     return total;
+ }
+
+ /**
+  * Compute the number of bytes that {@code putString} writes for the given string, including the
+  * trailing zero byte.
+  *
+  * @param s the string to measure
+  * @return the encoded size in bytes
+  */
+ static int getByteLength(String s) {
+     // start at one to account for the terminating zero byte
+     int total = 1;
+     for (int idx = 0, end = s.length(); idx < end; idx++) {
+         final char c = s.charAt(idx);
+         if (c > 0x07ff) {
+             total += 3;
+         } else if (c == 0 || c > 0x7f) {
+             // NUL takes the two-byte form so that it cannot be mistaken for the terminator
+             total += 2;
+         } else {
+             total += 1;
+         }
+     }
+     return total;
+ }
+
+ /**
+  * Read a zero-terminated string in the modified UTF-8 form written by {@code putString}: one to
+  * three bytes per {@code char}, with NUL encoded as the two bytes {@code C0 80}.
+  * <p>
+  * Decoding stops after the terminating zero byte, or at the end of the buffer if no terminator is
+  * found.  A byte that cannot start a sequence, or a sequence cut short by an unexpected byte, is
+  * decoded as {@code '?'}.
+  *
+  * @param buffer the buffer to read from; its position is advanced past the bytes consumed
+  * @return the decoded string
+  */
+ static String getString(final ByteBuffer buffer) {
+     StringBuilder builder = new StringBuilder();
+     // state = number of continuation bytes still expected; a = character being assembled
+     int state = 0, a = 0;
+     while (buffer.hasRemaining()) {
+         final int v = buffer.get() & 0xff;
+         switch (state) {
+             case 0: {
+                 if (v == 0) {
+                     return builder.toString();
+                 } else if (v < 128) {
+                     builder.append((char) v);
+                 } else if (192 <= v && v < 224) {
+                     // two-byte lead 110xxxxx: keep only the five payload bits
+                     a = (v & 0x1f) << 6;
+                     state = 1;
+                 } else if (224 <= v && v < 240) {
+                     // three-byte lead 1110xxxx (0xE0..0xEF): keep only the four payload bits
+                     a = (v & 0x0f) << 12;
+                     state = 2;
+                 } else {
+                     builder.append('?');
+                 }
+                 break;
+             }
+             case 1: {
+                 if (v == 0) {
+                     // terminator arrived in the middle of a sequence
+                     builder.append('?');
+                     return builder.toString();
+                 } else if (128 <= v && v < 192) {
+                     a |= v & 0x3f;
+                     builder.append((char) a);
+                 } else {
+                     builder.append('?');
+                 }
+                 state = 0;
+                 break;
+             }
+             case 2: {
+                 if (v == 0) {
+                     // terminator arrived in the middle of a sequence
+                     builder.append('?');
+                     return builder.toString();
+                 } else if (128 <= v && v < 192) {
+                     a |= (v & 0x3f) << 6;
+                     state = 1;
+                 } else {
+                     builder.append('?');
+                     state = 0;
+                 }
+                 break;
+             }
+             default:
+                 throw new IllegalStateException("wrong state");
+         }
+     }
+     return builder.toString();
+ }
+
+ /**
+  * Write a string as modified UTF-8 followed by a terminating zero byte.  Every {@code char} is
+  * encoded on its own in one to three bytes; NUL takes the two-byte form so that a zero byte
+  * appears only as the terminator.
+  *
+  * @param buffer the buffer to write to
+  * @param string the string to encode
+  */
+ static void putString(final ByteBuffer buffer, final String string) {
+     for (int idx = 0, end = string.length(); idx < end; idx++) {
+         final int c = string.charAt(idx);
+         if (c > 0x07ff) {
+             buffer.put((byte) (0xe0 | ((c >> 12) & 0x0f)));
+             buffer.put((byte) (0x80 | ((c >> 6) & 0x3f)));
+             buffer.put((byte) (0x80 | (c & 0x3f)));
+         } else if (c == 0 || c > 0x7f) {
+             buffer.put((byte) (0xc0 | ((c >> 6) & 0x1f)));
+             buffer.put((byte) (0x80 | (c & 0x3f)));
+         } else {
+             buffer.put((byte) c);
+         }
+     }
+     buffer.put((byte) 0);
+ }
+
+ /**
+  * Read a qualified name: an unsigned 16-bit segment count followed by that many strings, each
+  * read with {@code getString}.
+  *
+  * @param buffer the buffer to read from
+  * @return the qualified name built from the decoded segments
+  */
+ static QualifiedName getQualifiedName(final ByteBuffer buffer) {
+     final String[] segments = new String[buffer.getShort() & 0xffff];
+     for (int idx = 0; idx < segments.length; idx++) {
+         segments[idx] = getString(buffer);
+     }
+     return new QualifiedName(segments);
+ }
+
+ /**
+  * Write a qualified name: its length as a 16-bit value followed by each segment, written with
+  * {@code putString}.
+  *
+  * @param buffer the buffer to write to
+  * @param qualifiedName the name to write
+  * @throws IllegalArgumentException if the name's length does not fit in an unsigned 16-bit value
+  */
+ static void putQualifiedName(final ByteBuffer buffer, final QualifiedName qualifiedName) {
+     final int segmentCount = qualifiedName.length();
+     if (segmentCount > 0xffff) {
+         throw new IllegalArgumentException("Qualified name is too long");
+     }
+     buffer.putShort((short) segmentCount);
+     for (String segment : qualifiedName) {
+         putString(buffer, segment);
+     }
+ }
+
+ protected void closeAction() {
+ // just to make sure...
+ IoUtils.safeClose(channel);
+ final IndeterminateOutcomeException ioe = new IndeterminateOutcomeException("The connection was closed");
+ // Things running remotely
+ for (ReplyHandler x : remoteRequests.getKeys()) {
+ SpiUtils.safeHandleException(x, ioe);
+ }
+ for (RequestHandler x : remoteClients.getKeys()) {
+ IoUtils.safeClose(x);
+ }
+ for (FutureRemoteRequestHandlerSource future : remoteServices.getKeys()) {
+ future.addNotifier(MultiplexConnection.<RequestHandlerSource>closingNotifier());
+ }
+ // Things running locally
+ for (RemoteRequestContext localRequest : localRequests.getKeys()) {
+ localRequest.cancel();
+ }
+ for (Handle<RequestHandler> client : forwardedClients.getKeys()) {
+ IoUtils.safeClose(client);
+ }
+ for (Handle<RequestHandlerSource> service : forwardedServices.getKeys()) {
+ IoUtils.safeClose(service);
+ }
+ }
+
+ public String toString() {
+ return "multiplex connection <" + Integer.toHexString(hashCode()) + "> on " + channel;
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private static <T extends Closeable> IoFuture.Notifier<T> closingNotifier() {
+ return (IoFuture.Notifier<T>) CLOSING_NOTIFIER;
+ }
+
+ private static final ClosingNotifier CLOSING_NOTIFIER = new ClosingNotifier();
+
+ private static class ClosingNotifier extends IoFuture.HandlingNotifier<Closeable> {
+ public void handleDone(final Closeable result) {
+ IoUtils.safeClose(result);
+ }
+ }
+}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -1,879 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.log.Logger;
-import org.jboss.remoting.spi.RequestHandler;
-import org.jboss.remoting.spi.RequestHandlerSource;
-import org.jboss.remoting.spi.ReplyHandler;
-import org.jboss.remoting.spi.RemoteRequestContext;
-import org.jboss.remoting.spi.Handle;
-import org.jboss.remoting.spi.SpiUtils;
-import org.jboss.remoting.spi.AbstractAutoCloseable;
-import org.jboss.remoting.util.CollectionUtil;
-import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.remoting.RemoteExecutionException;
-import org.jboss.remoting.IndeterminateOutcomeException;
-import org.jboss.remoting.ReplyException;
-import org.jboss.remoting.RemoteServiceConfiguration;
-import org.jboss.marshalling.MarshallerFactory;
-import org.jboss.marshalling.Unmarshaller;
-import org.jboss.marshalling.ByteOutput;
-import org.jboss.marshalling.Marshaller;
-import org.jboss.marshalling.MarshallingConfiguration;
-import org.jboss.marshalling.ObjectTable;
-import org.jboss.marshalling.Marshalling;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.nio.ByteBuffer;
-import java.nio.BufferUnderflowException;
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.io.InterruptedIOException;
-import java.io.InvalidObjectException;
-
-/**
- * Protocol handler for the basic message-oriented Remoting protocol.
- */
-final class MultiplexHandler implements IoHandler<AllocatedMessageChannel> {
-
- private static final Logger log = Logger.getLogger(MultiplexHandler.class);
-
- //--== Connection configuration items ==--
- private final MarshallerFactory marshallerFactory;
- private final MarshallingConfiguration marshallingConfiguration;
- private final int linkMetric;
- private final Executor executor;
- // buffer allocator for outbound message assembly
- private final BufferAllocator<ByteBuffer> allocator;
-
- // running on remote node
- private final IntegerBiMap<ReplyHandler> remoteRequests = IdentityHashIntegerBiMap.createSynchronizing();
- // running on local node
- private final IntegerBiMap<RemoteRequestContext> localRequests = IdentityHashIntegerBiMap.createSynchronizing();
- // sequence for remote requests
- private final AtomicInteger requestSequence = new AtomicInteger();
-
- // clients whose requests get forwarded to the remote side
- // even #s were opened from services forwarded to us (our sequence)
- // odd #s were forwarded directly to us (remote sequence)
- private final IntegerBiMap<RequestHandler> remoteClients = IdentityHashIntegerBiMap.createSynchronizing();
- // forwarded to remote side (handled on this side)
- private final IntegerResourceBiMap<RequestHandler> forwardedClients = IdentityHashIntegerResourceBiMap.createSynchronizing();
- // sequence for forwarded clients (shift left one bit, add one, limit is 2^30)
- private final AtomicInteger forwardedClientSequence = new AtomicInteger();
- // sequence for clients created from services forwarded to us (shift left one bit, limit is 2^30)
- private final AtomicInteger remoteClientSequence = new AtomicInteger();
-
- // services forwarded to us
- private final IntegerBiMap<RequestHandlerSource> remoteServices = IdentityHashIntegerBiMap.createSynchronizing();
- // forwarded to remote side (handled on this side)
- private final IntegerResourceBiMap<RequestHandlerSource> forwardedServices = IdentityHashIntegerResourceBiMap.createSynchronizing();
- // sequence for forwarded services
- private final AtomicInteger forwardedServiceSequence = new AtomicInteger();
-
- private final Endpoint endpoint;
-
- private volatile AllocatedMessageChannel channel;
- private static final StackTraceElement[] emptyStackTraceElements = new StackTraceElement[0];
-
-    /**
-     * Creates a handler for one multiplex connection.
-     *
-     * @param endpoint the endpoint with which advertised remote services are registered
-     * @param configuration supplies the buffer allocator, executor, marshalling setup and link metric
-     */
-    public MultiplexHandler(final Endpoint endpoint, final MultiplexConfiguration configuration) {
-        this.endpoint = endpoint;
-        this.allocator = configuration.getAllocator();
-        this.executor = configuration.getExecutor();
-        this.marshallerFactory = configuration.getMarshallerFactory();
-        this.marshallingConfiguration = configuration.getMarshallingConfiguration();
-        this.linkMetric = configuration.getLinkMetric();
-    }
-
- // sequence methods
-
-    /**
-     * Allocates the next outbound request ID.
-     *
-     * @return the next sequence value with the sign bit cleared, so it is never negative
-     */
-    int nextRequest() {
-        final int sequence = requestSequence.getAndIncrement();
-        return sequence & 0x7fffffff;
-    }
-
-    /**
-     * Allocates the next ID for a client forwarded to the remote side.
-     *
-     * @return an odd, non-negative ID (sequence shifted left one bit, low bit set, sign bit cleared)
-     */
-    int nextForwardedClient() {
-        final int sequence = forwardedClientSequence.getAndIncrement();
-        final int oddId = (sequence << 1) | 1;
-        return oddId & 0x7fffffff;
-    }
-
-    /**
-     * Allocates the next ID for a client opened from a service that was forwarded to us.
-     *
-     * @return an even, non-negative ID (sequence shifted left one bit, sign bit cleared)
-     */
-    int nextRemoteClient() {
-        final int sequence = remoteClientSequence.getAndIncrement();
-        final int evenId = sequence << 1;
-        return evenId & 0x7fffffff;
-    }
-
-    /**
-     * Allocates the next ID for a service forwarded to the remote side.
-     *
-     * @return the next sequence value with the sign bit cleared, so it is never negative
-     */
-    int nextForwardedService() {
-        final int sequence = forwardedServiceSequence.getAndIncrement();
-        return sequence & 0x7fffffff;
-    }
-
- /**
- * Stores the channel that the nested handler classes queue their outbound messages on.
- *
- * @param channel the channel for this connection
- */
- void setChannel(final AllocatedMessageChannel channel) {
- this.channel = channel;
- }
-
- /**
- * Called when the channel is opened; asks it to start delivering read notifications.
- * NOTE(review): the {@code channel} field is not assigned here - presumably {@code setChannel} is
- * expected to be called separately; confirm against the callers.
- *
- * @param channel the channel that was opened
- */
- public void handleOpened(final AllocatedMessageChannel channel) {
- channel.resumeReads();
- }
-
-    /**
-     * Receives and dispatches protocol messages until the channel would block, reaches end-of-stream
-     * or fails with an I/O error (the latter two close the channel).
-     * <p>
-     * Every message starts with a one-byte {@link MessageType} followed by type-specific fields. A message
-     * that is shorter than its type requires is logged and skipped; the loop then carries on with the next one.
-     *
-     * @param channel the channel that became readable
-     */
-    public void handleReadable(final AllocatedMessageChannel channel) {
-        // the loop body is a single try statement, so a BufferUnderflowException abandons only the current message
-        for (;;) try {
-            final ByteBuffer buffer;
-            try {
-                buffer = channel.receive();
-            } catch (IOException e) {
-                log.error(e, "I/O error in protocol channel; closing channel");
-                IoUtils.safeClose(channel);
-                return;
-            }
-            if (buffer == null) {
-                // todo release all handles...
-                // todo what if the write queue is not empty?
-                IoUtils.safeClose(channel);
-                return;
-            }
-            if (! buffer.hasRemaining()) {
-                // would block
-                channel.resumeReads();
-                return;
-            }
-            final MessageType msgType;
-            try {
-                msgType = MessageType.getMessageType(buffer.get() & 0xff);
-            } catch (IllegalArgumentException ex) {
-                log.trace("Received invalid message type");
-                // skip the bad message like every other malformed one; returning here would leave
-                // the channel without a pending resumeReads()
-                continue;
-            }
-            log.trace("Received message %s, type %s", buffer, msgType);
-            switch (msgType) {
-                case REQUEST: {
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = forwardedClients.get(clientId);
-                    if (handle == null) {
-                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
-                        break;
-                    }
-                    final int requestId = buffer.getInt();
-                    final Object payload;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
-                        try {
-                            unmarshaller.start(Marshalling.createByteInput(buffer));
-                            // a ClassNotFoundException is deliberately left to the handler below so that
-                            // the requester is told about the failure instead of waiting forever
-                            payload = unmarshaller.readObject();
-                            unmarshaller.finish();
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (Exception ex) {
-                        // IOException | ClassNotFoundException
-                        log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
-                        try {
-                            final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
-                            try {
-                                List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
-                                marshaller.start(createByteOutput(allocator, buffers));
-                                marshaller.write(MessageType.REQUEST_RECEIVE_FAILED.getId());
-                                // the receiving side reads the request ID right after the message type
-                                marshaller.writeInt(requestId);
-                                final IOException ioe = new IOException("Request receive failed");
-                                // stack traces of this node are of no use to the peer
-                                ex.setStackTrace(emptyStackTraceElements);
-                                ioe.initCause(ex);
-                                ioe.setStackTrace(emptyStackTraceElements);
-                                marshaller.writeObject(ioe);
-                                marshaller.finish();
-                                registerWriter(channel, new SimpleWriteHandler(allocator, buffers));
-                            } catch (InterruptedException e1) {
-                                Thread.currentThread().interrupt();
-                                log.debug("Remoting channel handler thread interrupted; closing channel");
-                                IoUtils.safeClose(channel);
-                            } finally {
-                                IoUtils.safeClose(marshaller);
-                            }
-                        } catch (IOException ioe) {
-                            log.warn("Failed to send notification of failure to unmarshal a request: %s", ioe);
-                        }
-                        break;
-                    }
-                    // request received OK
-                    final RequestHandler requestHandler = handle.getResource();
-                    requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel, requestId, allocator));
-                    break;
-                }
-                case REPLY: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final Object payload;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
-                        try {
-                            unmarshaller.start(Marshalling.createByteInput(buffer));
-                            try {
-                                payload = unmarshaller.readObject();
-                                unmarshaller.finish();
-                            } catch (ClassNotFoundException e) {
-                                // use the non-throwing helper: a throwing handler would land in the
-                                // IOException handler below and be notified a second time
-                                SpiUtils.safeHandleException(replyHandler, new InvalidClassException("Class not found: " + e.toString()));
-                                log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
-                                break;
-                            }
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException ex) {
-                        log.trace("Failed to unmarshal a reply (%s), sending a ReplyException", ex);
-                        // todo
-                        SpiUtils.safeHandleException(replyHandler, new ReplyException("Unmarshal failed", ex));
-                        break;
-                    }
-                    SpiUtils.safeHandleReply(replyHandler, payload);
-                    break;
-                }
-                case CANCEL_REQUEST: {
-                    final int requestId = buffer.getInt();
-                    final RemoteRequestContext context = localRequests.get(requestId);
-                    if (context != null) {
-                        context.cancel();
-                    }
-                    break;
-                }
-                case CANCEL_ACK: {
-                    final int requestId = buffer.getInt();
-                    // NOTE(review): every other terminal message removes the handler; this one only looks it up - confirm
-                    final ReplyHandler replyHandler = remoteRequests.get(requestId);
-                    if (replyHandler != null) try {
-                        replyHandler.handleCancellation();
-                    } catch (IOException e) {
-                        log.trace("Failed to forward a cancellation acknowledgement (%s)", e);
-                    }
-                    break;
-                }
-                case REQUEST_RECEIVE_FAILED: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final IOException cause;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
-                        try {
-                            unmarshaller.start(Marshalling.createByteInput(buffer));
-                            cause = (IOException) unmarshaller.readObject();
-                            unmarshaller.finish();
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException e) {
-                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
-                        break;
-                    } catch (ClassNotFoundException e) {
-                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
-                        break;
-                    }
-                    SpiUtils.safeHandleException(replyHandler, cause);
-                    break;
-                }
-                case REQUEST_FAILED: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final IOException cause;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
-                        try {
-                            unmarshaller.start(Marshalling.createByteInput(buffer));
-                            try {
-                                cause = (IOException) unmarshaller.readObject();
-                            } catch (ClassNotFoundException e) {
-                                // non-throwing helper, for the same reason as in the REPLY case
-                                SpiUtils.safeHandleException(replyHandler, new InvalidClassException("Class not found: " + e.toString()));
-                                log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
-                                break;
-                            } catch (ClassCastException e) {
-                                SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to unmarshal the cause)"));
-                                break;
-                            }
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException ex) {
-                        // the format string has one placeholder, so the exception must be supplied
-                        log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception", ex);
-                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to read the cause)"));
-                        break;
-                    }
-                    SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote execution failed", cause));
-                    break;
-                }
-                case REQUEST_OUTCOME_UNKNOWN: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final String reason = readUTFZ(buffer);
-                    SpiUtils.safeHandleException(replyHandler, new IndeterminateOutcomeException(reason));
-                    break;
-                }
-                case CLIENT_CLOSE: {
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = forwardedClients.remove(clientId);
-                    if (handle == null) {
-                        log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
-                        break;
-                    }
-                    IoUtils.safeClose(handle);
-                    break;
-                }
-                case CLIENT_OPEN: {
-                    final int serviceId = buffer.getInt();
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandlerSource> handle = forwardedServices.get(serviceId);
-                    if (handle == null) {
-                        log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
-                        break;
-                    }
-                    try {
-                        final RequestHandlerSource requestHandlerSource = handle.getResource();
-                        final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
-                        // todo check for duplicate
-                        // todo validate the client ID
-                        log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
-                        forwardedClients.put(clientId, clientHandle);
-                    } catch (IOException ex) {
-                        log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
-                        break;
-                    } finally {
-                        // NOTE(review): this closes the service handle that stays registered in forwardedServices - confirm intent
-                        IoUtils.safeClose(handle);
-                    }
-                    break;
-                }
-                case SERVICE_CLOSE: {
-                    final Handle<RequestHandlerSource> handle = forwardedServices.remove(buffer.getInt());
-                    if (handle == null) {
-                        break;
-                    }
-                    IoUtils.safeClose(handle);
-                    break;
-                }
-                case SERVICE_ADVERTISE: {
-                    // all fields are read even where unused, so the buffer position stays in step with the message layout
-                    final int serviceId = buffer.getInt();
-                    final String serviceType = readUTFZ(buffer);
-                    final String groupName = readUTFZ(buffer);
-                    final String endpointName = readUTFZ(buffer);
-                    final int baseMetric = buffer.getInt();
-                    // NOTE(review): the handler source below is created with a placeholder ID and never used - unfinished
-                    int id = -1;
-                    final RequestHandlerSource handlerSource = new RequestHandlerSourceImpl(allocator, id);
-                    final int calcMetric = baseMetric + linkMetric;
-                    if (calcMetric > 0) {
-                        try {
-                            final RemoteServiceConfiguration config = new RemoteServiceConfiguration();
-                            config.setServiceType(serviceType);
-                            config.setGroupName(groupName);
-                            config.setEndpointName(endpointName);
-                            final SimpleCloseable closeable = endpoint.registerRemoteService(config);
-                            // todo - something with that closeable
-                        } catch (IOException e) {
-                            log.error(e, "Unable to register remote service");
-                        }
-                    }
-                    break;
-                }
-                case SERVICE_UNADVERTISE: {
-                    final int serviceId = buffer.getInt();
-                    IoUtils.safeClose(remoteServices.get(serviceId));
-                    break;
-                }
-                default: {
-                    log.error("Malformed packet received (invalid message type %s)", msgType);
-                }
-            }
-        } catch (BufferUnderflowException e) {
-            log.error("Malformed packet received (buffer underflow)");
-        }
-    }
-
-    /**
-     * Runs queued write handlers in order until the queue is empty or a handler cannot finish.
-     * A handler that reports it is unfinished stays at the head of the queue and write notifications
-     * are requested again; a handler that throws is logged and discarded so it cannot block the queue.
-     *
-     * @param channel the channel that became writable
-     */
-    public void handleWritable(final AllocatedMessageChannel channel) {
-        for (;;) {
-            final WriteHandler handler = outputQueue.peek();
-            if (handler == null) {
-                return;
-            }
-            try {
-                if (handler.handleWrite(channel)) {
-                    log.trace("Handled write with handler %s", handler);
-                    pending.decrementAndGet();
-                    outputQueue.remove();
-                } else {
-                    channel.resumeWrites();
-                    return;
-                }
-            } catch (Throwable t) {
-                // previously swallowed silently; the message is lost, so at least leave a trace of why
-                log.error(t, "Write handler %s failed; discarding it", handler);
-                pending.decrementAndGet();
-                outputQueue.remove();
-            }
-        }
-    }
-
- /**
- * Called when the channel is closed. Intentionally empty in this revision: nothing registered in
- * the request, client or service maps is released here.
- *
- * @param channel the channel that was closed
- */
- public void handleClosed(final AllocatedMessageChannel channel) {
- }
-
-    /**
-     * Creates a request handler source that forwards to the remote service with the given ID.
-     * A new instance is created on every call.
-     *
-     * @param id the remote service ID
-     * @return a new forwarding request handler source
-     */
-    RequestHandlerSource getRemoteService(final int id) {
-        final RequestHandlerSource forwardingSource = new RequestHandlerSourceImpl(allocator, id);
-        return forwardingSource;
-    }
-
-    /**
-     * Reply handler given to local request handlers: it serializes the outcome of one request and
-     * queues it for transmission back to the peer.
-     */
-    private final class ReplyHandlerImpl implements ReplyHandler {
-
-        private final AllocatedMessageChannel channel;
-        private final int requestId;
-        private final BufferAllocator<ByteBuffer> allocator;
-
-        /**
-         * @param channel the channel to answer on, must not be {@code null}
-         * @param requestId the ID of the request being answered
-         * @param allocator allocator for outbound buffers, must not be {@code null}
-         */
-        private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
-            if (channel == null) {
-                throw new NullPointerException("channel is null");
-            }
-            if (allocator == null) {
-                throw new NullPointerException("allocator is null");
-            }
-            this.channel = channel;
-            this.requestId = requestId;
-            this.allocator = allocator;
-        }
-
-        /**
-         * Sends a {@code REPLY} message carrying the given reply object.
-         *
-         * @param reply the reply payload
-         * @throws IOException if marshalling fails or the thread is interrupted while queueing
-         */
-        public void handleReply(final Object reply) throws IOException {
-            // the previous version filled a separate header buffer that was never queued, so the
-            // message went out without its type and request ID (and the buffer was never released)
-            sendMarshalled(MessageType.REPLY, reply, "Reply handler thread interrupted before a reply could be sent");
-        }
-
-        /**
-         * Sends a {@code REQUEST_FAILED} message carrying the given exception.
-         *
-         * @param exception the failure to report
-         * @throws IOException if marshalling fails or the thread is interrupted while queueing
-         */
-        public void handleException(final IOException exception) throws IOException {
-            sendMarshalled(MessageType.REQUEST_FAILED, exception, "Reply handler thread interrupted before an exception could be sent");
-        }
-
-        /**
-         * Marshals {@code type}, the request ID and {@code payload} into fresh buffers and queues them.
-         * The marshaller is started before anything is written to it.
-         * NOTE(review): assumes {@code start()} emits nothing ahead of the type byte that the reader
-         * consumes directly from the buffer - confirm against the marshalling implementation.
-         */
-        private void sendMarshalled(final MessageType type, final Object payload, final String interruptedMessage) throws IOException {
-            try {
-                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
-                try {
-                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        marshaller.start(output);
-                        marshaller.write(type.getId());
-                        marshaller.writeInt(requestId);
-                        marshaller.writeObject(payload);
-                        marshaller.close();
-                        output.close();
-                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new InterruptedIOException(interruptedMessage);
-            }
-        }
-
-        /**
-         * Sends a {@code CANCEL_ACK} message for this request.
-         *
-         * @throws InterruptedIOException if the thread is interrupted while queueing the message
-         */
-        public void handleCancellation() throws InterruptedIOException {
-            final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.CANCEL_ACK.getId());
-            buffer.putInt(requestId);
-            buffer.flip();
-            try {
-                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new InterruptedIOException("Reply handler thread interrupted before cancellation could be sent");
-            }
-        }
-    }
-
- // Writer members
-
- private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
- private final AtomicInteger pending = new AtomicInteger();
-
-    /**
-     * Queues a write handler and, when no other handler was pending, asks the channel for write notifications.
-     *
-     * @param channel the channel to write on
-     * @param writeHandler the handler to queue
-     * @throws InterruptedException if interrupted while waiting for space in the output queue
-     */
-    private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
-        outputQueue.put(writeHandler);
-        final boolean nothingWasPending = pending.getAndIncrement() == 0;
-        if (nothingWasPending) {
-            channel.resumeWrites();
-        }
-    }
-
- // Reader utils
-
-    /**
-     * Reads a zero-terminated UTF-8 string from the buffer, consuming the terminator.
-     * One-, two- and three-byte sequences are decoded; any other lead byte, a stray continuation byte
-     * or a broken sequence yields {@code '?'}. If the buffer runs out before a terminator is seen,
-     * whatever was decoded so far is returned.
-     *
-     * @param buffer the buffer to read from; its position is advanced past the bytes consumed
-     * @return the decoded string, never {@code null}
-     */
-    private String readUTFZ(ByteBuffer buffer) {
-        StringBuilder builder = new StringBuilder();
-        // state = number of continuation bytes still expected; a = code unit under construction
-        int state = 0, a = 0;
-        while (buffer.hasRemaining()) {
-            final int v = buffer.get() & 0xff;
-            switch (state) {
-                case 0: {
-                    if (v == 0) {
-                        return builder.toString();
-                    } else if (v < 128) {
-                        builder.append((char) v);
-                    } else if (192 <= v && v < 224) {
-                        // 110xxxxx: keep only the five payload bits (the marker bits used to leak into the result)
-                        a = (v & 0x1f) << 6;
-                        state = 1;
-                    } else if (224 <= v && v < 240) {
-                        // 1110xxxx: three-byte lead bytes run up to 0xEF, not 0xE7
-                        a = (v & 0x0f) << 12;
-                        state = 2;
-                    } else {
-                        builder.append('?');
-                    }
-                    break;
-                }
-                case 1: {
-                    if (v == 0) {
-                        builder.append('?');
-                        return builder.toString();
-                    } else if (128 <= v && v < 192) {
-                        a |= v & 0x3f;
-                        builder.append((char) a);
-                    } else {
-                        builder.append('?');
-                    }
-                    state = 0;
-                    break;
-                }
-                case 2: {
-                    if (v == 0) {
-                        builder.append('?');
-                        return builder.toString();
-                    } else if (128 <= v && v < 192) {
-                        a |= (v & 0x3f) << 6;
-                        state = 1;
-                    } else {
-                        builder.append('?');
-                        state = 0;
-                    }
-                    break;
-                }
-                default:
-                    throw new IllegalStateException("wrong state");
-            }
-        }
-        return builder.toString();
-    }
-
- // client endpoint
-
-    /**
-     * Request handler that forwards every request to the remote client with the given ID and sends
-     * a {@code CLIENT_CLOSE} message when it is closed.
-     */
-    private final class RequestHandlerImpl extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
-
-        private final int identifier;
-        private final BufferAllocator<ByteBuffer> allocator;
-
-        /**
-         * @param identifier the remote client ID
-         * @param allocator allocator for outbound buffers, must not be {@code null}
-         */
-        public RequestHandlerImpl(final int identifier, final BufferAllocator<ByteBuffer> allocator) {
-            super(executor);
-            if (allocator == null) {
-                throw new NullPointerException("allocator is null");
-            }
-            this.identifier = identifier;
-            this.allocator = allocator;
-            addCloseHandler(new CloseHandler<RequestHandler>() {
-                public void handleClose(final RequestHandler closed) {
-                    remoteClients.remove(identifier);
-                    ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
-                    buffer.putInt(identifier);
-                    buffer.flip();
-                    try {
-                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    } catch (InterruptedException e) {
-                        // keep the interrupt visible to the caller, as the other handlers in this class do
-                        Thread.currentThread().interrupt();
-                        log.warn("Client close notification was interrupted before it could be sent");
-                    }
-                }
-            });
-        }
-
-        /**
-         * Marshals the request and queues it for the remote side. Failures are reported to
-         * {@code handler} asynchronously through the executor.
-         *
-         * @param request the request payload
-         * @param handler receives the reply, exception or cancellation
-         * @return a context that can cancel the request, or a blank context if it could not be sent
-         */
-        public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
-            log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
-            // allocated up front so that both failure paths can unregister the handler again
-            final int id = nextRequest();
-            try {
-                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
-                try {
-                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        // the marshaller was never attached to the output before
-                        // NOTE(review): assumes start() emits nothing ahead of the message type byte - confirm
-                        marshaller.start(output);
-                        marshaller.write(MessageType.REQUEST.getId());
-                        marshaller.writeInt(identifier);
-                        remoteRequests.put(id, handler);
-                        marshaller.writeInt(id);
-                        marshaller.writeObject(request);
-                        marshaller.close();
-                        output.close();
-                        try {
-                            registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            // the request never left, so no reply can arrive for it
-                            remoteRequests.remove(id);
-                            executor.execute(new Runnable() {
-                                public void run() {
-                                    SpiUtils.safeHandleCancellation(handler);
-                                }
-                            });
-                            return SpiUtils.getBlankRemoteRequestContext();
-                        }
-                        log.trace("Sent request %s", request);
-                        return new RemoteRequestContextImpl(id, allocator, channel);
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (final IOException t) {
-                // drop the registration (if it was made) so the map does not keep a handler that is failed below
-                remoteRequests.remove(id);
-                log.trace(t, "receiveRequest failed with an exception");
-                executor.execute(new Runnable() {
-                    public void run() {
-                        SpiUtils.safeHandleException(handler, t);
-                    }
-                });
-                return SpiUtils.getBlankRemoteRequestContext();
-            }
-        }
-
-        public String toString() {
-            return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
-        }
-    }
-
-    /**
-     * Context for a request that was sent to the remote side; its only operation is to ask the peer to cancel it.
-     */
-    public final class RemoteRequestContextImpl implements RemoteRequestContext {
-
-        private final BufferAllocator<ByteBuffer> allocator;
-        private final int id;
-        private final AllocatedMessageChannel channel;
-
-        /**
-         * @param id the request ID
-         * @param allocator allocator for the cancel message buffer
-         * @param channel the channel the request was sent on
-         */
-        public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
-            this.id = id;
-            this.allocator = allocator;
-            this.channel = channel;
-        }
-
-        /**
-         * Queues a {@code CANCEL_REQUEST} message for this request. Never throws: a failure is logged
-         * and, if it was an interruption, the thread's interrupt status is restored.
-         */
-        public void cancel() {
-            try {
-                final ByteBuffer buffer = allocator.allocate();
-                buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
-                buffer.putInt(id);
-                buffer.flip();
-                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.trace("Cancellation of request %d was interrupted before it could be sent", Integer.valueOf(id));
-            } catch (Throwable t) {
-                log.trace("Failed to send cancellation of request %d (%s)", Integer.valueOf(id), t);
-            }
-        }
-    }
-
-    /**
-     * Request handler source that stands for a service on the remote side: it opens remote clients on
-     * demand and sends a {@code SERVICE_CLOSE} message when it is closed.
-     */
-    public final class RequestHandlerSourceImpl extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
-
-        private final BufferAllocator<ByteBuffer> allocator;
-        private final int identifier;
-
-        /**
-         * @param allocator allocator for outbound buffers
-         * @param identifier the remote service ID
-         */
-        protected RequestHandlerSourceImpl(final BufferAllocator<ByteBuffer> allocator, final int identifier) {
-            super(executor);
-            this.allocator = allocator;
-            this.identifier = identifier;
-            addCloseHandler(new CloseHandler<RequestHandlerSource>() {
-                public void handleClose(final RequestHandlerSource closed) {
-                    ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
-                    buffer.putInt(identifier);
-                    buffer.flip();
-                    try {
-                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    } catch (InterruptedException e) {
-                        // keep the interrupt visible to the caller, as the other handlers in this class do
-                        Thread.currentThread().interrupt();
-                        log.warn("Service close notification was interrupted before it could be sent");
-                    }
-                }
-            });
-        }
-
-        /**
-         * Opens a new client of the remote service: registers a forwarding request handler under a fresh
-         * ID and queues a {@code CLIENT_OPEN} message. Queueing is retried if the thread is interrupted;
-         * the interrupt status is restored before returning.
-         *
-         * @return a handle to the registered forwarding request handler
-         * @throws IOException if the handle cannot be obtained
-         */
-        public Handle<RequestHandler> createRequestHandler() throws IOException {
-            final int id = nextRemoteClient();
-            final RequestHandler requestHandler = new RequestHandlerImpl(id, MultiplexHandler.this.allocator);
-            remoteClients.put(id, requestHandler);
-            final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.CLIENT_OPEN.getId());
-            buffer.putInt(identifier);
-            buffer.putInt(id);
-            buffer.flip();
-            // todo - probably should bail out if we're interrupted?
-            boolean intr = false;
-            for (;;) {
-                try {
-                    registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    try {
-                        // hand out the instance registered above; a second, unregistered handler used
-                        // to be created here, leaving the one in remoteClients unreachable to the caller
-                        return requestHandler.getHandle();
-                    } finally {
-                        if (intr) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    intr = true;
-                }
-            }
-        }
-
-        public String toString() {
-            return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
-        }
-    }
-
-    /**
-     * Creates a byte output that collects everything written to it into buffers from {@code allocator}.
-     * A buffer is flipped and appended to {@code target} as soon as it is full; {@code flush()} and
-     * {@code close()} append the partially filled buffer, if there is one.
-     *
-     * @param allocator source of the buffers to fill
-     * @param target receives the filled, flipped buffers in write order
-     * @return the collecting byte output
-     */
-    public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
-        return new ByteOutput() {
-            // buffer currently being filled, or null when the next write has to allocate one
-            private ByteBuffer pending;
-
-            private ByteBuffer acquire() {
-                if (pending == null) {
-                    pending = allocator.allocate();
-                }
-                return pending;
-            }
-
-            // makes the buffer readable, hands it to the target and forgets it
-            private void publish(final ByteBuffer filled) {
-                filled.flip();
-                target.add(filled);
-                pending = null;
-            }
-
-            public void write(final int i) {
-                final ByteBuffer destination = acquire();
-                destination.put((byte) i);
-                if (! destination.hasRemaining()) {
-                    publish(destination);
-                }
-            }
-
-            public void write(final byte[] bytes) {
-                write(bytes, 0, bytes.length);
-            }
-
-            public void write(final byte[] bytes, int offs, int len) {
-                int position = offs;
-                int left = len;
-                while (left > 0) {
-                    final ByteBuffer destination = acquire();
-                    final int chunk = Math.min(left, destination.remaining());
-                    destination.put(bytes, position, chunk);
-                    position += chunk;
-                    left -= chunk;
-                    if (! destination.hasRemaining()) {
-                        publish(destination);
-                    }
-                }
-            }
-
-            public void close() {
-                flush();
-            }
-
-            public void flush() {
-                final ByteBuffer partial = pending;
-                if (partial != null) {
-                    publish(partial);
-                }
-            }
-        };
-    }
-
- private final ProtocolObjectTableWriter protocolObjectTableWriter = new ProtocolObjectTableWriter();
-
-    /**
-     * Object table writer that replaces a {@link RequestHandler} in the stream with the tag byte
-     * {@code 1} followed by its forwarded-client ID, registering the handler first if it has no ID yet.
-     */
-    public class ProtocolObjectTableWriter implements ObjectTable.Writer {
-
-        /**
-         * @param marshaller the marshaller to write the tag and ID to
-         * @param o the object to write; must be a {@link RequestHandler}
-         * @throws IOException if writing or obtaining the handle fails
-         */
-        public void writeObject(final Marshaller marshaller, final Object o) throws IOException {
-            final RequestHandler handler = (RequestHandler) o;
-            int clientId = forwardedClients.get(handler, -1);
-            marshaller.write(1);
-            if (clientId == -1) {
-                // not forwarded before: allocate an ID and register a handle under it
-                clientId = nextForwardedClient();
-                forwardedClients.put(clientId, handler.getHandle());
-            }
-            marshaller.writeInt(clientId);
-        }
-    }
-
-    /**
-     * Object table that maps request handlers to protocol IDs when writing, and protocol IDs back to
-     * remote clients or remote services when reading.
-     */
-    public class ProtocolObjectTable implements ObjectTable {
-
-        /**
-         * @param o the object about to be marshalled
-         * @return the protocol writer for a {@link RequestHandler}, {@code null} for anything else
-         */
-        public Writer getObjectWriter(final Object o) throws IOException {
-            return o instanceof RequestHandler ? protocolObjectTableWriter : null;
-        }
-
-        /**
-         * Reads a tag byte and an ID and looks the ID up: tag {@code 1} in the remote clients,
-         * tag {@code 2} in the remote services.
-         *
-         * @param unmarshaller the unmarshaller to read from
-         * @return the registered object for the ID, as returned by the map lookup
-         * @throws InvalidObjectException if the tag is neither {@code 1} nor {@code 2}
-         */
-        public Object readObject(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
-            final int tag = unmarshaller.readByte();
-            if (tag == 1) {
-                // remote client
-                final int clientId = unmarshaller.readInt();
-                return remoteClients.get(clientId);
-            }
-            if (tag == 2) {
-                // remote client source
-                final int serviceId = unmarshaller.readInt();
-                return remoteServices.get(serviceId);
-            }
-            // invalid
-            throw new InvalidObjectException("Invalid ID sent for protocol object table");
-        }
-    }
-}
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -22,12 +22,7 @@
package org.jboss.remoting.protocol.multiplex;
-import org.jboss.remoting.RemotingException;
-import org.jboss.remoting.SimpleCloseable;
import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.spi.RequestHandlerSource;
-import org.jboss.remoting.spi.Handle;
-import org.jboss.remoting.spi.AbstractSimpleCloseable;
import org.jboss.xnio.IoHandlerFactory;
import org.jboss.xnio.ChannelSource;
import org.jboss.xnio.IoFuture;
@@ -35,7 +30,6 @@
import org.jboss.xnio.IoHandler;
import org.jboss.xnio.channels.AllocatedMessageChannel;
import java.io.IOException;
-import java.util.concurrent.Executor;
/**
*
@@ -55,7 +49,7 @@
public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Endpoint endpoint, final MultiplexConfiguration configuration) {
return new IoHandlerFactory<AllocatedMessageChannel>() {
public IoHandler<? super AllocatedMessageChannel> createHandler() {
- return new MultiplexHandler(endpoint, configuration);
+ return new SimpleMultiplexHandler(endpoint, configuration);
}
};
}
@@ -69,29 +63,13 @@
* @return a handle which may be used to close the connection
* @throws IOException if an error occurs
*/
- public static IoFuture<SimpleCloseable> connect(final Endpoint endpoint, final MultiplexConfiguration configuration, final ChannelSource<AllocatedMessageChannel> channelSource) throws IOException {
- final MultiplexHandler multiplexHandler = new MultiplexHandler(endpoint, configuration);
- final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(multiplexHandler);
- return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
- protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {
- return new AbstractConnection(configuration.getExecutor()) {
- // todo - this method is not called by anyone?
- public Handle<RequestHandlerSource> getServiceForId(final int id) throws IOException {
- return multiplexHandler.getRemoteService(id).getHandle();
- }
- };
+ public static IoFuture<MultiplexConnection> connect(final Endpoint endpoint, final MultiplexConfiguration configuration, final ChannelSource<AllocatedMessageChannel> channelSource) throws IOException {
+ final SimpleMultiplexHandler handler = new SimpleMultiplexHandler(endpoint, configuration);
+ final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(handler);
+ return new AbstractConvertingIoFuture<MultiplexConnection, AllocatedMessageChannel>(futureChannel) {
+ protected MultiplexConnection convert(final AllocatedMessageChannel channel) throws IOException {
+ return handler.getConnection().get();
}
};
}
-
- private abstract static class AbstractConnection extends AbstractSimpleCloseable {
-
- protected AbstractConnection(final Executor executor) {
- super(executor);
- }
-
- public String toString() {
- return "Remoting multiplex connection <" + Integer.toString(hashCode()) + ">";
- }
- }
}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,378 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.IoReadHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.ReplyException;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.ServiceRegistrationException;
+import org.jboss.remoting.util.QualifiedName;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.Marshalling;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Read handler for a multiplex protocol channel.  Every received message starts with a one-byte
+ * message type; the rest of the message is decoded according to that type and dispatched using the
+ * tables and marshalling settings of the associated {@link MultiplexConnection}.
+ */
+public final class MultiplexReadHandler implements IoReadHandler<AllocatedMessageChannel> {
+
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex");
+    /** Shared empty stack trace, installed on exceptions before they are marshalled to the peer. */
+    private static final StackTraceElement[] emptyStackTraceElements = new StackTraceElement[0];
+    private final MultiplexConnection connection;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param connection the connection used to look up handlers and to send replies
+     */
+    public MultiplexReadHandler(final MultiplexConnection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * Receive and dispatch messages from the channel in a loop.  The loop ends when receiving fails
+     * or yields {@code null} (the channel is closed), or when an empty buffer is received (reads are
+     * resumed and the method returns).  Malformed or unknown messages are logged and skipped.
+     *
+     * @param channel the channel to read from
+     */
+    public void handleReadable(final AllocatedMessageChannel channel) {
+        final MultiplexConnection connection = this.connection;
+        final MarshallerFactory marshallerFactory = connection.getMarshallerFactory();
+        final MarshallingConfiguration marshallingConfiguration = connection.getMarshallingConfiguration();
+        for (;;) {
+            try {
+                final ByteBuffer buffer;
+                try {
+                    buffer = channel.receive();
+                } catch (IOException e) {
+                    log.error(e, "I/O error in protocol channel; closing channel");
+                    IoUtils.safeClose(channel);
+                    return;
+                }
+                if (buffer == null) {
+                    IoUtils.safeClose(channel);
+                    return;
+                }
+                if (! buffer.hasRemaining()) {
+                    // would block
+                    channel.resumeReads();
+                    return;
+                }
+                final MessageType msgType;
+                try {
+                    msgType = MessageType.getMessageType(buffer.get() & 0xff);
+                } catch (IllegalArgumentException ex) {
+                    log.trace("Received invalid message type");
+                    // Skip the bad message and keep reading, like every other malformed-message path
+                    // in this method.  Returning here would leave the method without calling
+                    // resumeReads().
+                    // NOTE(review): assumes reads stay suspended until resumeReads() is called - confirm against XNIO.
+                    continue;
+                }
+                log.trace("Received message type %s; dump:\n%s", msgType, Buffers.createDumper(buffer, 8, 1));
+                switch (msgType) {
+                    case REQUEST: {
+                        final int clientId = buffer.getInt();
+                        final Handle<RequestHandler> handle = connection.getForwardedClient(clientId);
+                        if (handle == null) {
+                            log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+                            break;
+                        }
+                        final int requestId = buffer.getInt();
+                        final Object payload;
+                        try {
+                            final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                            try {
+                                unmarshaller.start(Marshalling.createByteInput(buffer));
+                                payload = unmarshaller.readObject();
+                                unmarshaller.finish();
+                            } finally {
+                                IoUtils.safeClose(unmarshaller);
+                            }
+                        } catch (Exception ex) {
+                            // IOException | ClassNotFoundException
+                            log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
+                            try {
+                                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+                                try {
+                                    List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+                                    marshaller.start(new BufferByteOutput(connection.getAllocator(), buffers));
+                                    marshaller.write(MessageType.REQUEST_RECEIVE_FAILED.getId());
+                                    // The REQUEST_RECEIVE_FAILED branch below reads the request ID before the
+                                    // marshalled cause, so it has to be written here as well.
+                                    marshaller.writeInt(requestId);
+                                    ex.setStackTrace(emptyStackTraceElements);
+                                    final IOException ioe = new IOException("Request receive failed");
+                                    ioe.initCause(ex);
+                                    ioe.setStackTrace(emptyStackTraceElements);
+                                    marshaller.writeObject(ioe);
+                                    // NOTE(review): the other senders close the marshaller and the byte output
+                                    // before writing; confirm that finish() alone completes the buffer list.
+                                    marshaller.finish();
+                                    connection.doBlockingWrite(buffers);
+                                } finally {
+                                    IoUtils.safeClose(marshaller);
+                                }
+                            } catch (IOException ioe) {
+                                log.warn("Failed to send notification of failure to unmarshal a request: %s", ioe);
+                            }
+                            break;
+                        }
+                        // request received OK
+                        final RequestHandler requestHandler = handle.getResource();
+                        requestHandler.receiveRequest(payload, new MultiplexReplyHandler(requestId, connection));
+                        break;
+                    }
+                    case REPLY: {
+                        final int requestId = buffer.getInt();
+                        final ReplyHandler replyHandler = connection.removeRemoteRequest(requestId);
+                        if (replyHandler == null) {
+                            log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                            break;
+                        }
+                        final Object payload;
+                        try {
+                            final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                            try {
+                                unmarshaller.start(Marshalling.createByteInput(buffer));
+                                payload = unmarshaller.readObject();
+                                unmarshaller.finish();
+                            } finally {
+                                IoUtils.safeClose(unmarshaller);
+                            }
+                        } catch (Exception ex) {
+                            // IOException | ClassNotFoundException
+                            log.trace("Failed to unmarshal a reply (%s), sending a ReplyException", ex);
+                            SpiUtils.safeHandleException(replyHandler, new ReplyException("Unmarshal failed", ex));
+                            break;
+                        }
+                        SpiUtils.safeHandleReply(replyHandler, payload);
+                        break;
+                    }
+                    case CANCEL_REQUEST: {
+                        final int requestId = buffer.getInt();
+                        final RemoteRequestContext context = connection.getLocalRequest(requestId);
+                        if (context != null) {
+                            context.cancel();
+                        }
+                        break;
+                    }
+                    case CANCEL_ACK: {
+                        final int requestId = buffer.getInt();
+                        final ReplyHandler replyHandler = connection.getRemoteRequest(requestId);
+                        if (replyHandler != null) {
+                            SpiUtils.safeHandleCancellation(replyHandler);
+                        }
+                        break;
+                    }
+                    case REQUEST_RECEIVE_FAILED: {
+                        final int requestId = buffer.getInt();
+                        final ReplyHandler replyHandler = connection.removeRemoteRequest(requestId);
+                        if (replyHandler == null) {
+                            log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                            break;
+                        }
+                        final IOException cause;
+                        try {
+                            final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                            try {
+                                unmarshaller.start(Marshalling.createByteInput(buffer));
+                                cause = (IOException) unmarshaller.readObject();
+                                unmarshaller.finish();
+                            } finally {
+                                IoUtils.safeClose(unmarshaller);
+                            }
+                        } catch (IOException e) {
+                            SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+                            break;
+                        } catch (ClassNotFoundException e) {
+                            SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+                            break;
+                        } catch (ClassCastException e) {
+                            // The peer sent something other than an IOException; report it to the caller
+                            // instead of letting the exception escape the read handler.
+                            SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+                            break;
+                        }
+                        SpiUtils.safeHandleException(replyHandler, cause);
+                        break;
+                    }
+                    case REQUEST_FAILED: {
+                        final int requestId = buffer.getInt();
+                        final ReplyHandler replyHandler = connection.removeRemoteRequest(requestId);
+                        if (replyHandler == null) {
+                            log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                            break;
+                        }
+                        final IOException cause;
+                        try {
+                            final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                            try {
+                                unmarshaller.start(Marshalling.createByteInput(buffer));
+                                try {
+                                    cause = (IOException) unmarshaller.readObject();
+                                } catch (ClassNotFoundException e) {
+                                    SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an ClassNotFoundException occurred when attempting to unmarshal the cause)"));
+                                    log.trace(e, "Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+                                    break;
+                                } catch (ClassCastException e) {
+                                    SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an ClassCastException occurred when attempting to unmarshal the cause)"));
+                                    log.trace(e, "Class cast exception in exception reply to request ID %d", Integer.valueOf(requestId));
+                                    break;
+                                }
+                            } finally {
+                                IoUtils.safeClose(unmarshaller);
+                            }
+                        } catch (IOException ex) {
+                            // the format string has a %s placeholder, so the exception must be passed along
+                            log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception", ex);
+                            SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to read the cause)"));
+                            break;
+                        }
+                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote execution failed", cause));
+                        break;
+                    }
+                    case CLIENT_CLOSE: {
+                        final int clientId = buffer.getInt();
+                        final Handle<RequestHandler> handle = connection.removeForwardedClient(clientId);
+                        if (handle == null) {
+                            log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
+                            break;
+                        }
+                        IoUtils.safeClose(handle);
+                        break;
+                    }
+                    case CLIENT_OPEN: {
+                        final int serviceId = buffer.getInt();
+                        final int clientId = buffer.getInt();
+                        final Handle<RequestHandlerSource> handle = connection.getForwardedService(serviceId);
+                        if (handle == null) {
+                            log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
+                            break;
+                        }
+                        try {
+                            final RequestHandlerSource requestHandlerSource = handle.getResource();
+                            final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
+                            log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
+                            connection.addForwardedClient(clientId, clientHandle);
+                        } catch (IOException ex) {
+                            log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
+                            break;
+                        } finally {
+                            IoUtils.safeClose(handle);
+                        }
+                        break;
+                    }
+                    case SERVICE_OPEN_REQUEST: {
+                        final int serviceId = buffer.getInt();
+                        final QualifiedName qualifiedName = MultiplexConnection.getQualifiedName(buffer);
+                        final Handle<RequestHandlerSource> service = connection.getService(qualifiedName);
+                        if (service == null) {
+                            ByteBuffer replyBuffer = ByteBuffer.allocate(5);
+                            replyBuffer.put((byte) MessageType.SERVICE_OPEN_NOT_FOUND.getId());
+                            replyBuffer.putInt(serviceId);
+                            replyBuffer.flip();
+                            try {
+                                connection.doBlockingWrite(replyBuffer);
+                            } catch (IOException e) {
+                                log.error(e, "Failed to send an error reply to an invalid service open request");
+                            }
+                            break;
+                        }
+                        final Handle<RequestHandlerSource> ourHandle;
+                        try {
+                            ourHandle = service.getResource().getHandle();
+                        } catch (IOException e) {
+                            log.error("Failed to acquire a handle to registered service: %s", e);
+                            ByteBuffer replyBuffer = ByteBuffer.allocate(5);
+                            replyBuffer.put((byte) MessageType.SERVICE_OPEN_FAILED.getId());
+                            replyBuffer.putInt(serviceId);
+                            replyBuffer.flip();
+                            try {
+                                connection.doBlockingWrite(replyBuffer);
+                            } catch (IOException e2) {
+                                // log the write failure (e2), not the handle acquisition failure logged above
+                                log.trace(e2, "Failed to send an exception reply to a service open request");
+                            }
+                            break;
+                        }
+                        connection.addForwadedService(serviceId, ourHandle);
+                        ByteBuffer replyBuffer = ByteBuffer.allocate(5);
+                        replyBuffer.put((byte) MessageType.SERVICE_OPEN_REPLY.getId());
+                        replyBuffer.putInt(serviceId);
+                        replyBuffer.flip();
+                        try {
+                            connection.doBlockingWrite(replyBuffer);
+                        } catch (IOException e) {
+                            log.trace(e, "Failed to send a reply to a service open request");
+                        }
+                        break;
+                    }
+                    case SERVICE_OPEN_FAILED:
+                    case SERVICE_OPEN_NOT_FOUND:
+                    case SERVICE_OPEN_FORBIDDEN: {
+                        final int serviceId = buffer.getInt();
+                        final FutureRemoteRequestHandlerSource future = connection.removeFutureRemoteService(serviceId);
+                        if (future == null) {
+                            log.trace("Service open failure reply received for unknown service ID %d", Integer.valueOf(serviceId));
+                            break;
+                        }
+                        future.setException(
+                                msgType == MessageType.SERVICE_OPEN_NOT_FOUND ? new ServiceRegistrationException("Service not found") :
+                                msgType == MessageType.SERVICE_OPEN_FORBIDDEN ? new ServiceRegistrationException("Service open forbidden") :
+                                new ServiceRegistrationException("Service open failed")
+                        );
+                        break;
+                    }
+                    case SERVICE_OPEN_REPLY: {
+                        final int serviceId = buffer.getInt();
+                        final FutureRemoteRequestHandlerSource future = connection.getFutureRemoteService(serviceId);
+                        if (future == null) {
+                            log.trace("Service open reply received for unknown service ID %d", Integer.valueOf(serviceId));
+                            break;
+                        }
+                        final MultiplexRequestHandlerSource requestHandlerSource = new MultiplexRequestHandlerSource(serviceId, connection);
+                        future.setResult(requestHandlerSource);
+                        break;
+                    }
+                    case SERVICE_CLOSE_NOTIFY: {
+                        final int serviceId = buffer.getInt();
+                        final FutureRemoteRequestHandlerSource future = connection.removeFutureRemoteService(serviceId);
+                        if (future == null) {
+                            // the sibling branches treat a null lookup result as "unknown ID"; do the same
+                            // here rather than failing with a NullPointerException
+                            log.trace("Received service close notification for unknown service ID %d", Integer.valueOf(serviceId));
+                            break;
+                        }
+                        future.addNotifier(new IoFuture.HandlingNotifier<RequestHandlerSource>() {
+                            public void handleDone(final RequestHandlerSource result) {
+                                IoUtils.safeClose(result);
+                            }
+                        });
+                        break;
+                    }
+                    case SERVICE_CLOSE_REQUEST: {
+                        final int serviceId = buffer.getInt();
+                        final Handle<RequestHandlerSource> handle = connection.removeForwardedService(serviceId);
+                        if (handle == null) {
+                            log.trace("Received service close request on unknown ID %d", Integer.valueOf(serviceId));
+                            break;
+                        }
+                        IoUtils.safeClose(handle);
+                        break;
+                    }
+                    case CONNECTION_CLOSE: {
+                        // nothing is done for this message type here
+                        break;
+                    }
+                    default: {
+                        log.error("Malformed packet received (invalid message type %s)", msgType);
+                        break;
+                    }
+                }
+            } catch (BufferUnderflowException e) {
+                // a message was shorter than its type requires; log it and move on to the next one
+                log.error(e, "Malformed packet received (buffer underflow)");
+            }
+        }
+    }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.ByteOutput;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Reply handler which sends the outcome of a locally handled request back over a
+ * {@link MultiplexConnection}, tagged with the ID of the request it answers.
+ */
+final class MultiplexReplyHandler implements ReplyHandler {
+
+    private final int requestId;
+    private final MultiplexConnection connection;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param requestId the ID of the request being answered
+     * @param connection the connection on which the answer is written
+     */
+    MultiplexReplyHandler(final int requestId, final MultiplexConnection connection) {
+        this.requestId = requestId;
+        this.connection = connection;
+    }
+
+    /**
+     * Send a {@code REPLY} message carrying the reply object.
+     *
+     * @param reply the reply to marshal
+     * @throws IOException if marshalling or writing fails
+     */
+    public void handleReply(final Object reply) throws IOException {
+        sendObjectMessage(MessageType.REPLY, reply);
+    }
+
+    /**
+     * Send a {@code REQUEST_FAILED} message carrying the exception.
+     *
+     * @param exception the failure to marshal
+     * @throws IOException if marshalling or writing fails
+     */
+    public void handleException(final IOException exception) throws IOException {
+        sendObjectMessage(MessageType.REQUEST_FAILED, exception);
+    }
+
+    /**
+     * Send a {@code CANCEL_ACK} message consisting of the message type byte and the request ID.
+     *
+     * @throws IOException if writing fails
+     */
+    public void handleCancellation() throws IOException {
+        final ByteBuffer ack = ByteBuffer.allocate(5);
+        ack.put((byte) MessageType.CANCEL_ACK.getId());
+        ack.putInt(requestId);
+        ack.flip();
+        connection.doBlockingWrite(ack);
+    }
+
+    /**
+     * Marshal a message made of the type ID, the request ID and one object, then write it to the connection.
+     *
+     * @param type the message type to send
+     * @param body the object which follows the request ID
+     * @throws IOException if marshalling or writing fails
+     */
+    private void sendObjectMessage(final MessageType type, final Object body) throws IOException {
+        final MultiplexConnection conn = this.connection;
+        final Marshaller marshaller = conn.getMarshallerFactory().createMarshaller(conn.getMarshallingConfiguration());
+        try {
+            final List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+            final ByteOutput byteOutput = new BufferByteOutput(conn.getAllocator(), buffers);
+            try {
+                marshaller.start(byteOutput);
+                marshaller.write(type.getId());
+                marshaller.writeInt(requestId);
+                marshaller.writeObject(body);
+                // both are closed before the buffer list is handed to the connection
+                marshaller.close();
+                byteOutput.close();
+                conn.doBlockingWrite(buffers);
+            } finally {
+                IoUtils.safeClose(byteOutput);
+            }
+        } finally {
+            IoUtils.safeClose(marshaller);
+        }
+    }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,136 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.ByteOutput;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.IOException;
+
+/**
+ * Request handler which forwards requests over a {@link MultiplexConnection} to the remote client
+ * identified by {@code identifier}.
+ */
+final class MultiplexRequestHandler extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex.request-handler");
+
+    private final int identifier;
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final MultiplexConnection connection;
+
+    /**
+     * Construct a new instance, using the executor and buffer allocator of the given connection.
+     *
+     * @param identifier the client ID written into every message sent by this handler
+     * @param connection the connection used to send messages
+     */
+    public MultiplexRequestHandler(final int identifier, final MultiplexConnection connection) {
+        super(connection.getExecutor());
+        this.connection = connection;
+        this.identifier = identifier;
+        allocator = connection.getAllocator();
+    }
+
+    /**
+     * Remove this client from the connection and send a {@code CLIENT_CLOSE} message for it.
+     *
+     * @throws IOException if the message cannot be written
+     */
+    @Override
+    protected void closeAction() throws IOException {
+        connection.removeRemoteClient(identifier);
+        ByteBuffer buffer = allocator.allocate();
+        buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
+        buffer.putInt(identifier);
+        buffer.flip();
+        connection.doBlockingWrite(buffer);
+    }
+
+    /**
+     * Marshal the request into a {@code REQUEST} message and write it to the connection.  The reply
+     * handler is registered with the connection under a newly allocated request ID before the message
+     * is written.  If sending fails, that registration is removed again, the failure is passed to the
+     * reply handler and a blank request context is returned.
+     *
+     * @param request the request object to send
+     * @param handler the handler to notify of the outcome
+     * @return a context for the sent request, or a blank context if sending failed
+     */
+    public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
+        log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
+        final MultiplexConnection connection = this.connection;
+        // tracked outside the try block so that the failure path can undo the registration
+        int id = 0;
+        boolean registered = false;
+        try {
+            final Marshaller marshaller = connection.getMarshallerFactory().createMarshaller(connection.getMarshallingConfiguration());
+            try {
+                final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                final ByteOutput output = new BufferByteOutput(allocator, bufferList);
+                try {
+                    marshaller.start(output);
+                    marshaller.write(MessageType.REQUEST.getId());
+                    marshaller.writeInt(identifier);
+                    id = connection.nextRequest();
+                    connection.addRemoteRequest(id, handler);
+                    registered = true;
+                    marshaller.writeInt(id);
+                    marshaller.writeObject(request);
+                    marshaller.close();
+                    output.close();
+                    connection.doBlockingWrite(bufferList);
+                    log.trace("Sent request %s", request);
+                    return new RemoteRequestContextImpl(id, connection);
+                } finally {
+                    IoUtils.safeClose(output);
+                }
+            } finally {
+                IoUtils.safeClose(marshaller);
+            }
+        } catch (final IOException t) {
+            log.trace(t, "receiveRequest failed with an exception");
+            if (registered) {
+                // The handler is told about the failure right here; leaving it registered would
+                // leak the entry and allow it to be invoked a second time later on.
+                connection.removeRemoteRequest(id);
+            }
+            SpiUtils.safeHandleException(handler, t);
+            return SpiUtils.getBlankRemoteRequestContext();
+        }
+    }
+
+    public String toString() {
+        return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+    }
+}
+
+/**
+ * Context of a request sent over a {@link MultiplexConnection}; cancelling it sends a
+ * {@code CANCEL_REQUEST} message for the request ID, at most once.
+ */
+final class RemoteRequestContextImpl implements RemoteRequestContext {
+
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex.requesthandler.context");
+
+    private final int id;
+    private final MultiplexConnection connection;
+    /** Becomes {@code true} on the first call to {@link #cancel()}. */
+    private final AtomicBoolean cancelSent = new AtomicBoolean();
+
+    /**
+     * Construct a new instance.
+     *
+     * @param id the ID of the request
+     * @param connection the connection on which a cancel message is written
+     */
+    public RemoteRequestContextImpl(final int id, final MultiplexConnection connection) {
+        this.id = id;
+        this.connection = connection;
+    }
+
+    /**
+     * Send a cancel message for this request.  Only the first call sends anything; a failure to send
+     * is logged as a warning and not propagated.
+     */
+    public void cancel() {
+        final boolean alreadySent = cancelSent.getAndSet(true);
+        if (alreadySent) {
+            return;
+        }
+        try {
+            final ByteBuffer message = ByteBuffer.allocate(5);
+            message.put((byte) MessageType.CANCEL_REQUEST.getId());
+            message.putInt(id);
+            message.flip();
+            connection.doBlockingWrite(message);
+        } catch (Throwable t) {
+            log.warn("Sending cancel request failed: %s", t);
+        }
+    }
+
+    public String toString() {
+        final String hash = Integer.toString(hashCode(), 16);
+        return "remote request context (multiplex) <" + hash + "> (id = " + id + ")";
+    }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+/**
+ * Request handler source which opens clients on the remote service identified by {@code identifier}
+ * over a {@link MultiplexConnection}.
+ */
+final class MultiplexRequestHandlerSource extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
+
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex.request-handler-source");
+
+    private final int identifier;
+    private final MultiplexConnection connection;
+
+    /**
+     * Construct a new instance, using the executor of the given connection.
+     *
+     * @param identifier the service ID written into every message sent by this source
+     * @param connection the connection used to send messages
+     */
+    MultiplexRequestHandlerSource(final int identifier, final MultiplexConnection connection) {
+        super(connection.getExecutor());
+        this.connection = connection;
+        this.identifier = identifier;
+    }
+
+    /**
+     * Send a {@code SERVICE_CLOSE_REQUEST} message for this service.
+     *
+     * @throws IOException if the message cannot be written
+     */
+    @Override
+    protected void closeAction() throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(5);
+        buffer.put((byte) MessageType.SERVICE_CLOSE_REQUEST.getId());
+        buffer.putInt(identifier);
+        buffer.flip();
+        connection.doBlockingWrite(buffer);
+    }
+
+    /**
+     * Allocate a new client ID, register a forwarding request handler for it with the connection and
+     * send a {@code CLIENT_OPEN} message.  If any step fails, the registration is removed and the
+     * handler is closed.
+     *
+     * @return a handle to the newly registered request handler
+     * @throws IOException if the message cannot be written or the handle cannot be acquired
+     */
+    public Handle<RequestHandler> createRequestHandler() throws IOException {
+        final int id = connection.nextRemoteClient();
+        final MultiplexRequestHandler requestHandler = new MultiplexRequestHandler(id, connection);
+        boolean ok = false;
+        try {
+            connection.addRemoteClient(id, requestHandler);
+            try {
+                final ByteBuffer buffer = ByteBuffer.allocate(9);
+                buffer.put((byte) MessageType.CLIENT_OPEN.getId());
+                buffer.putInt(identifier);
+                buffer.putInt(id);
+                buffer.flip();
+                connection.doBlockingWrite(buffer);
+                // Hand out a handle to the handler that was registered above.  Creating a second
+                // handler for the same ID would leave the registered instance without any handle,
+                // so it would never be closed on the success path.
+                final Handle<RequestHandler> handlerHandle = requestHandler.getHandle();
+                ok = true;
+                return handlerHandle;
+            } finally {
+                if (! ok) {
+                    connection.removeRemoteClient(id);
+                }
+            }
+        } finally {
+            if (! ok) {
+                IoUtils.safeClose(requestHandler);
+            }
+        }
+    }
+
+    public String toString() {
+        return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+    }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.DelegatingIoHandler;
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting.Endpoint;
+
+/**
+ * Channel handler which creates a {@link MultiplexConnection} when its channel is opened, publishes
+ * it through a future, and closes it when the channel is closed.
+ */
+public final class SimpleMultiplexHandler extends DelegatingIoHandler<AllocatedMessageChannel> {
+
+    private volatile MultiplexConnection connection;
+    private final Endpoint endpoint;
+    private final MultiplexConfiguration configuration;
+    private final FutureConnection futureConnection = new FutureConnection();
+
+    /**
+     * Construct a new instance.
+     *
+     * @param endpoint the endpoint passed to the connection created on open
+     * @param configuration the configuration passed to the connection created on open
+     */
+    public SimpleMultiplexHandler(final Endpoint endpoint, final MultiplexConfiguration configuration) {
+        this.endpoint = endpoint;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Create the connection for the opened channel, complete the connection future with it, install
+     * a {@link MultiplexReadHandler} and resume reads on the channel.
+     *
+     * @param channel the channel which was opened
+     */
+    public void handleOpened(final AllocatedMessageChannel channel) {
+        final MultiplexConnection opened = new MultiplexConnection(endpoint, channel, configuration);
+        connection = opened;
+        futureConnection.setResult(opened);
+        setReadHandler(new MultiplexReadHandler(opened));
+        channel.resumeReads();
+    }
+
+    /**
+     * Close the connection created for the channel.
+     *
+     * @param channel the channel which was closed
+     */
+    public void handleClosed(final AllocatedMessageChannel channel) {
+        final MultiplexConnection current = connection;
+        IoUtils.safeClose(current);
+    }
+
+    /**
+     * Get the future which is completed with the connection once the channel has been opened.
+     *
+     * @return the future connection
+     */
+    public IoFuture<MultiplexConnection> getConnection() {
+        return futureConnection;
+    }
+
+    /**
+     * Future for the connection.  Cancelling it does nothing.
+     */
+    public static final class FutureConnection extends AbstractIoFuture<MultiplexConnection> {
+        public IoFuture<MultiplexConnection> cancel() {
+            return this;
+        }
+
+        // Re-declared with plain delegation; presumably so that the enclosing handler can call it.
+        protected boolean setResult(final MultiplexConnection result) {
+            final boolean accepted = super.setResult(result);
+            return accepted;
+        }
+    }
+}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -1,84 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-final class SimpleWriteHandler implements WriteHandler {
- private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
-
- private final BufferAllocator<ByteBuffer> allocator;
- private final ByteBuffer[] buffers;
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final List<ByteBuffer> buffers) {
- this.allocator = allocator;
- this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
- logBufferSize();
- }
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer[] buffers) {
- this.allocator = allocator;
- this.buffers = buffers;
- logBufferSize();
- }
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer buffer) {
- this.allocator = allocator;
- buffers = new ByteBuffer[] { buffer };
- logBufferSize();
- }
-
- private void logBufferSize() {
- if (log.isTrace()) {
- long t = 0L;
- for (ByteBuffer buf : buffers) {
- t += (long)buf.remaining();
- }
- log.trace("Writing a message of size %d", Long.valueOf(t));
- }
- }
-
- public boolean handleWrite(final WritableMessageChannel channel) {
- boolean done = true;
- try {
- return (done = channel.send(buffers));
- } catch (IOException e) {
- log.trace(e, "Write failed");
- return true;
- } finally {
- if (done) {
- for (ByteBuffer buffer : buffers) {
- allocator.free(buffer);
- }
- }
- }
- }
-}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -1,32 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-interface WriteHandler {
- boolean handleWrite(WritableMessageChannel channel);
-}
Modified: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -29,12 +29,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;
-import java.net.URI;
import java.io.IOException;
import junit.framework.TestCase;
import org.jboss.remoting.core.EndpointImpl;
import org.jboss.remoting.test.support.LoggingHelper;
-import org.jboss.remoting.SimpleCloseable;
import org.jboss.remoting.LocalServiceConfiguration;
import org.jboss.remoting.RequestListener;
import org.jboss.remoting.ClientContext;
@@ -43,6 +41,10 @@
import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.ClientSource;
import org.jboss.remoting.Client;
+import org.jboss.remoting.util.QualifiedName;
+import org.jboss.remoting.spi.NamedServiceRegistry;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.Handle;
import org.jboss.xnio.BufferAllocator;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.Xnio;
@@ -91,12 +93,10 @@
configuration.setExecutor(closeableExecutor);
configuration.setLinkMetric(10);
configuration.setMarshallerFactory(new RiverMarshallerFactory());
+ final NamedServiceRegistry registry = new NamedServiceRegistry();
+ configuration.setNamedServiceRegistry(registry);
final MarshallingConfiguration marshallingConfiguration = new MarshallingConfiguration();
configuration.setMarshallingConfiguration(marshallingConfiguration);
- final IoHandlerFactory<AllocatedMessageChannel> handlerFactory = MultiplexProtocol.createServer(remoteEndpoint, configuration);
- final ChannelSource<AllocatedMessageChannel> channelSource = Channels.convertStreamToAllocatedMessage(xnio.createPipeServer(Channels.convertStreamToAllocatedMessage(handlerFactory, 16384, 16384)), 16384, 16384);
- final IoFuture<SimpleCloseable> future = MultiplexProtocol.connect(endpoint, configuration, channelSource);
- future.get();
final LocalServiceConfiguration<Object, Object> localServiceConfiguration = new LocalServiceConfiguration<Object, Object>(new RequestListener<Object, Object>() {
public void handleClientOpen(final ClientContext context) {
log.debug("Client open");
@@ -125,15 +125,43 @@
localServiceConfiguration.setServiceType("connection.test");
localServiceConfiguration.setGroupName("testgroup");
localServiceConfiguration.setMetric(10);
- remoteEndpoint.registerService(localServiceConfiguration);
- final IoFuture<ClientSource<Object,Object>> futureClientSource = endpoint.locateService(new URI("jrs:connection.test::"), Object.class, Object.class);
- assertEquals(IoFuture.Status.DONE, futureClientSource.await(1L, TimeUnit.SECONDS));
- final ClientSource<Object, Object> clientSource = futureClientSource.get();
- final Client<Object,Object> client = clientSource.createClient();
- final IoFuture<Object> futureReply = client.send(REQUEST);
- assertEquals(IoFuture.Status.DONE, futureReply.await(1L, TimeUnit.SECONDS));
- assertEquals(REPLY, futureReply.get());
- assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ final Handle<RequestHandlerSource> requestHandlerSourceHandle = remoteEndpoint.registerService(localServiceConfiguration);
+ try {
+ registry.registerService(QualifiedName.parse("/test/connectiontest"), requestHandlerSourceHandle.getResource());
+ final IoHandlerFactory<AllocatedMessageChannel> handlerFactory = MultiplexProtocol.createServer(remoteEndpoint, configuration);
+ final ChannelSource<AllocatedMessageChannel> channelSource = Channels.convertStreamToAllocatedMessage(xnio.createPipeServer(Channels.convertStreamToAllocatedMessage(handlerFactory, 16384, 16384)), 16384, 16384);
+ final IoFuture<MultiplexConnection> future = MultiplexProtocol.connect(endpoint, configuration, channelSource);
+ final MultiplexConnection connection = future.get();
+ try {
+ final Handle<RequestHandlerSource> remoteHandlerSource = connection.openRemoteService(QualifiedName.parse("/test/connectiontest"));
+ try {
+ final ClientSource<Object, Object> clientSource = endpoint.createClientSource(remoteHandlerSource.getResource(), Object.class, Object.class);
+ try {
+ final Client<Object,Object> client = clientSource.createClient();
+ try {
+ final IoFuture<Object> futureReply = client.send(REQUEST);
+ assertEquals(IoFuture.Status.DONE, futureReply.await(1L, TimeUnit.SECONDS));
+ assertEquals(REPLY, futureReply.get());
+ client.close();
+ clientSource.close();
+ remoteHandlerSource.close();
+ connection.close();
+ assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ } finally {
+ IoUtils.safeClose(client);
+ }
+ } finally {
+ IoUtils.safeClose(clientSource);
+ }
+ } finally {
+ IoUtils.safeClose(remoteHandlerSource);
+ }
+ } finally {
+ IoUtils.safeClose(connection);
+ }
+ } finally {
+ IoUtils.safeClose(requestHandlerSourceHandle);
+ }
} finally {
endpoint.stop();
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -363,7 +363,7 @@
return subject.substring(position, nextDelim);
}
} finally {
- position = nextDelim;
+ position = nextDelim == -1 ? -1 : nextDelim + 1;
}
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -100,9 +100,18 @@
public static QualifiedName parse(String path) {
List<String> decoded = new ArrayList<String>();
+ boolean first = true;
for (String segment : CollectionUtil.split("/", path)) {
- if (segment.length() == 0) {
- throw new IllegalArgumentException("Empty segment in path");
+ if (first) {
+ if (segment.length() > 0) {
+ throw new IllegalArgumentException("Relative paths are not allowed");
+ }
+ first = false;
+ continue;
+ } else {
+ if (segment.length() == 0) {
+ throw new IllegalArgumentException("Empty segment in path");
+ }
}
try {
decoded.add(URLDecoder.decode(segment, "utf-8"));
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -15,7 +15,7 @@
monitor = this;
}
- protected SynchronizedCollection(final Collection<V> delegate, final Object monitor) {
+ public SynchronizedCollection(final Collection<V> delegate, final Object monitor) {
this.delegate = delegate;
this.monitor = monitor;
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java 2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java 2008-11-14 06:41:47 UTC (rev 4679)
@@ -11,7 +11,7 @@
super(delegate);
}
- protected SynchronizedSet(final Set<K> delegate, final Object monitor) {
+ public SynchronizedSet(final Set<K> delegate, final Object monitor) {
super(delegate, monitor);
}
}
16 years, 1 month
JBoss Remoting SVN: r4678 - in remoting3/trunk: api/src/main/java/org/jboss/remoting/spi and 1 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-13 23:08:51 -0500 (Thu, 13 Nov 2008)
New Revision: 4678
Added:
remoting3/trunk/api/src/main/java/org/jboss/remoting/ServiceRegistrationException.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java
Log:
Named service registry
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/ServiceRegistrationException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/ServiceRegistrationException.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/ServiceRegistrationException.java 2008-11-14 04:08:51 UTC (rev 4678)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting;
+
+import org.jboss.remoting.RemotingException;
+
+/**
+ * An exception indicating that a service could not be registered.
+ */
+public class ServiceRegistrationException extends RemotingException {
+
+    private static final long serialVersionUID = 6416792968397444648L;
+
+    /**
+     * Creates a {@code ServiceRegistrationException} which has neither a detail message nor a cause.  A cause may
+     * be supplied afterwards by calling {@link #initCause(Throwable) initCause}.
+     */
+    public ServiceRegistrationException() {
+        super();
+    }
+
+    /**
+     * Creates a {@code ServiceRegistrationException} with the given detail message and no cause.  A cause may be
+     * supplied afterwards by calling {@link #initCause(Throwable) initCause}.
+     *
+     * @param msg the detail message
+     */
+    public ServiceRegistrationException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates a {@code ServiceRegistrationException} with the given cause.  The detail message is chosen by the
+     * superclass constructor; by the usual {@code Throwable} convention it is
+     * {@code (cause == null ? null : cause.toString())}, which typically contains the class and detail message
+     * of {@code cause}.
+     *
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public ServiceRegistrationException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates a {@code ServiceRegistrationException} with the given detail message and cause.
+     *
+     * @param msg the detail message
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public ServiceRegistrationException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java 2008-11-14 04:08:51 UTC (rev 4678)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.spi;
+
+import java.util.concurrent.ConcurrentMap;
+import java.io.IOException;
+import org.jboss.remoting.util.QualifiedName;
+import org.jboss.remoting.util.CollectionUtil;
+import org.jboss.remoting.ServiceRegistrationException;
+import org.jboss.remoting.CloseHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * A registry which associates qualified names with handles to request handler sources.  At most one service may
+ * be registered at any given path; a registration is removed again when the handle returned from
+ * {@link #registerService(QualifiedName, RequestHandlerSource)} is closed.
+ */
+public final class NamedServiceRegistry {
+    public static final Logger log = Logger.getLogger("org.jboss.remoting.named-registry");
+
+    private final ConcurrentMap<QualifiedName, Handle<RequestHandlerSource>> map = CollectionUtil.concurrentMap();
+
+    /**
+     * Construct a new, empty registry.
+     */
+    public NamedServiceRegistry() {
+    }
+
+    /**
+     * Register a service at the given path.  A new handle to the service is acquired and stored in the registry;
+     * closing that handle removes the registration.
+     *
+     * @param path the path at which to register the service
+     * @param service the service to register
+     * @return the handle which represents the registration
+     * @throws NullPointerException if {@code path} or {@code service} is {@code null}
+     * @throws ServiceRegistrationException if a service is already registered at {@code path}
+     * @throws IOException if the handle could not be acquired or the close handler could not be added
+     */
+    public Handle<RequestHandlerSource> registerService(final QualifiedName path, final RequestHandlerSource service) throws IOException {
+        if (path == null) {
+            throw new NullPointerException("path is null");
+        }
+        if (service == null) {
+            throw new NullPointerException("service is null");
+        }
+        final Handle<RequestHandlerSource> handle = service.getHandle();
+        final NamedServiceRegistry registry = this;
+        boolean inserted = false;
+        boolean ok = false;
+        try {
+            final Handle<RequestHandlerSource> oldHandle = map.putIfAbsent(path, handle);
+            if (oldHandle != null) {
+                throw new ServiceRegistrationException(String.format("Failed to register a service at path \"%s\" on %s (a service is already registered at that location)", path, this));
+            }
+            inserted = true;
+            handle.addCloseHandler(new CloseHandler<Handle<RequestHandlerSource>>() {
+                public void handleClose(final Handle<RequestHandlerSource> closed) {
+                    // The map holds handles, not services, so the conditional remove must be given the handle;
+                    // comparing against the service would never match and the entry would never be removed.
+                    if (map.remove(path, handle)) {
+                        // Log the registry itself rather than this anonymous close handler.
+                        log.trace("Removed service %s at path \"%s\" on %s (service handle was closed)", service, path, registry);
+                    }
+                }
+            });
+            log.trace("Registered %s at path \"%s\" on %s", service, path, this);
+            ok = true;
+            return handle;
+        } finally {
+            if (! ok) {
+                // Only undo a mapping that this call created; an existing registration must be left alone.
+                if (inserted) {
+                    map.remove(path, handle);
+                }
+                IoUtils.safeClose(handle);
+            }
+        }
+    }
+
+    /**
+     * Look up the service registered at the given path.
+     *
+     * @param path the path to look up
+     * @return the handle registered at {@code path}, or {@code null} if there is none
+     */
+    public Handle<RequestHandlerSource> lookupService(QualifiedName path) {
+        return map.get(path);
+    }
+
+    /**
+     * Get a human-readable description of this registry, including its identity hash code in hexadecimal.
+     *
+     * @return the description
+     */
+    public String toString() {
+        return "named service registry <" + Integer.toHexString(hashCode()) + ">";
+    }
+}
Added: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java (rev 0)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java 2008-11-14 04:08:51 UTC (rev 4678)
@@ -0,0 +1,142 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.util;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * An immutable hierarchical name consisting of an ordered sequence of non-{@code null} string segments.  The
+ * string form consists of each segment, URL-encoded as UTF-8 and preceded by {@code '/'}; a name with no
+ * segments is written as {@code "/"}.  Names are ordered segment by segment, and a name sorts before any longer
+ * name of which it is a prefix.
+ */
+public final class QualifiedName implements Comparable<QualifiedName>, Iterable<String> {
+    private final String[] segments;
+
+    /**
+     * Construct a new name from the given segments.  The array is copied, so later changes to it do not affect
+     * this name.
+     *
+     * @param segments the (unencoded) segments of the name
+     * @throws NullPointerException if {@code segments} or any of its elements is {@code null}
+     */
+    public QualifiedName(final String[] segments) {
+        if (segments == null) {
+            throw new NullPointerException("segments is null");
+        }
+        // Copy before validating: instances are used as map keys, so the caller must not be able to change
+        // the result of equals(), hashCode() or compareTo() by modifying its array afterwards.
+        final String[] copy = segments.clone();
+        for (String s : copy) {
+            if (s == null) {
+                throw new NullPointerException("a segment is null");
+            }
+        }
+        this.segments = copy;
+    }
+
+    /**
+     * Determine whether this name is equal to another object.
+     *
+     * @param o the other object
+     * @return {@code true} if {@code o} is a {@code QualifiedName} with the same segments in the same order
+     */
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (! (o instanceof QualifiedName)) return false;
+        return Arrays.equals(segments, ((QualifiedName) o).segments);
+    }
+
+    /**
+     * Get the hash code of this name, computed from its segments.
+     *
+     * @return the hash code
+     */
+    public int hashCode() {
+        return Arrays.hashCode(segments);
+    }
+
+    /**
+     * Compare this name to another.  Segments are compared pairwise in order; if all shared positions are equal,
+     * the name with fewer segments sorts first.
+     *
+     * @param o the name to compare to
+     * @return a negative value, zero, or a positive value if this name is less than, equal to, or greater than
+     *      {@code o}
+     */
+    public int compareTo(final QualifiedName o) {
+        if (this == o) return 0;
+        String[] a = segments;
+        String[] b = o.segments;
+        final int alen = a.length;
+        final int blen = b.length;
+        for (int i = 0; i < alen && i < blen; i ++) {
+            final int cmp = a[i].compareTo(b[i]);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        if (alen < blen) {
+            return -1;
+        } else if (alen > blen) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    /**
+     * Get the string form of this name: every segment URL-encoded as UTF-8 and preceded by {@code '/'}, or just
+     * {@code "/"} if there are no segments.
+     *
+     * @return the string form
+     */
+    public String toString() {
+        if (segments.length == 0) {
+            return "/";
+        }
+        final StringBuilder builder = new StringBuilder();
+        try {
+            for (String segment : segments) {
+                builder.append('/');
+                builder.append(URLEncoder.encode(segment, "utf-8"));
+            }
+        } catch (UnsupportedEncodingException e) {
+            // cannot happen
+            throw new IllegalStateException(e);
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Parse a string into a qualified name.  The string is split on {@code "/"} using
+     * {@code CollectionUtil.split}, and every resulting segment is URL-decoded as UTF-8.
+     *
+     * @param path the string to parse
+     * @return the parsed name
+     * @throws IllegalArgumentException if any segment produced by the split is empty
+     */
+    public static QualifiedName parse(String path) {
+        List<String> decoded = new ArrayList<String>();
+        for (String segment : CollectionUtil.split("/", path)) {
+            if (segment.length() == 0) {
+                throw new IllegalArgumentException("Empty segment in path");
+            }
+            try {
+                decoded.add(URLDecoder.decode(segment, "utf-8"));
+            } catch (UnsupportedEncodingException e) {
+                // cannot happen
+                throw new IllegalStateException(e);
+            }
+        }
+        return new QualifiedName(decoded.toArray(new String[decoded.size()]));
+    }
+
+    /**
+     * Get a read-only iterator over the segments of this name, in order.
+     *
+     * @return the iterator; its {@code remove()} method always throws {@code UnsupportedOperationException}
+     */
+    public Iterator<String> iterator() {
+        return new Iterator<String>() {
+            int i;
+
+            public boolean hasNext() {
+                return i < segments.length;
+            }
+
+            public String next() {
+                // Check the bound explicitly instead of catching ArrayIndexOutOfBoundsException, so that the
+                // cursor is not advanced by a failed call.
+                if (i >= segments.length) {
+                    throw new NoSuchElementException("next() past end");
+                }
+                return segments[i++];
+            }
+
+            public void remove() {
+                throw new UnsupportedOperationException("remove()");
+            }
+        };
+    }
+
+    /**
+     * Get the number of segments in this name.
+     *
+     * @return the number of segments
+     */
+    public int length() {
+        return segments.length;
+    }
+}
16 years, 1 month
JBoss Remoting SVN: r4677 - remoting3/trunk/api/src/main/java/org/jboss/remoting/spi.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-13 23:07:41 -0500 (Thu, 13 Nov 2008)
New Revision: 4677
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
Log:
Fix a memory leak issue; fix a race condition
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java 2008-11-13 22:36:44 UTC (rev 4676)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java 2008-11-14 04:07:41 UTC (rev 4677)
@@ -23,12 +23,16 @@
package org.jboss.remoting.spi;
import java.io.IOException;
+import java.io.Closeable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
+import java.lang.ref.WeakReference;
import org.jboss.remoting.RemotingException;
import org.jboss.remoting.CloseHandler;
+import org.jboss.remoting.HandleableCloseable;
import org.jboss.xnio.log.Logger;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.WeakCloseable;
/**
* A closeable implementation that supports reference counting. Since the initial reference count is zero, implementors
@@ -98,24 +102,20 @@
* @throws RemotingException if the resource is closed
*/
public Handle<T> getHandle() throws IOException {
- return new HandleImpl();
+ final HandleImpl handle = new HandleImpl();
+ final Key key = addCloseHandler(new HandleCloseHandler<T>(handle));
+ handle.addCloseHandler(new KeyCloseHandler<Handle<T>>(key));
+ return handle;
}
private final class HandleImpl extends AbstractHandleableCloseable<Handle<T>> implements Handle<T> {
- private final Key key;
private HandleImpl() throws IOException {
super(AbstractAutoCloseable.this.executor);
- key = AbstractAutoCloseable.this.addCloseHandler(new CloseHandler<T>() {
- public void handleClose(final T closed) {
- IoUtils.safeClose(HandleImpl.this);
- }
- });
inc();
}
protected void closeAction() throws IOException {
- key.remove();
dec();
}
@@ -125,11 +125,36 @@
}
public String toString() {
- return "handle <" + Integer.toString(hashCode(), 16) + "> to " + String.valueOf(AbstractAutoCloseable.this);
+ return "handle <" + Integer.toHexString(hashCode()) + "> to " + String.valueOf(AbstractAutoCloseable.this);
}
}
+ /**
+  * A close handler which closes a handle when the resource it is registered on is closed.  The handle is
+  * referenced only through a {@code WeakReference}, so this handler does not keep the handle strongly reachable.
+  */
+ private static class HandleCloseHandler<T> implements CloseHandler<T> {
+
+     private final Closeable weakHandle;
+
+     /**
+      * Construct a new instance.
+      *
+      * @param handle the handle to close; it is wrapped in a weak reference
+      */
+     public HandleCloseHandler(final Handle<T> handle) {
+         final WeakReference<Closeable> reference = new WeakReference<Closeable>(handle);
+         weakHandle = new WeakCloseable(reference);
+     }
+
+     /**
+      * Close the (weakly referenced) handle, ignoring any failure.
+      *
+      * @param closed the resource which was closed
+      */
+     public void handleClose(final T closed) {
+         IoUtils.safeClose(weakHandle);
+     }
+ }
+
+ /**
+  * A close handler which removes a previously obtained registration key when the object it is registered on
+  * is closed.
+  */
+ private static class KeyCloseHandler<T> implements CloseHandler<T> {
+     private final Key registrationKey;
+
+     /**
+      * Construct a new instance.
+      *
+      * @param key the key to remove on close
+      */
+     public KeyCloseHandler(final Key key) {
+         registrationKey = key;
+     }
+
+     /**
+      * Remove the key.
+      *
+      * @param closed the object which was closed
+      */
+     public void handleClose(final T closed) {
+         registrationKey.remove();
+     }
+ }
+
public String toString() {
- return "generic resource <" + Integer.toString(hashCode(), 16) + ">";
+ return "generic resource <" + Integer.toHexString(hashCode()) + ">";
}
}
16 years, 1 month
JBoss Remoting SVN: r4676 - remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/http.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-11-13 17:36:44 -0500 (Thu, 13 Nov 2008)
New Revision: 4676
Added:
remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/http/NullInputStreamTestCase.java
Log:
JBREM-1052: New unit test.
Added: remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/http/NullInputStreamTestCase.java
===================================================================
--- remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/http/NullInputStreamTestCase.java (rev 0)
+++ remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/http/NullInputStreamTestCase.java 2008-11-13 22:36:44 UTC (rev 4676)
@@ -0,0 +1,507 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.test.remoting.transport.http;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.management.MBeanServer;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.jboss.logging.XLevel;
+import org.jboss.remoting.CannotConnectException;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
+import org.jboss.remoting.marshal.UnMarshaller;
+import org.jboss.remoting.marshal.http.HTTPUnMarshaller;
+import org.jboss.remoting.transport.Connector;
+import org.jboss.remoting.transport.http.HTTPClientInvoker;
+import org.jboss.remoting.transport.http.WebServerError;
+
+
+/**
+ * Unit test for JBREM-1052.
+ *
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version $Revision: 1.1 $
+ * <p>
+ * Copyright Oct 27, 2008
+ * </p>
+ */
+public class NullInputStreamTestCase extends TestCase
+{
+ private static Logger log = Logger.getLogger(NullInputStreamTestCase.class);
+
+ private static boolean firstTime = true;
+
+ protected String host;
+ protected int port;
+ protected String locatorURI;
+ protected InvokerLocator serverLocator;
+ protected Connector connector;
+ protected Client client;
+ protected TestInvocationHandler invocationHandler;
+
+
+   /**
+    * Configures log4j the first time any test in this class is set up, then
+    * resets the flags recorded by TestUnMarshaller.
+    */
+   public void setUp() throws Exception
+   {
+      if (firstTime)
+      {
+         firstTime = false;
+         Logger.getLogger("org.jboss.remoting").setLevel(XLevel.INFO);
+         Logger.getLogger("org.jboss.test.remoting").setLevel(Level.INFO);
+         PatternLayout layout = new PatternLayout("[%d{ABSOLUTE}] [%t] %5p (%F:%L) - %m%n");
+         Logger.getRootLogger().addAppender(new ConsoleAppender(layout));
+      }
+
+      TestUnMarshaller.clear();
+   }
+
+
+   /**
+    * Disconnects the client and destroys the connector, skipping whichever of
+    * the two was never created.
+    */
+   public void tearDown()
+   {
+      final Client currentClient = client;
+      if (currentClient != null)
+      {
+         currentClient.disconnect();
+      }
+
+      final Connector currentConnector = connector;
+      if (currentConnector != null)
+      {
+         currentConnector.destroy();
+      }
+   }
+
+
+   /**
+    * Verifies that, with the default configuration, a POST invocation raises
+    * WebServerError and that the unmarshaller was called with a null stream.
+    */
+   public void testDefaultBehaviorPost() throws Throwable
+   {
+      log.info("entering " + getName());
+
+      // Start the stub HTTP server.
+      setupServer();
+
+      HashMap clientConfig = new HashMap();
+      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+      HashMap invocationMetadata = new HashMap();
+      invocationMetadata.put(Client.RAW, "true");
+      invocationMetadata.put("TYPE", "POST");
+
+      boolean sawWebServerError = false;
+      try
+      {
+         Object response = makeInvocation(clientConfig, invocationMetadata);
+         log.info("response: " + response);
+         fail("expected WebServerError");
+      }
+      catch (WebServerError expected)
+      {
+         log.info("received expected WebServerError");
+         sawWebServerError = true;
+      }
+
+      assertTrue(sawWebServerError);
+      assertTrue(TestUnMarshaller.enteredRead);
+      assertTrue(TestUnMarshaller.streamIsNull);
+
+      log.info(getName() + " PASSES");
+   }
+
+
+   /**
+    * Verifies that, with the default configuration, a HEAD invocation raises
+    * WebServerError and that the unmarshaller was called with a null stream.
+    */
+   public void testDefaultBehaviorHead() throws Throwable
+   {
+      log.info("entering " + getName());
+
+      // Start the stub HTTP server.
+      setupServer();
+
+      HashMap clientConfig = new HashMap();
+      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+      HashMap invocationMetadata = new HashMap();
+      invocationMetadata.put(Client.RAW, "true");
+      invocationMetadata.put("TYPE", "HEAD");
+
+      boolean sawWebServerError = false;
+      try
+      {
+         Object response = makeInvocation(clientConfig, invocationMetadata);
+         log.info("response: " + response);
+         fail("expected WebServerError");
+      }
+      catch (WebServerError expected)
+      {
+         log.info("received expected WebServerError");
+         sawWebServerError = true;
+      }
+
+      assertTrue(sawWebServerError);
+      assertTrue(TestUnMarshaller.enteredRead);
+      assertTrue(TestUnMarshaller.streamIsNull);
+
+      log.info(getName() + " PASSES");
+   }
+
+
+   /**
+    * Verifies that, with unmarshalNullStream configured as "true", a POST
+    * invocation raises WebServerError and that the unmarshaller was called
+    * with a null stream.
+    */
+   public void testUnmarshalNullStreamTruePost() throws Throwable
+   {
+      log.info("entering " + getName());
+
+      // Start the stub HTTP server.
+      setupServer();
+
+      HashMap clientConfig = new HashMap();
+      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+      clientConfig.put(HTTPClientInvoker.UNMARSHAL_NULL_STREAM, "true");
+      HashMap invocationMetadata = new HashMap();
+      invocationMetadata.put(Client.RAW, "true");
+      invocationMetadata.put("TYPE", "POST");
+
+      boolean sawWebServerError = false;
+      try
+      {
+         Object response = makeInvocation(clientConfig, invocationMetadata);
+         log.info("response: " + response);
+         fail("expected WebServerError");
+      }
+      catch (WebServerError expected)
+      {
+         log.info("received expected WebServerError");
+         sawWebServerError = true;
+      }
+
+      assertTrue(sawWebServerError);
+      assertTrue(TestUnMarshaller.enteredRead);
+      assertTrue(TestUnMarshaller.streamIsNull);
+
+      log.info(getName() + " PASSES");
+   }
+
+
+   /**
+    * Verifies that, with unmarshalNullStream configured as "true", a HEAD
+    * invocation raises WebServerError and that the unmarshaller was called
+    * with a null stream.
+    */
+   public void testUnmarshalNullStreamTrueHead() throws Throwable
+   {
+      log.info("entering " + getName());
+
+      // Start the stub HTTP server.
+      setupServer();
+
+      HashMap clientConfig = new HashMap();
+      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+      clientConfig.put(HTTPClientInvoker.UNMARSHAL_NULL_STREAM, "true");
+      HashMap invocationMetadata = new HashMap();
+      invocationMetadata.put(Client.RAW, "true");
+      invocationMetadata.put("TYPE", "HEAD");
+
+      boolean sawWebServerError = false;
+      try
+      {
+         Object response = makeInvocation(clientConfig, invocationMetadata);
+         log.info("response: " + response);
+         fail("expected WebServerError");
+      }
+      catch (WebServerError expected)
+      {
+         log.info("received expected WebServerError");
+         sawWebServerError = true;
+      }
+
+      assertTrue(sawWebServerError);
+      assertTrue(TestUnMarshaller.enteredRead);
+      assertTrue(TestUnMarshaller.streamIsNull);
+
+      log.info(getName() + " PASSES");
+   }
+
+
+   /**
+    * Verifies that, with unmarshalNullStream configured as "false", a POST
+    * invocation raises WebServerError and that the unmarshaller was never called.
+    */
+   public void testUnmarshalNullStreamFalsePost() throws Throwable
+   {
+      log.info("entering " + getName());
+
+      // Start the stub HTTP server.
+      setupServer();
+
+      HashMap clientConfig = new HashMap();
+      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+      clientConfig.put(HTTPClientInvoker.UNMARSHAL_NULL_STREAM, "false");
+      HashMap invocationMetadata = new HashMap();
+      invocationMetadata.put(Client.RAW, "true");
+      invocationMetadata.put("TYPE", "POST");
+
+      boolean sawWebServerError = false;
+      try
+      {
+         Object response = makeInvocation(clientConfig, invocationMetadata);
+         log.info("response: " + response);
+         fail("expected WebServerError");
+      }
+      catch (WebServerError expected)
+      {
+         log.info("received expected WebServerError");
+         sawWebServerError = true;
+      }
+
+      assertTrue(sawWebServerError);
+      assertFalse(TestUnMarshaller.enteredRead);
+
+      log.info(getName() + " PASSES");
+   }
+
+
+
+   /**
+    * Verifies that, with unmarshalNullStream configured as "false", a HEAD
+    * invocation raises WebServerError and that the unmarshaller was never called.
+    */
+   public void testUnmarshalNullStreamFalseHead() throws Throwable
+   {
+      log.info("entering " + getName());
+
+      // Start the stub HTTP server.
+      setupServer();
+
+      HashMap clientConfig = new HashMap();
+      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+      clientConfig.put(HTTPClientInvoker.UNMARSHAL_NULL_STREAM, "false");
+      HashMap invocationMetadata = new HashMap();
+      invocationMetadata.put(Client.RAW, "true");
+      invocationMetadata.put("TYPE", "HEAD");
+
+      boolean sawWebServerError = false;
+      try
+      {
+         Object response = makeInvocation(clientConfig, invocationMetadata);
+         log.info("response: " + response);
+         fail("expected WebServerError");
+      }
+      catch (WebServerError expected)
+      {
+         log.info("received expected WebServerError");
+         sawWebServerError = true;
+      }
+
+      assertTrue(sawWebServerError);
+      assertFalse(TestUnMarshaller.enteredRead);
+
+      log.info(getName() + " PASSES");
+   }
+
+
+   /**
+    * Connects a client to the stub server and invokes "abc" on it, retrying up
+    * to three times when invoke() fails with CannotConnectException.
+    * <p>
+    * The client is stored in the {@code client} field so that tearDown() can
+    * disconnect it.
+    *
+    * @param config   client configuration passed to the Client constructor
+    * @param metadata invocation metadata passed to Client.invoke()
+    * @return the invocation response, or null if every attempt failed with
+    *         CannotConnectException
+    * @throws Throwable any failure other than a CannotConnectException raised by
+    *         invoke(), which is logged and retried
+    */
+   protected Object makeInvocation(HashMap config, HashMap metadata) throws Throwable
+   {
+      // Create client.
+      locatorURI = "http://" + host + ":" + port;
+      locatorURI += "/?unmarshaller=" + TestUnMarshaller.class.getName();
+      log.info("connecting to " + locatorURI);
+      InvokerLocator clientLocator = new InvokerLocator(locatorURI);
+
+      // Assign the field, not a shadowing local: tearDown() only looks at the field,
+      // so a local variable here would leave the client connected after the test.
+      client = new Client(clientLocator, config);
+      client.connect();
+      log.info("client is connected");
+
+      Object response = null;
+      for (int i = 0; i < 3; i++)
+      {
+         try
+         {
+            response = client.invoke("abc", metadata);
+            break;
+         }
+         catch (CannotConnectException e)
+         {
+            log.info("cannot connect", e);
+         }
+      }
+
+      return response;
+   }
+
+
+   /**
+    * Starts the stub HTTP server: binds a ServerSocket to a system-chosen port
+    * (port 0) on the local host address, records the address and port in
+    * {@code host} and {@code port}, and hands the socket to a new AcceptThread.
+    */
+   protected void setupServer() throws Exception
+   {
+      log.info("setupServer()");
+      final InetAddress bindAddress = InetAddress.getLocalHost();
+      final ServerSocket serverSocket = new ServerSocket(0, 100, bindAddress);
+      serverSocket.setSoTimeout(5000);
+      host = bindAddress.getHostAddress();
+      port = serverSocket.getLocalPort();
+      AcceptThread acceptor = new AcceptThread(serverSocket);
+      acceptor.start();
+      log.info("started server");
+   }
+
+
+   /**
+    * Stops the connector, if one exists.
+    */
+   protected void shutdownServer() throws Exception
+   {
+      if (connector == null)
+      {
+         return;
+      }
+      connector.stop();
+   }
+
+
+   /**
+    * Invocation handler that returns each invocation's parameter; the listener,
+    * MBeanServer and invoker callbacks do nothing.
+    */
+   static class TestInvocationHandler implements ServerInvocationHandler
+   {
+      public Object invoke(final InvocationRequest invocation) throws Throwable
+      {
+         Object parameter = invocation.getParameter();
+         return parameter;
+      }
+
+      public void addListener(InvokerCallbackHandler callbackHandler)
+      {
+      }
+
+      public void removeListener(InvokerCallbackHandler callbackHandler)
+      {
+      }
+
+      public void setMBeanServer(MBeanServer server)
+      {
+      }
+
+      public void setInvoker(ServerInvoker invoker)
+      {
+      }
+   }
+
+   /**
+    * HTTPUnMarshaller that records, in static flags, whether read() was called
+    * and whether it was handed a null stream.
+    */
+   public static class TestUnMarshaller extends HTTPUnMarshaller
+   {
+      /** The serialVersionUID */
+      private static final long serialVersionUID = 1L;
+
+      /** Set to true by every call to read(). */
+      public static boolean enteredRead;
+
+      /** Whether the most recent call to read() received a null stream. */
+      public static boolean streamIsNull;
+
+      /**
+       * Records the call, then delegates to the superclass unless the stream is null.
+       *
+       * @return the superclass result, or null when inputStream is null
+       */
+      public Object read(InputStream inputStream, Map metadata, int version) throws IOException, ClassNotFoundException
+      {
+         final boolean nullStream = (null == inputStream);
+         enteredRead = true;
+         streamIsNull = nullStream;
+         log.info("entered TestUnMarshaller.read()");
+         if (nullStream)
+         {
+            return null;
+         }
+         return super.read(inputStream, metadata, version);
+      }
+
+      /**
+       * Creates a new TestUnMarshaller that uses this instance's customClassLoader.
+       */
+      public UnMarshaller cloneUnMarshaller() throws CloneNotSupportedException
+      {
+         HTTPUnMarshaller copy = new TestUnMarshaller();
+         copy.setClassLoader(customClassLoader);
+         return copy;
+      }
+
+      /** Resets both recorded flags to false. */
+      public static void clear()
+      {
+         streamIsNull = false;
+         enteredRead = false;
+      }
+   }
+
+
+   /**
+    * Daemon thread that accepts connections on the stub server socket and hands
+    * each one to a new WorkerThread.  It keeps accepting until the server socket
+    * has been closed.
+    */
+   public static class AcceptThread extends Thread
+   {
+      ServerSocket ss;
+
+      public AcceptThread(ServerSocket ss)
+      {
+         this.ss = ss;
+         setDaemon(true);
+      }
+
+      public void run()
+      {
+         log.info("starting AcceptThread");
+         // Stop once the socket is closed; accept() on a closed socket fails
+         // immediately, which would otherwise turn this into a busy error loop.
+         while (!ss.isClosed())
+         {
+            try
+            {
+               new WorkerThread(ss.accept()).start();
+            }
+            catch (java.net.SocketTimeoutException ignored)
+            {
+               // setupServer() sets a 5 second accept timeout on the socket, so an
+               // idle server ends up here regularly.  That is not an error: poll again.
+            }
+            catch (Exception e)
+            {
+               log.error("AcceptThread error", e);
+            }
+         }
+         log.info("AcceptThread exiting");
+      }
+   }
+
+
+   /**
+    * Daemon thread that serves one connection of the stub server: it reads up to
+    * 1024 characters of the request, echoes them to System.out, and answers with
+    * an HTTP 500 response that has no body.  The socket is always closed before
+    * the thread ends.
+    */
+   public static class WorkerThread extends Thread
+   {
+      Socket s;
+
+      public WorkerThread(Socket s)
+      {
+         this.s = s;
+         setDaemon(true);
+      }
+
+      public void run()
+      {
+         try
+         {
+            log.info("starting WorkerThread");
+            InputStreamReader ir = new InputStreamReader(s.getInputStream());
+            char[] cbuf = new char[1024];
+            int len = ir.read(cbuf);
+            log.info("available: " + s.getInputStream().available());
+            log.info("len: " + len);
+            log.info("Request:");
+            System.out.println();
+            // read() returns -1 when the peer closed without sending anything;
+            // copyValueOf() would throw on a negative count, so print nothing then.
+            System.out.println(len > 0 ? String.copyValueOf(cbuf, 0, len) : "");
+            System.out.println();
+
+            DataOutputStream dos = new DataOutputStream(s.getOutputStream());
+            dos.writeBytes("HTTP/1.1 500 error" + "\r\n");
+            // Each header line needs its own CRLF; without it the next header is
+            // absorbed into this header's value.
+            dos.writeBytes("Server: testServer" + "\r\n");
+            dos.writeBytes("Content-Type: text/html" + "\r\n");
+            dos.writeBytes("Content-Length: 0\r\n");
+            dos.writeBytes("Connection: close\r\n");
+            dos.writeBytes("\r\n");
+            dos.flush();
+         }
+         catch (Exception e)
+         {
+            log.error("WorkerThread error", e);
+         }
+         finally
+         {
+            // Closing the socket also closes its input and output streams, and
+            // doing it here releases the connection even when the exchange failed.
+            try
+            {
+               s.close();
+            }
+            catch (IOException e)
+            {
+               log.warn("WorkerThread could not close socket", e);
+            }
+         }
+      }
+   }
+}
\ No newline at end of file
16 years, 1 month
JBoss Remoting SVN: r4675 - remoting2/branches/2.2/src/main/org/jboss/remoting/transport/http.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-11-13 17:35:36 -0500 (Thu, 13 Nov 2008)
New Revision: 4675
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/http/HTTPClientInvoker.java
Log:
JBREM-1052: Added unmarshalNullStream variable and related test.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/transport/http/HTTPClientInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/http/HTTPClientInvoker.java 2008-11-13 06:58:23 UTC (rev 4674)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/http/HTTPClientInvoker.java 2008-11-13 22:35:36 UTC (rev 4675)
@@ -92,19 +92,29 @@
*/
public static final int MAX_NUM_TIMEOUT_THREADS_DEFAULT = 10;
+   /**
+    * Specifies whether useHttpURLConnection(), upon receiving a null InputStream or ErrorStream,
+    * should call the UnMarshaller.
+    */
+ public static final String UNMARSHAL_NULL_STREAM = "unmarshalNullStream";
+
protected static final Logger log = Logger.getLogger(HTTPClientInvoker.class);
+ protected boolean unmarshalNullStream = true;
+
private Object timeoutThreadPoolLock = new Object();
private ThreadPool timeoutThreadPool;
public HTTPClientInvoker(InvokerLocator locator)
{
super(locator);
+ configureParameters();
}
public HTTPClientInvoker(InvokerLocator locator, Map configuration)
{
super(locator, configuration);
+ configureParameters();
}
/**
@@ -280,7 +290,7 @@
else
marshaller.write(invocation, stream);
responseCode = conn.getResponseCode();
- InputStream is = (responseCode < 400) ? conn.getInputStream() : conn.getErrorStream();
+
Map headers = conn.getHeaderFields();
if (metadata == null)
{
@@ -304,7 +314,11 @@
metadata.put(HTTPMetadataConstants.RESPONSE_CODE_MESSAGE, conn.getResponseMessage());
metadata.put(HTTPMetadataConstants.RESPONSE_CODE, new Integer(responseCode));
- result = readResponse(metadata, headers, unmarshaller, is);
+ InputStream is = (responseCode < 400) ? conn.getInputStream() : conn.getErrorStream();
+ if (is != null || unmarshalNullStream)
+ {
+ result = readResponse(metadata, headers, unmarshaller, is);
+ }
}
else
{
@@ -317,8 +331,11 @@
InputStream is = (conn.getResponseCode() < 400) ? conn.getInputStream() : conn.getErrorStream();
Map headers = conn.getHeaderFields();
- result = readResponse(null, headers, unmarshaller, is);
-
+ if (is != null || unmarshalNullStream)
+ {
+ result = readResponse(null, headers, unmarshaller, is);
+ }
+
if (metadata == null)
{
metadata = new HashMap();
@@ -880,6 +897,25 @@
this.timeoutThreadPool = pool;
}
+   /**
+    * Reads the {@link #UNMARSHAL_NULL_STREAM} entry from the configuration map
+    * and, if it is present, updates {@code unmarshalNullStream} from it.
+    * <p>
+    * A String value is parsed with Boolean.valueOf(); a Boolean value is used as
+    * given.  A value of any other type is reported with a warning and the current
+    * setting is left unchanged.
+    */
+   protected void configureParameters()
+   {
+      Object val = configuration.get(UNMARSHAL_NULL_STREAM);
+      if (val == null)
+      {
+         return;
+      }
+
+      // Test the type explicitly instead of casting and catching the resulting
+      // ClassCastException.
+      if (val instanceof String)
+      {
+         unmarshalNullStream = Boolean.valueOf((String) val).booleanValue();
+      }
+      else if (val instanceof Boolean)
+      {
+         unmarshalNullStream = ((Boolean) val).booleanValue();
+      }
+      else
+      {
+         log.warn(this + " could not convert " +
+                  UNMARSHAL_NULL_STREAM + " value of " +
+                  val + " to a boolean value.");
+         return;
+      }
+
+      log.debug(this + " setting unmarshalNullStream to " + unmarshalNullStream);
+   }
+
/**
* Gets the thread pool being used for simulating timeouts with jdk 1.4. If one has
* not be specifically set via configuration or call to set it, will always return
16 years, 1 month