Author: david.lloyd(a)jboss.com
Date: 2008-11-14 01:41:47 -0500 (Fri, 14 Nov 2008)
New Revision: 4679
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java
Removed:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java
remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java
Log:
Complete changes for multiplex transport...
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -29,7 +29,6 @@
import java.lang.ref.WeakReference;
import org.jboss.remoting.RemotingException;
import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.HandleableCloseable;
import org.jboss.xnio.log.Logger;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.WeakCloseable;
@@ -40,11 +39,11 @@
*/
public abstract class AbstractAutoCloseable<T> extends
AbstractHandleableCloseable<T> implements AutoCloseable<T> {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.resource");
+
private final AtomicInteger refcount = new AtomicInteger(0);
private final Executor executor;
- private static final Logger log = Logger.getLogger(AbstractAutoCloseable.class);
-
/**
* Basic constructor.
*
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -42,7 +42,7 @@
*/
public abstract class AbstractHandleableCloseable<T> implements
HandleableCloseable<T> {
- private static final Logger log =
Logger.getLogger(AbstractHandleableCloseable.class);
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.resource");
protected final Executor executor;
private final Object closeLock = new Object();
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -23,6 +23,9 @@
package org.jboss.remoting.spi;
import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collections;
import java.io.IOException;
import org.jboss.remoting.util.QualifiedName;
import org.jboss.remoting.util.CollectionUtil;
@@ -75,6 +78,10 @@
return map.get(path);
}
+ public Set<Map.Entry<QualifiedName, Handle<RequestHandlerSource>>>
getEntrySet() {
+ return Collections.unmodifiableSet(map.entrySet());
+ }
+
public String toString() {
return "named service registry <" + Integer.toHexString(hashCode())
+ ">";
}
Modified: remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java
===================================================================
---
remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -30,6 +30,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.test.support.LoggingHelper;
@@ -41,6 +42,8 @@
LoggingHelper.init();
}
+ public static final Logger log = Logger.getLogger(CloseableTestCase.class);
+
public void testBasic() throws Throwable {
final ExecutorService executorService = Executors.newCachedThreadPool();
try {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -25,12 +25,15 @@
import java.util.concurrent.Executor;
import org.jboss.remoting.ClientContext;
import org.jboss.remoting.ServiceContext;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public final class ClientContextImpl extends AbstractContextImpl<ClientContext>
implements ClientContext {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.client-context");
+
private final ServiceContextImpl serviceContext;
ClientContextImpl(final Executor executor) {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -53,7 +53,7 @@
}
private <I, O> ClientImpl<I, O> doCreateExternal(Class<I>
requestClass, Class<O> replyClass, RequestHandler handler) throws IOException {
- return new ClientImpl<I, O>(handler.getHandle(), endpoint.getExecutor(),
requestClass, replyClass);
+ return ClientImpl.create(handler.getHandle(), endpoint.getExecutor(),
requestClass, replyClass);
}
public Object createExternal(final Class<?> aClass, final ObjectInput input,
final Creator creator) throws IOException, ClassNotFoundException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-11-14
04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -27,29 +27,44 @@
import org.jboss.remoting.Client;
import org.jboss.remoting.IndeterminateOutcomeException;
import org.jboss.remoting.RemoteRequestException;
+import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.core.util.QueueExecutor;
import org.jboss.remoting.spi.Handle;
import org.jboss.remoting.spi.RemoteRequestContext;
import org.jboss.remoting.spi.ReplyHandler;
import org.jboss.remoting.spi.RequestHandler;
import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public final class ClientImpl<I, O> extends AbstractContextImpl<Client<I,
O>> implements Client<I, O> {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.client");
+
private final Handle<RequestHandler> handle;
private final Class<I> requestClass;
private final Class<O> replyClass;
- ClientImpl(final Handle<RequestHandler> handle, final Executor executor, final
Class<I> requestClass, final Class<O> replyClass) {
+ private ClientImpl(final Handle<RequestHandler> handle, final Executor
executor, final Class<I> requestClass, final Class<O> replyClass) {
super(executor);
this.handle = handle;
this.requestClass = requestClass;
this.replyClass = replyClass;
}
+ static <I, O> ClientImpl<I, O> create(final Handle<RequestHandler>
handle, final Executor executor, final Class<I> requestClass, final Class<O>
replyClass) {
+ final ClientImpl<I, O> ci = new ClientImpl<I, O>(handle, executor,
requestClass, replyClass);
+ handle.addCloseHandler(new CloseHandler<Handle<RequestHandler>>() {
+ public void handleClose(final Handle<RequestHandler> closed) {
+ IoUtils.safeClose(ci);
+ }
+ });
+ return ci;
+ }
+
protected void closeAction() throws IOException {
handle.close();
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -53,7 +53,7 @@
}
private <I, O> ClientSourceImpl<I, O> doCreateExternal(Class<I>
requestClass, Class<O> replyClass, RequestHandlerSource handlerSource) throws
IOException {
- return new ClientSourceImpl<I, O>(handlerSource.getHandle(), endpoint,
requestClass, replyClass);
+ return ClientSourceImpl.create(handlerSource.getHandle(), endpoint, requestClass,
replyClass);
}
public Object createExternal(final Class<?> aClass, final ObjectInput input,
final Creator creator) throws IOException, ClassNotFoundException {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -26,23 +26,27 @@
import org.jboss.remoting.Client;
import org.jboss.remoting.ClientSource;
import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.spi.AbstractHandleableCloseable;
import org.jboss.remoting.spi.Handle;
import org.jboss.remoting.spi.RequestHandler;
import org.jboss.remoting.spi.RequestHandlerSource;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public final class ClientSourceImpl<I, O> extends
AbstractHandleableCloseable<ClientSource<I, O>> implements ClientSource<I,
O> {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.client-source");
+
private final Handle<RequestHandlerSource> handle;
private final Endpoint endpoint;
private final Class<I> requestClass;
private final Class<O> replyClass;
- ClientSourceImpl(final Handle<RequestHandlerSource> handle, final EndpointImpl
endpoint, final Class<I> requestClass, final Class<O> replyClass) {
+ private ClientSourceImpl(final Handle<RequestHandlerSource> handle, final
EndpointImpl endpoint, final Class<I> requestClass, final Class<O> replyClass)
{
super(endpoint.getExecutor());
this.handle = handle;
this.endpoint = endpoint;
@@ -50,6 +54,16 @@
this.replyClass = replyClass;
}
+ static <I, O> ClientSourceImpl<I, O> create(final
Handle<RequestHandlerSource> handle, final EndpointImpl endpoint, final
Class<I> requestClass, final Class<O> replyClass) {
+ final ClientSourceImpl<I, O> csi = new ClientSourceImpl<I, O>(handle,
endpoint, requestClass, replyClass);
+ handle.addCloseHandler(new
CloseHandler<Handle<RequestHandlerSource>>() {
+ public void handleClose(final Handle<RequestHandlerSource> closed) {
+ IoUtils.safeClose(csi);
+ }
+ });
+ return csi;
+ }
+
protected void closeAction() throws IOException {
handle.close();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -46,7 +46,7 @@
Logger.getLogger("org.jboss.remoting").info("JBoss Remoting
version %s", Version.VERSION);
}
- private static final Logger log = Logger.getLogger(Endpoint.class);
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.endpoint");
private String name;
@@ -234,12 +234,7 @@
boolean ok = false;
final Handle<RequestHandler> handle = requestHandler.getHandle();
try {
- final ClientImpl<I, O> client = new ClientImpl<I, O>(handle,
executor, requestType, replyType);
- client.addCloseHandler(new CloseHandler<Client<I, O>>() {
- public void handleClose(final Client<I, O> closed) {
- IoUtils.safeClose(handle);
- }
- });
+ final ClientImpl<I, O> client = ClientImpl.create(handle, executor,
requestType, replyType);
ok = true;
return client;
} finally {
@@ -257,7 +252,7 @@
boolean ok = false;
final Handle<RequestHandlerSource> handle =
requestHandlerSource.getHandle();
try {
- final ClientSourceImpl<I, O> clientSource = new ClientSourceImpl<I,
O>(handle, this, requestClass, replyClass);
+ final ClientSourceImpl<I, O> clientSource =
ClientSourceImpl.create(handle, this, requestClass, replyClass);
ok = true;
return clientSource;
} finally {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
-import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.RequestListener;
import org.jboss.remoting.RemoteRequestException;
@@ -46,7 +45,7 @@
private final Class<I> requestClass;
private final Class<O> replyClass;
- private static final Logger log = Logger.getLogger(LocalRequestHandler.class);
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.listener");
LocalRequestHandler(Config<I, O> config) {
super(config.getExecutor());
@@ -88,18 +87,17 @@
};
}
+ protected void closeAction() throws IOException {
+ try {
+ requestListener.handleClientClose(clientContext);
+ } catch (Throwable t) {
+ log.error(t, "Unexpected exception in request listener client close
handler method");
+ }
+ }
+
void open() throws IOException {
try {
requestListener.handleClientOpen(clientContext);
- addCloseHandler(new CloseHandler<RequestHandler>() {
- public void handleClose(final RequestHandler closed) {
- try {
- requestListener.handleClientClose(clientContext);
- } catch (Throwable t) {
- log.error(t, "Unexpected exception in request listener
client close handler method");
- }
- }
- });
} catch (Throwable t) {
final IOException ioe = new IOException("Failed to open client
context");
ioe.initCause(t);
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -43,7 +43,7 @@
private final Class<I> requestClass;
private final Class<O> replyClass;
- private static final Logger log = Logger.getLogger(LocalRequestHandlerSource.class);
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.listener-source");
LocalRequestHandlerSource(final Config<I, O> config) {
super(config.getExecutor());
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -24,11 +24,14 @@
import java.util.concurrent.Executor;
import org.jboss.remoting.ServiceContext;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public final class ServiceContextImpl extends AbstractContextImpl<ServiceContext>
implements ServiceContext {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.service-context");
+
protected ServiceContextImpl(final Executor executor) {
super(executor);
}
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -42,7 +42,7 @@
*/
final class BasicRequestHandler extends AbstractAutoCloseable<RequestHandler>
implements RequestHandler {
- private static final Logger log = Logger.getLogger(BasicRequestHandler.class);
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.basic");
private final AtomicInteger requestSequence;
private final Lock reqLock;
@@ -77,6 +77,8 @@
} catch (IOException e) {
log.error(e, "Error writing cancel request");
IoUtils.safeClose(BasicRequestHandler.this);
+ } finally {
+ reqLock.unlock();
}
}
};
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.xnio.BufferAllocator;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+/**
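+ * A {@link ByteOutput} that collects written bytes into buffers taken from the allocator; each buffer is flipped and added to the target collection once it is full, or when the output is flushed or closed.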
+ */
+public class BufferByteOutput implements ByteOutput {
+
+ private ByteBuffer current;
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final Collection<ByteBuffer> target;
+
+ public BufferByteOutput(final BufferAllocator<ByteBuffer> allocator, final
Collection<ByteBuffer> target) {
+ this.allocator = allocator;
+ this.target = target;
+ }
+
+ private ByteBuffer getCurrent() {
+ final ByteBuffer buffer = current;
+ return buffer == null ? (current = allocator.allocate()) : buffer;
+ }
+
+ public void write(final int i) {
+ final ByteBuffer buffer = getCurrent();
+ buffer.put((byte) i);
+ if (! buffer.hasRemaining()) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+
+ public void write(final byte[] bytes) {
+ write(bytes, 0, bytes.length);
+ }
+
+ public void write(final byte[] bytes, int offs, int len) {
+ while (len > 0) {
+ final ByteBuffer buffer = getCurrent();
+ final int c = Math.min(len, buffer.remaining());
+ buffer.put(bytes, offs, c);
+ offs += c;
+ len -= c;
+ if (! buffer.hasRemaining()) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+ }
+
+ public void close() {
+ flush();
+ }
+
+ public void flush() {
+ final ByteBuffer buffer = current;
+ if (buffer != null) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import java.io.IOException;
+
+/**
+ *
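+ * An {@link IoFuture} of a {@link RequestHandlerSource} whose {@link #cancel()} does nothing and returns this future; {@code setException} and {@code setResult} are redeclared here and delegate directly to the superclass.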
+public final class FutureRemoteRequestHandlerSource extends
AbstractIoFuture<RequestHandlerSource> {
+
+ public IoFuture<RequestHandlerSource> cancel() {
+ return this;
+ }
+
+ protected boolean setException(final IOException exception) {
+ return super.setException(exception);
+ }
+
+ protected boolean setResult(final RequestHandlerSource result) {
+ return super.setResult(result);
+ }
+}
Modified:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -24,6 +24,8 @@
import java.util.HashMap;
import java.util.IdentityHashMap;
+import java.util.Set;
+import java.util.Collections;
/**
*
@@ -59,6 +61,17 @@
leftMap.remove(oldKey1Obj);
}
+ public boolean putIfAbsent(final int key1, final T key2) {
+ final Integer key1Obj = Integer.valueOf(key1);
+ if (leftMap.containsKey(key1Obj)) {
+ return false;
+ }
+ final T oldKey2 = leftMap.put(key1Obj, key2);
+ rightMap.put(key2, key1Obj);
+ rightMap.remove(oldKey2);
+ return true;
+ }
+
public T remove(final int key) {
final T oldRightKey = leftMap.remove(Integer.valueOf(key));
rightMap.remove(oldRightKey);
@@ -69,6 +82,10 @@
leftMap.remove(rightMap.remove(key));
}
+ public Set<T> getKeys() {
+ return Collections.unmodifiableSet(rightMap.keySet());
+ }
+
public static <T> IntegerBiMap<T> create() {
return new IdentityHashIntegerBiMap<T>();
}
Modified:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -26,6 +26,8 @@
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Collections;
+import java.util.Collection;
import org.jboss.remoting.spi.AutoCloseable;
import org.jboss.remoting.spi.Handle;
@@ -73,6 +75,10 @@
leftMap.remove(rightMap.remove(key));
}
+ public Collection<Handle<T>> getKeys() {
+ return Collections.unmodifiableCollection(leftMap.values());
+ }
+
public static <T extends AutoCloseable<T>> IntegerResourceBiMap<T>
create() {
return new IdentityHashIntegerResourceBiMap<T>();
}
Modified:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -22,6 +22,9 @@
package org.jboss.remoting.protocol.multiplex;
+import org.jboss.remoting.util.SynchronizedSet;
+import java.util.Set;
+
/**
*
*/
@@ -32,10 +35,14 @@
void put(int key1, T key2);
+ boolean putIfAbsent(int key1, T key2);
+
T remove(int key);
void remove(T key);
+ Set<T> getKeys();
+
class Util {
private Util() {
@@ -69,6 +76,12 @@
}
}
+ public boolean putIfAbsent(final int key1, final T key2) {
+ synchronized (lock) {
+ return orig.putIfAbsent(key1, key2);
+ }
+ }
+
public T remove(final int key) {
synchronized (lock) {
return orig.remove(key);
@@ -80,6 +93,10 @@
orig.remove(key);
}
}
+
+ public Set<T> getKeys() {
+ return new SynchronizedSet<T>(orig.getKeys(), lock);
+ }
}
public static <T> IntegerBiMap<T> synchronizing(IntegerBiMap<T>
orig) {
Modified:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -24,7 +24,10 @@
import org.jboss.remoting.spi.AutoCloseable;
import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.util.SynchronizedSet;
+import org.jboss.remoting.util.SynchronizedCollection;
import java.util.Iterator;
+import java.util.Collection;
/**
*
@@ -40,6 +43,8 @@
void remove(T key);
+ Collection<Handle<T>> getKeys();
+
class Util {
private Util() {
@@ -85,6 +90,10 @@
}
}
+ public Collection<Handle<T>> getKeys() {
+ return new SynchronizedCollection<Handle<T>>(orig.getKeys(),
lock);
+ }
+
public Iterator<Handle<T>> iterator() {
return null;
}
Modified:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -28,40 +28,50 @@
enum MessageType {
/**
+ * Signals that the connection should be closed in an orderly fashion. After this
message is sent, no further
+ * requests or service advertisements may be sent.
+ */
+ CONNECTION_CLOSE(0x00),
+ /**
* The request part of a request-response sequence, sent from the Client to the
RequestListener.
*/
- REQUEST(2),
+ REQUEST(0x10),
/**
* The reply part of a request-response sequence, sent from the RequestListener to
the Client.
*/
- REPLY(3),
+ REPLY(0x11),
/**
* A cancellation request for an outstanding request, sent from the Client to the
RequestListener.
*/
- CANCEL_REQUEST(4),
+ CANCEL_REQUEST(0x12),
/**
* Acknowledgement that a request was cancelled, sent from the RequestListener to the
Client.
*/
- CANCEL_ACK(5),
+ CANCEL_ACK(0x13),
/**
* Message that the request could not be received on the remote end, sent to the
Client from the
- * protocol handler as a
+ * protocol handler.
*/
- REQUEST_RECEIVE_FAILED(6),
+ REQUEST_RECEIVE_FAILED(0x14),
// Request failed due to exception
- REQUEST_FAILED(7),
- // Request completed but no reply or exception was sent
- REQUEST_OUTCOME_UNKNOWN(8),
+ REQUEST_FAILED(0x15),
// Remote side called .close() on a forwarded RequestHandler
- CLIENT_CLOSE(9),
+ CLIENT_CLOSE(0x20),
// Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
- CLIENT_OPEN(10),
- // Remote side called .close() on a forwarded RequestHandlerSource
- SERVICE_CLOSE(11),
- // Remote side brought a new service online
- SERVICE_ADVERTISE(12),
- // Remote side's service is no longer available
- SERVICE_UNADVERTISE(13),
+ CLIENT_OPEN(0x21),
+ // Request to open a service at a path
+ SERVICE_OPEN_REQUEST(0x30),
+ // Reply for a successful service open
+ SERVICE_OPEN_REPLY(0x31),
+ // Reply for a generally failed service open
+ SERVICE_OPEN_FAILED(0x32),
+ SERVICE_OPEN_NOT_FOUND(0x33),
+ SERVICE_OPEN_FORBIDDEN(0x34),
+
+ // Notify the remote side that the service will no longer be used
+ SERVICE_CLOSE_REQUEST(0x3e),
+ // The service channel is closed; no further clients may be opened
+ SERVICE_CLOSE_NOTIFY(0x3f),
;
private final int id;
@@ -81,18 +91,22 @@
*/
public static MessageType getMessageType(final int id) {
switch (id) {
- case 2: return REQUEST;
- case 3: return REPLY;
- case 4: return CANCEL_REQUEST;
- case 5: return CANCEL_ACK;
- case 6: return REQUEST_RECEIVE_FAILED;
- case 7: return REQUEST_FAILED;
- case 8: return REQUEST_OUTCOME_UNKNOWN;
- case 9: return CLIENT_CLOSE;
- case 10: return CLIENT_OPEN;
- case 11: return SERVICE_CLOSE;
- case 12: return SERVICE_ADVERTISE;
- case 13: return SERVICE_UNADVERTISE;
+ case 0x00: return CONNECTION_CLOSE;
+ case 0x10: return REQUEST;
+ case 0x11: return REPLY;
+ case 0x12: return CANCEL_REQUEST;
+ case 0x13: return CANCEL_ACK;
+ case 0x14: return REQUEST_RECEIVE_FAILED;
+ case 0x15: return REQUEST_FAILED;
+ case 0x20: return CLIENT_CLOSE;
+ case 0x21: return CLIENT_OPEN;
+ case 0x30: return SERVICE_OPEN_REQUEST;
+ case 0x31: return SERVICE_OPEN_REPLY;
+ case 0x32: return SERVICE_OPEN_FAILED;
+ case 0x33: return SERVICE_OPEN_NOT_FOUND;
+ case 0x34: return SERVICE_OPEN_FORBIDDEN;
+ case 0x3e: return SERVICE_CLOSE_REQUEST;
+ case 0x3f: return SERVICE_CLOSE_NOTIFY;
default: throw new IllegalArgumentException("Invalid message type
ID");
}
}
Modified:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -27,6 +27,7 @@
import org.jboss.xnio.BufferAllocator;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.remoting.spi.NamedServiceRegistry;
/**
* A configuration object for the multiplex protocol.
@@ -37,6 +38,7 @@
private int linkMetric;
private Executor executor;
private BufferAllocator<ByteBuffer> allocator;
+ private NamedServiceRegistry namedServiceRegistry;
/**
* Construct a new instance.
@@ -133,4 +135,22 @@
public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
this.allocator = allocator;
}
+
+ /**
+ * Get the named service registry that connections built from this configuration will use
+ * to look up services by name.
+ *
+ * @return the currently configured registry
+ */
+ public NamedServiceRegistry getNamedServiceRegistry() {
+ return this.namedServiceRegistry;
+ }
+
+ /**
+ * Set the named service registry that connections built from this configuration will use
+ * to look up services by name.
+ *
+ * @param namedServiceRegistry the registry to store in this configuration
+ */
+ public void setNamedServiceRegistry(final NamedServiceRegistry namedServiceRegistry) {
+ this.namedServiceRegistry = namedServiceRegistry;
+ }
}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -0,0 +1,444 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.spi.NamedServiceRegistry;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.AbstractHandleableCloseable;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.IndeterminateOutcomeException;
+import org.jboss.remoting.util.QualifiedName;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.Closeable;
+
+/**
+ * State and helper operations for one multiplex protocol connection running over a single
+ * {@link AllocatedMessageChannel}.
+ * <p>
+ * An instance holds the connection configuration, the ID maps and ID sequences for requests,
+ * clients and services, and the channel itself.  It also provides the blocking write helpers and
+ * the static helpers that encode and decode strings and qualified names inside protocol messages.
+ */
+public final class MultiplexConnection extends AbstractHandleableCloseable<MultiplexConnection> {
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex");
+
+    //--== Connection configuration items ==--
+    private final MarshallerFactory marshallerFactory;
+    private final MarshallingConfiguration marshallingConfiguration;
+    private final int linkMetric;
+    private final Executor executor;
+    // buffer allocator for outbound message assembly
+    private final BufferAllocator<ByteBuffer> allocator;
+
+    // running on remote node
+    private final IntegerBiMap<ReplyHandler> remoteRequests = IdentityHashIntegerBiMap.createSynchronizing();
+    // running on local node
+    private final IntegerBiMap<RemoteRequestContext> localRequests = IdentityHashIntegerBiMap.createSynchronizing();
+    // sequence for remote requests
+    private final AtomicInteger requestSequence = new AtomicInteger();
+
+    // clients whose requests get forwarded to the remote side
+    // even #s were opened from services forwarded to us (our sequence)
+    // odd #s were forwarded directly to us (remote sequence)
+    private final IntegerBiMap<RequestHandler> remoteClients = IdentityHashIntegerBiMap.createSynchronizing();
+    // forwarded to remote side (handled on this side)
+    private final IntegerResourceBiMap<RequestHandler> forwardedClients = IdentityHashIntegerResourceBiMap.createSynchronizing();
+    // sequence for forwarded clients (shift left one bit, add one, limit is 2^30)
+    private final AtomicInteger forwardedClientSequence = new AtomicInteger();
+    // sequence for clients created from services forwarded to us (shift left one bit, limit is 2^30)
+    private final AtomicInteger remoteClientSequence = new AtomicInteger();
+
+    // services on the remote side
+    private final IntegerBiMap<FutureRemoteRequestHandlerSource> remoteServices = IdentityHashIntegerBiMap.createSynchronizing();
+    // forwarded to remote side (handled on this side)
+    private final IntegerResourceBiMap<RequestHandlerSource> forwardedServices = IdentityHashIntegerResourceBiMap.createSynchronizing();
+    // sequence for remote services
+    private final AtomicInteger remoteServiceSequence = new AtomicInteger();
+
+    // registered services by path
+    private final NamedServiceRegistry namedServiceRegistry;
+
+    private final Endpoint endpoint;
+
+    private final AllocatedMessageChannel channel;
+
+    /**
+     * Construct a new connection.
+     *
+     * @param endpoint the endpoint this connection belongs to
+     * @param channel the message channel that carries the protocol messages
+     * @param configuration the source of the marshalling, executor, allocator, link metric and registry settings
+     * @throws NullPointerException if the configuration's marshaller factory, marshalling configuration,
+     *      executor, allocator or named service registry is {@code null}
+     */
+    public MultiplexConnection(final Endpoint endpoint, final AllocatedMessageChannel channel, final MultiplexConfiguration configuration) {
+        super(configuration.getExecutor());
+        this.endpoint = endpoint;
+        this.channel = channel;
+        marshallerFactory = configuration.getMarshallerFactory();
+        if (marshallerFactory == null) {
+            throw new NullPointerException("marshallerFactory is null");
+        }
+        marshallingConfiguration = configuration.getMarshallingConfiguration();
+        if (marshallingConfiguration == null) {
+            throw new NullPointerException("marshallingConfiguration is null");
+        }
+        linkMetric = configuration.getLinkMetric();
+        executor = configuration.getExecutor();
+        if (executor == null) {
+            throw new NullPointerException("executor is null");
+        }
+        allocator = configuration.getAllocator();
+        if (allocator == null) {
+            throw new NullPointerException("allocator is null");
+        }
+        namedServiceRegistry = configuration.getNamedServiceRegistry();
+        if (namedServiceRegistry == null) {
+            throw new NullPointerException("namedServiceRegistry is null");
+        }
+    }
+
+    // sequence methods
+
+    /**
+     * Get the next request ID.
+     *
+     * @return the next value of the request sequence with the sign bit cleared
+     */
+    int nextRequest() {
+        return requestSequence.getAndIncrement() & 0x7fffffff;
+    }
+
+    /**
+     * Get the next ID for a client forwarded to the remote side.
+     *
+     * @return an odd, non-negative ID
+     */
+    int nextForwardedClient() {
+        return (forwardedClientSequence.getAndIncrement() << 1 | 1) & 0x7fffffff;
+    }
+
+    /**
+     * Get the next ID for a client opened from a service forwarded to us.
+     *
+     * @return an even, non-negative ID
+     */
+    int nextRemoteClient() {
+        return (remoteClientSequence.getAndIncrement() << 1) & 0x7fffffff;
+    }
+
+    /**
+     * Get the next ID for a remote service.
+     *
+     * @return the next value of the remote service sequence with the sign bit cleared
+     */
+    int nextRemoteService() {
+        return remoteServiceSequence.getAndIncrement() & 0x7fffffff;
+    }
+
+    /**
+     * Send several buffers as one message, waiting for the channel to become writable as often as needed.
+     * A single buffer is delegated to {@link #doBlockingWrite(ByteBuffer)}.
+     *
+     * @param buffers the buffers that make up the message
+     * @throws IOException if the channel reports an I/O error
+     */
+    void doBlockingWrite(ByteBuffer... buffers) throws IOException {
+        if (buffers.length == 1) {
+            doBlockingWrite(buffers[0]);
+            return;
+        }
+        for (;;) {
+            if (channel.send(buffers)) {
+                return;
+            }
+            channel.awaitWritable();
+        }
+    }
+
+    /**
+     * Send one buffer as a message, waiting for the channel to become writable as often as needed.
+     * The message is dumped to the trace log before it is sent.
+     *
+     * @param buffer the message to send
+     * @throws IOException if the channel reports an I/O error
+     */
+    void doBlockingWrite(ByteBuffer buffer) throws IOException {
+        log.trace("Sending message:\n%s", Buffers.createDumper(buffer, 8, 1));
+        for (;;) {
+            if (channel.send(buffer)) {
+                return;
+            }
+            channel.awaitWritable();
+        }
+    }
+
+    /**
+     * Send a list of buffers as one message; see {@link #doBlockingWrite(ByteBuffer...)}.
+     *
+     * @param buffers the buffers that make up the message
+     * @throws IOException if the channel reports an I/O error
+     */
+    void doBlockingWrite(List<ByteBuffer> buffers) throws IOException {
+        doBlockingWrite(buffers.toArray(new ByteBuffer[buffers.size()]));
+    }
+
+    MarshallerFactory getMarshallerFactory() {
+        return marshallerFactory;
+    }
+
+    MarshallingConfiguration getMarshallingConfiguration() {
+        return marshallingConfiguration;
+    }
+
+    int getLinkMetric() {
+        return linkMetric;
+    }
+
+    protected Executor getExecutor() {
+        return executor;
+    }
+
+    BufferAllocator<ByteBuffer> getAllocator() {
+        return allocator;
+    }
+
+    Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    AllocatedMessageChannel getChannel() {
+        return channel;
+    }
+
+    // ID map accessors; each one is a direct delegation to the corresponding map
+
+    void removeRemoteClient(final int identifier) {
+        remoteClients.remove(identifier);
+    }
+
+    void addRemoteRequest(final int id, final ReplyHandler handler) {
+        remoteRequests.put(id, handler);
+    }
+
+    void addRemoteClient(final int id, final RequestHandler handler) {
+        remoteClients.put(id, handler);
+    }
+
+    Handle<RequestHandler> getForwardedClient(final int id) {
+        return forwardedClients.get(id);
+    }
+
+    ReplyHandler removeRemoteRequest(final int id) {
+        return remoteRequests.remove(id);
+    }
+
+    RemoteRequestContext getLocalRequest(final int id) {
+        return localRequests.get(id);
+    }
+
+    ReplyHandler getRemoteRequest(final int id) {
+        return remoteRequests.get(id);
+    }
+
+    Handle<RequestHandler> removeForwardedClient(final int id) {
+        return forwardedClients.remove(id);
+    }
+
+    Handle<RequestHandlerSource> getForwardedService(final int id) {
+        return forwardedServices.get(id);
+    }
+
+    void addForwardedClient(final int id, final Handle<RequestHandler> handle) {
+        forwardedClients.put(id, handle);
+    }
+
+    // NOTE: the method name is misspelled ("Forwaded"); it is kept as-is because callers use this name
+    void addForwadedService(final int id, final Handle<RequestHandlerSource> service) {
+        forwardedServices.put(id, service);
+    }
+
+    Handle<RequestHandlerSource> removeForwardedService(final int id) {
+        return forwardedServices.remove(id);
+    }
+
+    /**
+     * Look up a locally registered service by its path string.
+     *
+     * @param path the path, parsed with {@link QualifiedName#parse(String)}
+     * @return the result of the registry lookup
+     */
+    Handle<RequestHandlerSource> getServiceByPath(String path) {
+        return getService(QualifiedName.parse(path));
+    }
+
+    /**
+     * Look up a locally registered service by qualified name.
+     *
+     * @param name the service name
+     * @return the result of the registry lookup
+     */
+    Handle<RequestHandlerSource> getService(final QualifiedName name) {
+        return namedServiceRegistry.lookupService(name);
+    }
+
+    FutureRemoteRequestHandlerSource getFutureRemoteService(final int id) {
+        return remoteServices.get(id);
+    }
+
+    FutureRemoteRequestHandlerSource removeFutureRemoteService(final int id) {
+        return remoteServices.remove(id);
+    }
+
+    /**
+     * Open a service on the remote side.  A pending future is registered under a fresh service ID, a
+     * {@link MessageType#SERVICE_OPEN_REQUEST} message carrying that ID and the name is written, and the
+     * calling thread then waits for the future to yield the service handle.
+     *
+     * @param name the name of the remote service to open
+     * @return a handle to the remote service
+     * @throws InterruptedIOException if the thread is interrupted while waiting (the interrupt status is restored)
+     * @throws IOException if the request could not be written or the wait fails with an I/O error
+     */
+    public Handle<RequestHandlerSource> openRemoteService(final QualifiedName name) throws IOException {
+        final FutureRemoteRequestHandlerSource future = new FutureRemoteRequestHandlerSource();
+        int id;
+        // keep drawing IDs until one is not already in use
+        for (;;) {
+            id = nextRemoteService();
+            if (remoteServices.putIfAbsent(id, future)) {
+                break;
+            }
+        }
+        // one byte of message type + four bytes of service ID + the encoded name
+        ByteBuffer buffer = ByteBuffer.allocate(5 + getByteLength(name));
+        buffer.put((byte) MessageType.SERVICE_OPEN_REQUEST.getId());
+        buffer.putInt(id);
+        putQualifiedName(buffer, name);
+        buffer.flip();
+        try {
+            doBlockingWrite(buffer);
+        } catch (IOException e) {
+            // the open request was not sent successfully, so do not leave the pending future registered
+            remoteServices.remove(id);
+            throw e;
+        }
+        try {
+            final Handle<RequestHandlerSource> handle = future.getInterruptibly().getHandle();
+            log.trace("Opened %s to %s", handle, this);
+            return handle;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException("Interrupted while waiting for remote service");
+        }
+    }
+
+    /**
+     * Compute the number of bytes {@link #putQualifiedName(ByteBuffer, QualifiedName)} writes for a name.
+     *
+     * @param name the qualified name
+     * @return two bytes for the segment count plus the encoded length of every segment
+     */
+    static int getByteLength(QualifiedName name) {
+        int cnt = 2; // short header
+        for (String s : name) {
+            cnt += getByteLength(s);
+        }
+        return cnt;
+    }
+
+    /**
+     * Compute the number of bytes {@link #putString(ByteBuffer, String)} writes for a string.
+     *
+     * @param s the string
+     * @return the encoded length, including the trailing zero byte
+     */
+    static int getByteLength(String s) {
+        final int len = s.length();
+        int cnt = 0;
+        for (int i = 0; i < len; i++) {
+            char ch = s.charAt(i);
+            if (ch > 0 && ch <= 0x7f) {
+                cnt ++;
+            } else if (ch <= 0x07ff) {
+                // includes U+0000, which is written as two bytes so that a zero byte only ever terminates
+                cnt += 2;
+            } else {
+                cnt += 3;
+            }
+        }
+        // null terminator...
+        cnt ++;
+        return cnt;
+    }
+
+    /**
+     * Read a zero-terminated string written by {@link #putString(ByteBuffer, String)}.  Reading stops at the
+     * terminating zero byte or at the end of the buffer, whichever comes first.  A byte that cannot start or
+     * continue a sequence is replaced by {@code '?'}.
+     *
+     * @param buffer the buffer to read from; its position is advanced past the bytes consumed
+     * @return the decoded string
+     */
+    static String getString(final ByteBuffer buffer) {
+        StringBuilder builder = new StringBuilder();
+        // state is the number of continuation bytes still expected; a accumulates the char being decoded
+        int state = 0, a = 0;
+        while (buffer.hasRemaining()) {
+            final int v = buffer.get() & 0xff;
+            switch (state) {
+                case 0: {
+                    if (v == 0) {
+                        return builder.toString();
+                    } else if (v < 128) {
+                        builder.append((char) v);
+                    } else if (192 <= v && v < 224) {
+                        // two-byte lead (110xxxxx): keep only the five payload bits
+                        a = (v & 0x1f) << 6;
+                        state = 1;
+                    } else if (224 <= v && v < 240) {
+                        // three-byte lead (1110xxxx): keep only the four payload bits
+                        a = (v & 0x0f) << 12;
+                        state = 2;
+                    } else {
+                        builder.append('?');
+                    }
+                    break;
+                }
+                case 1: {
+                    if (v == 0) {
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (128 <= v && v < 192) {
+                        a |= v & 0x3f;
+                        builder.append((char) a);
+                    } else {
+                        builder.append('?');
+                    }
+                    state = 0;
+                    break;
+                }
+                case 2: {
+                    if (v == 0) {
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (128 <= v && v < 192) {
+                        a |= (v & 0x3f) << 6;
+                        state = 1;
+                    } else {
+                        builder.append('?');
+                        state = 0;
+                    }
+                    break;
+                }
+                default:
+                    throw new IllegalStateException("wrong state");
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Write a string followed by a zero byte.  Each {@code char} is encoded on its own: U+0001..U+007F as
+     * one byte, U+0000 and U+0080..U+07FF as two bytes, everything else as three bytes.
+     *
+     * @param buffer the buffer to write to; it must have at least {@link #getByteLength(String)} bytes remaining
+     * @param string the string to write
+     */
+    static void putString(final ByteBuffer buffer, final String string) {
+        final int len = string.length();
+        for (int i = 0; i < len; i ++) {
+            char ch = string.charAt(i);
+            if (ch > 0 && ch <= 0x7f) {
+                buffer.put((byte) ch);
+            } else if (ch <= 0x07ff) {
+                buffer.put((byte) (0xc0 | 0x1f & ch >> 6));
+                buffer.put((byte) (0x80 | 0x3f & ch));
+            } else {
+                buffer.put((byte) (0xe0 | 0x0f & ch >> 12));
+                buffer.put((byte) (0x80 | 0x3f & ch >> 6));
+                buffer.put((byte) (0x80 | 0x3f & ch));
+            }
+        }
+        buffer.put((byte) 0);
+    }
+
+    /**
+     * Read a qualified name written by {@link #putQualifiedName(ByteBuffer, QualifiedName)}.
+     *
+     * @param buffer the buffer to read from
+     * @return the qualified name built from the decoded segments
+     */
+    static QualifiedName getQualifiedName(final ByteBuffer buffer) {
+        // the segment count is an unsigned 16-bit value
+        final int len = buffer.getShort() & 0xffff;
+        final String[] segs = new String[len];
+        for (int i = 0; i < len; i++) {
+            segs[i] = getString(buffer);
+        }
+        return new QualifiedName(segs);
+    }
+
+    /**
+     * Write a qualified name as a 16-bit segment count followed by each segment as a zero-terminated string.
+     *
+     * @param buffer the buffer to write to
+     * @param qualifiedName the name to write
+     * @throws IllegalArgumentException if the name has more than 65535 segments
+     */
+    static void putQualifiedName(final ByteBuffer buffer, final QualifiedName qualifiedName) {
+        final int len = qualifiedName.length();
+        if (len > 0xffff) {
+            throw new IllegalArgumentException("Qualified name is too long");
+        }
+        buffer.putShort((short) len);
+        for (String seg : qualifiedName) {
+            putString(buffer, seg);
+        }
+    }
+
+    /**
+     * Release everything tied to this connection: close the channel, fail the requests running remotely,
+     * close the remote clients, arrange for pending or opened remote services to be closed, cancel the
+     * requests running locally, and close the handles of forwarded clients and services.
+     */
+    protected void closeAction() {
+        // just to make sure...
+        IoUtils.safeClose(channel);
+        final IndeterminateOutcomeException ioe = new IndeterminateOutcomeException("The connection was closed");
+        // Things running remotely
+        for (ReplyHandler x : remoteRequests.getKeys()) {
+            SpiUtils.safeHandleException(x, ioe);
+        }
+        for (RequestHandler x : remoteClients.getKeys()) {
+            IoUtils.safeClose(x);
+        }
+        for (FutureRemoteRequestHandlerSource future : remoteServices.getKeys()) {
+            // the notifier closes the result once the future is done
+            future.addNotifier(MultiplexConnection.<RequestHandlerSource>closingNotifier());
+        }
+        // Things running locally
+        for (RemoteRequestContext localRequest : localRequests.getKeys()) {
+            localRequest.cancel();
+        }
+        for (Handle<RequestHandler> client : forwardedClients.getKeys()) {
+            IoUtils.safeClose(client);
+        }
+        for (Handle<RequestHandlerSource> service : forwardedServices.getKeys()) {
+            IoUtils.safeClose(service);
+        }
+    }
+
+    public String toString() {
+        return "multiplex connection <" + Integer.toHexString(hashCode()) + "> on " + channel;
+    }
+
+    /**
+     * Get the shared notifier that closes a future's result once the future is done.
+     *
+     * @param <T> the result type of the future the notifier will be attached to
+     * @return the shared notifier instance, cast to the requested type
+     */
+    @SuppressWarnings({ "unchecked" })
+    private static <T extends Closeable> IoFuture.Notifier<T> closingNotifier() {
+        // the notifier only ever treats its argument as a Closeable, so one instance serves every T
+        return (IoFuture.Notifier<T>) CLOSING_NOTIFIER;
+    }
+
+    private static final ClosingNotifier CLOSING_NOTIFIER = new ClosingNotifier();
+
+    /**
+     * Notifier that closes the result of a completed future, ignoring close failures.
+     */
+    private static class ClosingNotifier extends IoFuture.HandlingNotifier<Closeable> {
+        public void handleDone(final Closeable result) {
+            IoUtils.safeClose(result);
+        }
+    }
+}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -1,879 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.log.Logger;
-import org.jboss.remoting.spi.RequestHandler;
-import org.jboss.remoting.spi.RequestHandlerSource;
-import org.jboss.remoting.spi.ReplyHandler;
-import org.jboss.remoting.spi.RemoteRequestContext;
-import org.jboss.remoting.spi.Handle;
-import org.jboss.remoting.spi.SpiUtils;
-import org.jboss.remoting.spi.AbstractAutoCloseable;
-import org.jboss.remoting.util.CollectionUtil;
-import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.remoting.RemoteExecutionException;
-import org.jboss.remoting.IndeterminateOutcomeException;
-import org.jboss.remoting.ReplyException;
-import org.jboss.remoting.RemoteServiceConfiguration;
-import org.jboss.marshalling.MarshallerFactory;
-import org.jboss.marshalling.Unmarshaller;
-import org.jboss.marshalling.ByteOutput;
-import org.jboss.marshalling.Marshaller;
-import org.jboss.marshalling.MarshallingConfiguration;
-import org.jboss.marshalling.ObjectTable;
-import org.jboss.marshalling.Marshalling;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.nio.ByteBuffer;
-import java.nio.BufferUnderflowException;
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.io.InterruptedIOException;
-import java.io.InvalidObjectException;
-
-/**
- * Protocol handler for the basic message-oriented Remoting protocol.
- */
-final class MultiplexHandler implements IoHandler<AllocatedMessageChannel> {
-
- private static final Logger log = Logger.getLogger(MultiplexHandler.class);
-
- //--== Connection configuration items ==--
- private final MarshallerFactory marshallerFactory;
- private final MarshallingConfiguration marshallingConfiguration;
- private final int linkMetric;
- private final Executor executor;
- // buffer allocator for outbound message assembly
- private final BufferAllocator<ByteBuffer> allocator;
-
- // running on remote node
- private final IntegerBiMap<ReplyHandler> remoteRequests =
IdentityHashIntegerBiMap.createSynchronizing();
- // running on local node
- private final IntegerBiMap<RemoteRequestContext> localRequests =
IdentityHashIntegerBiMap.createSynchronizing();
- // sequence for remote requests
- private final AtomicInteger requestSequence = new AtomicInteger();
-
- // clients whose requests get forwarded to the remote side
- // even #s were opened from services forwarded to us (our sequence)
- // odd #s were forwarded directly to us (remote sequence)
- private final IntegerBiMap<RequestHandler> remoteClients =
IdentityHashIntegerBiMap.createSynchronizing();
- // forwarded to remote side (handled on this side)
- private final IntegerResourceBiMap<RequestHandler> forwardedClients =
IdentityHashIntegerResourceBiMap.createSynchronizing();
- // sequence for forwarded clients (shift left one bit, add one, limit is 2^30)
- private final AtomicInteger forwardedClientSequence = new AtomicInteger();
- // sequence for clients created from services forwarded to us (shift left one bit,
limit is 2^30)
- private final AtomicInteger remoteClientSequence = new AtomicInteger();
-
- // services forwarded to us
- private final IntegerBiMap<RequestHandlerSource> remoteServices =
IdentityHashIntegerBiMap.createSynchronizing();
- // forwarded to remote side (handled on this side)
- private final IntegerResourceBiMap<RequestHandlerSource> forwardedServices =
IdentityHashIntegerResourceBiMap.createSynchronizing();
- // sequence for forwarded services
- private final AtomicInteger forwardedServiceSequence = new AtomicInteger();
-
- private final Endpoint endpoint;
-
- private volatile AllocatedMessageChannel channel;
- private static final StackTraceElement[] emptyStackTraceElements = new
StackTraceElement[0];
-
- public MultiplexHandler(final Endpoint endpoint, final MultiplexConfiguration
configuration) {
- this.endpoint = endpoint;
- allocator = configuration.getAllocator();
- executor = configuration.getExecutor();
- marshallerFactory = configuration.getMarshallerFactory();
- marshallingConfiguration = configuration.getMarshallingConfiguration();
- linkMetric = configuration.getLinkMetric();
- }
-
- // sequence methods
-
- int nextRequest() {
- return requestSequence.getAndIncrement() & 0x7fffffff;
- }
-
- int nextForwardedClient() {
- return (forwardedClientSequence.getAndIncrement() << 1 | 1) &
0x7fffffff;
- }
-
- int nextRemoteClient() {
- return remoteClientSequence.getAndIncrement() << 1 & 0x7fffffff;
- }
-
- int nextForwardedService() {
- return forwardedServiceSequence.getAndIncrement() & 0x7fffffff;
- }
-
- void setChannel(final AllocatedMessageChannel channel) {
- this.channel = channel;
- }
-
- public void handleOpened(final AllocatedMessageChannel channel) {
- channel.resumeReads();
- }
-
- public void handleReadable(final AllocatedMessageChannel channel) {
- for (;;) try {
- final ByteBuffer buffer;
- try {
- buffer = channel.receive();
- } catch (IOException e) {
- log.error(e, "I/O error in protocol channel; closing
channel");
- IoUtils.safeClose(channel);
- return;
- }
- if (buffer == null) {
- // todo release all handles...
- // todo what if the write queue is not empty?
- IoUtils.safeClose(channel);
- return;
- }
- if (! buffer.hasRemaining()) {
- // would block
- channel.resumeReads();
- return;
- }
- final MessageType msgType;
- try {
- msgType = MessageType.getMessageType(buffer.get() & 0xff);
- } catch (IllegalArgumentException ex) {
- log.trace("Received invalid message type");
- return;
- }
- log.trace("Received message %s, type %s", buffer, msgType);
- switch (msgType) {
- case REQUEST: {
- final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle =
forwardedClients.get(clientId);
- if (handle == null) {
- log.trace("Request on invalid client ID %d",
Integer.valueOf(clientId));
- break;
- }
- final int requestId = buffer.getInt();
- final Object payload;
- try {
- final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller(marshallingConfiguration);
- try {
- unmarshaller.start(Marshalling.createByteInput(buffer));
- try {
- payload = unmarshaller.readObject();
- unmarshaller.finish();
- } catch (ClassNotFoundException e) {
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (Exception ex) {
- // IOException | ClassNotFoundException
- log.trace("Failed to unmarshal a request (%s), sending
%s", ex, MessageType.REQUEST_RECEIVE_FAILED);
- try {
- final Marshaller marshaller =
marshallerFactory.createMarshaller(marshallingConfiguration);
- try {
- List<ByteBuffer> buffers = new
ArrayList<ByteBuffer>();
- marshaller.start(createByteOutput(allocator, buffers));
-
marshaller.write(MessageType.REQUEST_RECEIVE_FAILED.getId());
- final IOException ioe = new IOException("Request
receive failed");
- ex.setStackTrace(emptyStackTraceElements);
- ioe.initCause(ex);
- ioe.setStackTrace(emptyStackTraceElements);
- marshaller.writeObject(ioe);
- marshaller.finish();
- registerWriter(channel, new SimpleWriteHandler(allocator,
buffers));
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- log.debug("Remoting channel handler thread
interrupted; closing channel");
- IoUtils.safeClose(channel);
- } finally {
- IoUtils.safeClose(marshaller);
- }
- } catch (IOException ioe) {
- log.warn("Failed to send notification of failure to
unmarshal a request: %s", ioe);
- }
- break;
- }
- // request received OK
- final RequestHandler requestHandler = handle.getResource();
- requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel,
requestId, allocator));
- break;
- }
- case REPLY: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
- break;
- }
- final Object payload;
- try {
- final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller(marshallingConfiguration);
- try {
- unmarshaller.start(Marshalling.createByteInput(buffer));
- try {
- payload = unmarshaller.readObject();
- unmarshaller.finish();
- } catch (ClassNotFoundException e) {
- replyHandler.handleException(new
InvalidClassException("Class not found: " + e.toString()));
- log.trace("Class not found in reply to request ID
%d", Integer.valueOf(requestId));
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.trace("Failed to unmarshal a reply (%s), sending a
ReplyException", ex);
- // todo
- SpiUtils.safeHandleException(replyHandler, new
ReplyException("Unmarshal failed", ex));
- break;
- }
- SpiUtils.safeHandleReply(replyHandler, payload);
- break;
- }
- case CANCEL_REQUEST: {
- final int requestId = buffer.getInt();
- final RemoteRequestContext context = localRequests.get(requestId);
- if (context != null) {
- context.cancel();
- }
- break;
- }
- case CANCEL_ACK: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.get(requestId);
- if (replyHandler != null) try {
- replyHandler.handleCancellation();
- } catch (IOException e) {
- log.trace("Failed to forward a cancellation acknowledgement
(%s)", e);
- }
- break;
- }
- case REQUEST_RECEIVE_FAILED: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
- break;
- }
- final IOException cause;
- try {
- final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller(marshallingConfiguration);
- try {
- unmarshaller.start(Marshalling.createByteInput(buffer));
- cause = (IOException) unmarshaller.readObject();
- unmarshaller.finish();
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException e) {
- SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Remote operation failed; the remote exception could not be
read", e));
- break;
- } catch (ClassNotFoundException e) {
- SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Remote operation failed; the remote exception could not be
read", e));
- break;
- }
- SpiUtils.safeHandleException(replyHandler, cause);
- break;
- }
- case REQUEST_FAILED: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
- break;
- }
- final IOException cause;
- try {
- final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller(marshallingConfiguration);
- try {
- unmarshaller.start(Marshalling.createByteInput(buffer));
- try {
- cause = (IOException) unmarshaller.readObject();
- } catch (ClassNotFoundException e) {
- replyHandler.handleException(new
InvalidClassException("Class not found: " + e.toString()));
- log.trace("Class not found in exception reply to
request ID %d", Integer.valueOf(requestId));
- break;
- } catch (ClassCastException e) {
- SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred
when attempting to unmarshal the cause)"));
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.trace("Failed to unmarshal an exception reply (%s),
sending a generic execution exception");
- SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred
when attempting to read the cause)"));
- break;
- }
- SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Remote execution failed", cause));
- break;
- }
- case REQUEST_OUTCOME_UNKNOWN: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
- break;
- }
- final String reason = readUTFZ(buffer);
- SpiUtils.safeHandleException(replyHandler, new
IndeterminateOutcomeException(reason));
- break;
- }
- case CLIENT_CLOSE: {
- final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle =
forwardedClients.remove(clientId);
- if (handle == null) {
- log.warn("Got client close message for unknown client
%d", Integer.valueOf(clientId));
- break;
- }
- IoUtils.safeClose(handle);
- break;
- }
- case CLIENT_OPEN: {
- final int serviceId = buffer.getInt();
- final int clientId = buffer.getInt();
- final Handle<RequestHandlerSource> handle =
forwardedServices.get(serviceId);
- if (handle == null) {
- log.warn("Received client open message for unknown service
%d", Integer.valueOf(serviceId));
- break;
- }
- try {
- final RequestHandlerSource requestHandlerSource =
handle.getResource();
- final Handle<RequestHandler> clientHandle =
requestHandlerSource.createRequestHandler();
- // todo check for duplicate
- // todo validate the client ID
- log.trace("Opening client %d from service %d",
Integer.valueOf(clientId), Integer.valueOf(serviceId));
- forwardedClients.put(clientId, clientHandle);
- } catch (IOException ex) {
- log.error(ex, "Failed to create a request handler for client
ID %d", Integer.valueOf(clientId));
- break;
- } finally {
- IoUtils.safeClose(handle);
- }
- break;
- }
- case SERVICE_CLOSE: {
- final Handle<RequestHandlerSource> handle =
forwardedServices.remove(buffer.getInt());
- if (handle == null) {
- break;
- }
- IoUtils.safeClose(handle);
- break;
- }
- case SERVICE_ADVERTISE: {
- final int serviceId = buffer.getInt();
- final String serviceType = readUTFZ(buffer);
- final String groupName = readUTFZ(buffer);
- final String endpointName = readUTFZ(buffer);
- final int baseMetric = buffer.getInt();
- int id = -1;
- final RequestHandlerSource handlerSource = new
RequestHandlerSourceImpl(allocator, id);
- final int calcMetric = baseMetric + linkMetric;
- if (calcMetric > 0) {
- try {
- final RemoteServiceConfiguration config = new
RemoteServiceConfiguration();
- config.setServiceType(serviceType);
- config.setGroupName(groupName);
- config.setEndpointName(endpointName);
- final SimpleCloseable closeable =
endpoint.registerRemoteService(config);
- // todo - something with that closeable
- } catch (IOException e) {
- log.error(e, "Unable to register remote service");
- }
- }
- break;
- }
- case SERVICE_UNADVERTISE: {
- final int serviceId = buffer.getInt();
- IoUtils.safeClose(remoteServices.get(serviceId));
- break;
- }
- default: {
- log.error("Malformed packet received (invalid message type
%s)", msgType);
- }
- }
- } catch (BufferUnderflowException e) {
- log.error("Malformed packet received (buffer underflow)");
- }
- }
-
- public void handleWritable(final AllocatedMessageChannel channel) {
- for (;;) {
- final WriteHandler handler = outputQueue.peek();
- if (handler == null) {
- return;
- }
- try {
- if (handler.handleWrite(channel)) {
- log.trace("Handled write with handler %s", handler);
- pending.decrementAndGet();
- outputQueue.remove();
- } else {
- channel.resumeWrites();
- return;
- }
- } catch (Throwable t) {
- pending.decrementAndGet();
- outputQueue.remove();
- }
- }
- }
-
- public void handleClosed(final AllocatedMessageChannel channel) {
- }
-
- RequestHandlerSource getRemoteService(final int id) {
- return new RequestHandlerSourceImpl(allocator, id);
- }
-
- private final class ReplyHandlerImpl implements ReplyHandler {
-
- private final AllocatedMessageChannel channel;
- private final int requestId;
- private final BufferAllocator<ByteBuffer> allocator;
-
- private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int
requestId, final BufferAllocator<ByteBuffer> allocator) {
- if (channel == null) {
- throw new NullPointerException("channel is null");
- }
- if (allocator == null) {
- throw new NullPointerException("allocator is null");
- }
- this.channel = channel;
- this.requestId = requestId;
- this.allocator = allocator;
- }
-
- public void handleReply(final Object reply) throws IOException {
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.REPLY.getId());
- buffer.putInt(requestId);
- try {
- final Marshaller marshaller =
marshallerFactory.createMarshaller(marshallingConfiguration);
- try {
- final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
- final ByteOutput output = createByteOutput(allocator, bufferList);
- try {
- marshaller.start(output);
- marshaller.writeObject(reply);
- marshaller.close();
- output.close();
- registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
- } finally {
- IoUtils.safeClose(output);
- }
- } finally {
- IoUtils.safeClose(marshaller);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException("Reply handler thread interrupted
before a reply could be sent");
- }
- }
-
- public void handleException(final IOException exception) throws IOException {
- try {
- final Marshaller marshaller =
marshallerFactory.createMarshaller(marshallingConfiguration);
- try {
- final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
- final ByteOutput output = createByteOutput(allocator, bufferList);
- try {
- marshaller.write(MessageType.REQUEST_FAILED.getId());
- marshaller.writeInt(requestId);
- marshaller.start(output);
- marshaller.writeObject(exception);
- marshaller.close();
- output.close();
- registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
- } finally {
- IoUtils.safeClose(output);
- }
- } finally {
- IoUtils.safeClose(marshaller);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException("Reply handler thread interrupted
before an exception could be sent");
- }
- }
-
- public void handleCancellation() throws InterruptedIOException {
- final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CANCEL_ACK.getId());
- buffer.putInt(requestId);
- buffer.flip();
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException("Reply handler thread interrupted
before cancellation could be sent");
- }
- }
- }
-
- // Writer members
-
- private final BlockingQueue<WriteHandler> outputQueue =
CollectionUtil.blockingQueue(64);
- private final AtomicInteger pending = new AtomicInteger();
-
- private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler
writeHandler) throws InterruptedException {
- outputQueue.put(writeHandler);
- if (pending.getAndIncrement() == 0) {
- channel.resumeWrites();
- }
- }
-
- // Reader utils
-
- private String readUTFZ(ByteBuffer buffer) {
- StringBuilder builder = new StringBuilder();
- int state = 0, a = 0;
- while (buffer.hasRemaining()) {
- final int v = buffer.get() & 0xff;
- switch (state) {
- case 0: {
- if (v == 0) {
- return builder.toString();
- } else if (v < 128) {
- builder.append((char) v);
- } else if (192 <= v && v < 224) {
- a = v << 6;
- state = 1;
- } else if (224 <= v && v < 232) {
- a = v << 12;
- state = 2;
- } else {
- builder.append('?');
- }
- break;
- }
- case 1: {
- if (v == 0) {
- builder.append('?');
- return builder.toString();
- } else if (128 <= v && v < 192) {
- a |= v & 0x3f;
- builder.append((char) a);
- } else {
- builder.append('?');
- }
- state = 0;
- break;
- }
- case 2: {
- if (v == 0) {
- builder.append('?');
- return builder.toString();
- } else if (128 <= v && v < 192) {
- a |= (v & 0x3f) << 6;
- state = 1;
- } else {
- builder.append('?');
- state = 0;
- }
- break;
- }
- default:
- throw new IllegalStateException("wrong state");
- }
- }
- return builder.toString();
- }
-
- // client endpoint
-
- private final class RequestHandlerImpl extends
AbstractAutoCloseable<RequestHandler> implements RequestHandler {
-
- private final int identifier;
- private final BufferAllocator<ByteBuffer> allocator;
-
- public RequestHandlerImpl(final int identifier, final
BufferAllocator<ByteBuffer> allocator) {
- super(executor);
- if (allocator == null) {
- throw new NullPointerException("allocator is null");
- }
- this.identifier = identifier;
- this.allocator = allocator;
- addCloseHandler(new CloseHandler<RequestHandler>() {
- public void handleClose(final RequestHandler closed) {
- remoteClients.remove(identifier);
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
- buffer.putInt(identifier);
- buffer.flip();
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator,
buffer));
- } catch (InterruptedException e) {
- log.warn("Client close notification was interrupted before
it could be sent");
- }
- }
- });
- }
-
- public RemoteRequestContext receiveRequest(final Object request, final
ReplyHandler handler) {
- log.trace("Sending outbound request of type %s", request == null ?
"null" : request.getClass());
- try {
- final List<ByteBuffer> bufferList;
- final Marshaller marshaller =
marshallerFactory.createMarshaller(marshallingConfiguration);
- try {
- bufferList = new ArrayList<ByteBuffer>();
- final ByteOutput output = createByteOutput(allocator, bufferList);
- try {
- marshaller.write(MessageType.REQUEST.getId());
- marshaller.writeInt(identifier);
-
- final int id = nextRequest();
- remoteRequests.put(id, handler);
- marshaller.writeInt(id);
- marshaller.writeObject(request);
- marshaller.close();
- output.close();
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- executor.execute(new Runnable() {
- public void run() {
- SpiUtils.safeHandleCancellation(handler);
- }
- });
- return SpiUtils.getBlankRemoteRequestContext();
- }
- log.trace("Sent request %s", request);
- return new RemoteRequestContextImpl(id, allocator, channel);
- } finally {
- IoUtils.safeClose(output);
- }
- } finally {
- IoUtils.safeClose(marshaller);
- }
- } catch (final IOException t) {
- log.trace(t, "receiveRequest failed with an exception");
- executor.execute(new Runnable() {
- public void run() {
- SpiUtils.safeHandleException(handler, t);
- }
- });
- return SpiUtils.getBlankRemoteRequestContext();
- }
- }
-
- public String toString() {
- return "forwarding request handler <" +
Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
- }
- }
-
- public final class RemoteRequestContextImpl implements RemoteRequestContext {
-
- private final BufferAllocator<ByteBuffer> allocator;
- private final int id;
- private final AllocatedMessageChannel channel;
-
- public RemoteRequestContextImpl(final int id, final
BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
- this.id = id;
- this.allocator = allocator;
- this.channel = channel;
- }
-
- public void cancel() {
- try {
- final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
- buffer.putInt(id);
- buffer.flip();
- registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
- } catch (InterruptedException e) {
- // todo log that cancel attempt failed
- Thread.currentThread().interrupt();
- } catch (Throwable t) {
- // todo log that cancel attempt failed
- }
- }
- }
-
- public final class RequestHandlerSourceImpl extends
AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
-
- private final BufferAllocator<ByteBuffer> allocator;
- private final int identifier;
-
- protected RequestHandlerSourceImpl(final BufferAllocator<ByteBuffer>
allocator, final int identifier) {
- super(executor);
- this.allocator = allocator;
- this.identifier = identifier;
- addCloseHandler(new CloseHandler<RequestHandlerSource>() {
- public void handleClose(final RequestHandlerSource closed) {
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
- buffer.putInt(identifier);
- buffer.flip();
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator,
buffer));
- } catch (InterruptedException e) {
- log.warn("Service close notification was interrupted before
it could be sent");
- }
- }
- });
- }
-
- public Handle<RequestHandler> createRequestHandler() throws IOException {
- final int id = nextRemoteClient();
- final RequestHandler requestHandler = new RequestHandlerImpl(id,
MultiplexHandler.this.allocator);
- remoteClients.put(id, requestHandler);
- final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CLIENT_OPEN.getId());
- buffer.putInt(identifier);
- buffer.putInt(id);
- buffer.flip();
- // todo - probably should bail out if we're interrupted?
- boolean intr = false;
- for (;;) {
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
- try {
- return new RequestHandlerImpl(id, allocator).getHandle();
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- } catch (InterruptedException e) {
- intr = true;
- }
- }
- }
-
- public String toString() {
- return "forwarding request handler source <" +
Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
- }
- }
-
- public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer>
allocator, final Collection<ByteBuffer> target) {
- return new ByteOutput() {
- private ByteBuffer current;
-
- private ByteBuffer getCurrent() {
- final ByteBuffer buffer = current;
- return buffer == null ? (current = allocator.allocate()) : buffer;
- }
-
- public void write(final int i) {
- final ByteBuffer buffer = getCurrent();
- buffer.put((byte) i);
- if (! buffer.hasRemaining()) {
- buffer.flip();
- target.add(buffer);
- current = null;
- }
- }
-
- public void write(final byte[] bytes) {
- write(bytes, 0, bytes.length);
- }
-
- public void write(final byte[] bytes, int offs, int len) {
- while (len > 0) {
- final ByteBuffer buffer = getCurrent();
- final int c = Math.min(len, buffer.remaining());
- buffer.put(bytes, offs, c);
- offs += c;
- len -= c;
- if (! buffer.hasRemaining()) {
- buffer.flip();
- target.add(buffer);
- current = null;
- }
- }
- }
-
- public void close() {
- flush();
- }
-
- public void flush() {
- final ByteBuffer buffer = current;
- if (buffer != null) {
- buffer.flip();
- target.add(buffer);
- current = null;
- }
- }
- };
- }
-
- private final ProtocolObjectTableWriter protocolObjectTableWriter = new
ProtocolObjectTableWriter();
-
- public class ProtocolObjectTableWriter implements ObjectTable.Writer {
-
- public void writeObject(final Marshaller marshaller, final Object o) throws
IOException {
- final RequestHandler requestHandler = (RequestHandler) o;
- final int existingId = forwardedClients.get(requestHandler, -1);
- marshaller.write(1);
- if (existingId == -1) {
- final int newId = nextForwardedClient();
- forwardedClients.put(newId, requestHandler.getHandle());
- marshaller.writeInt(newId);
- } else {
- marshaller.writeInt(existingId);
- }
- }
- }
-
- public class ProtocolObjectTable implements ObjectTable {
-
- public Writer getObjectWriter(final Object o) throws IOException {
- if (o instanceof RequestHandler) {
- return protocolObjectTableWriter;
- } else {
- return null;
- }
- }
-
- public Object readObject(final Unmarshaller unmarshaller) throws IOException,
ClassNotFoundException {
- switch (unmarshaller.readByte()) {
- case 1: {
- // remote client
- final int id = unmarshaller.readInt();
- return remoteClients.get(id);
- }
- case 2: {
- // remote client source
- final int id = unmarshaller.readInt();
- return remoteServices.get(id);
- }
- default: {
- // invalid
- throw new InvalidObjectException("Invalid ID sent for protocol
object table");
- }
- }
- }
- }
-}
Modified:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -22,12 +22,7 @@
package org.jboss.remoting.protocol.multiplex;
-import org.jboss.remoting.RemotingException;
-import org.jboss.remoting.SimpleCloseable;
import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.spi.RequestHandlerSource;
-import org.jboss.remoting.spi.Handle;
-import org.jboss.remoting.spi.AbstractSimpleCloseable;
import org.jboss.xnio.IoHandlerFactory;
import org.jboss.xnio.ChannelSource;
import org.jboss.xnio.IoFuture;
@@ -35,7 +30,6 @@
import org.jboss.xnio.IoHandler;
import org.jboss.xnio.channels.AllocatedMessageChannel;
import java.io.IOException;
-import java.util.concurrent.Executor;
/**
*
@@ -55,7 +49,7 @@
public static IoHandlerFactory<AllocatedMessageChannel> createServer(final
Endpoint endpoint, final MultiplexConfiguration configuration) {
return new IoHandlerFactory<AllocatedMessageChannel>() {
public IoHandler<? super AllocatedMessageChannel> createHandler() {
- return new MultiplexHandler(endpoint, configuration);
+ return new SimpleMultiplexHandler(endpoint, configuration);
}
};
}
@@ -69,29 +63,13 @@
* @return a handle which may be used to close the connection
* @throws IOException if an error occurs
*/
- public static IoFuture<SimpleCloseable> connect(final Endpoint endpoint, final
MultiplexConfiguration configuration, final ChannelSource<AllocatedMessageChannel>
channelSource) throws IOException {
- final MultiplexHandler multiplexHandler = new MultiplexHandler(endpoint,
configuration);
- final IoFuture<AllocatedMessageChannel> futureChannel =
channelSource.open(multiplexHandler);
- return new AbstractConvertingIoFuture<SimpleCloseable,
AllocatedMessageChannel>(futureChannel) {
- protected SimpleCloseable convert(final AllocatedMessageChannel channel)
throws RemotingException {
- return new AbstractConnection(configuration.getExecutor()) {
- // todo - this method is not called by anyone?
- public Handle<RequestHandlerSource> getServiceForId(final int
id) throws IOException {
- return multiplexHandler.getRemoteService(id).getHandle();
- }
- };
+ public static IoFuture<MultiplexConnection> connect(final Endpoint endpoint,
final MultiplexConfiguration configuration, final
ChannelSource<AllocatedMessageChannel> channelSource) throws IOException {
+ final SimpleMultiplexHandler handler = new SimpleMultiplexHandler(endpoint,
configuration);
+ final IoFuture<AllocatedMessageChannel> futureChannel =
channelSource.open(handler);
+ return new AbstractConvertingIoFuture<MultiplexConnection,
AllocatedMessageChannel>(futureChannel) {
+ protected MultiplexConnection convert(final AllocatedMessageChannel channel)
throws IOException {
+ return handler.getConnection().get();
}
};
}
-
- private abstract static class AbstractConnection extends AbstractSimpleCloseable {
-
- protected AbstractConnection(final Executor executor) {
- super(executor);
- }
-
- public String toString() {
- return "Remoting multiplex connection <" +
Integer.toString(hashCode()) + ">";
- }
- }
}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -0,0 +1,378 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.IoReadHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.ReplyException;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.ServiceRegistrationException;
+import org.jboss.remoting.util.QualifiedName;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.Marshalling;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Read handler for a multiplex protocol channel.
+ *
+ * Each received message starts with a one-byte {@link MessageType} identifier followed by
+ * type-specific fields.  This handler decodes messages one at a time and dispatches them
+ * against the state held by the owning {@link MultiplexConnection}.
+ */
+public final class MultiplexReadHandler implements IoReadHandler<AllocatedMessageChannel> {
+
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex");
+    private static final StackTraceElement[] emptyStackTraceElements = new StackTraceElement[0];
+    private final MultiplexConnection connection;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param connection the connection whose inbound messages this handler decodes
+     */
+    public MultiplexReadHandler(final MultiplexConnection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * Build the five-byte reply (message type byte followed by the service ID) that answers
+     * a service open request.
+     *
+     * @param type the reply message type
+     * @param serviceId the service ID taken from the request
+     * @return a flipped buffer that is ready to be written
+     */
+    private static ByteBuffer createServiceReply(final MessageType type, final int serviceId) {
+        final ByteBuffer replyBuffer = ByteBuffer.allocate(5);
+        replyBuffer.put((byte) type.getId());
+        replyBuffer.putInt(serviceId);
+        replyBuffer.flip();
+        return replyBuffer;
+    }
+
+    /**
+     * Receive and dispatch messages until the channel would block, reaches end-of-stream,
+     * or fails.  An I/O error or end-of-stream closes the channel.  A message which is too
+     * short for its type is logged and skipped.
+     *
+     * @param channel the channel which has become readable
+     */
+    public void handleReadable(final AllocatedMessageChannel channel) {
+        final MultiplexConnection connection = this.connection;
+        final MarshallerFactory marshallerFactory = connection.getMarshallerFactory();
+        final MarshallingConfiguration marshallingConfiguration = connection.getMarshallingConfiguration();
+        for (;;) try {
+            final ByteBuffer buffer;
+            try {
+                buffer = channel.receive();
+            } catch (IOException e) {
+                log.error(e, "I/O error in protocol channel; closing channel");
+                IoUtils.safeClose(channel);
+                return;
+            }
+            if (buffer == null) {
+                // end of stream
+                IoUtils.safeClose(channel);
+                return;
+            }
+            if (! buffer.hasRemaining()) {
+                // would block
+                channel.resumeReads();
+                return;
+            }
+            final MessageType msgType;
+            try {
+                msgType = MessageType.getMessageType(buffer.get() & 0xff);
+            } catch (IllegalArgumentException ex) {
+                log.trace("Received invalid message type");
+                // NOTE(review): this returns without resuming reads or closing the channel;
+                // confirm that the channel is not left stalled after a bad type byte.
+                return;
+            }
+            log.trace("Received message type %s; dump:\n%s", msgType, Buffers.createDumper(buffer, 8, 1));
+            switch (msgType) {
+                case REQUEST: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandler> handle = connection.getForwardedClient(clientId);
+                    if (handle == null) {
+                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    final int requestId = buffer.getInt();
+                    final Object payload;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            payload = unmarshaller.readObject();
+                            unmarshaller.finish();
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (Exception ex) {
+                        // IOException | ClassNotFoundException
+                        log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
+                        try {
+                            final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+                            try {
+                                List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+                                marshaller.start(new BufferByteOutput(connection.getAllocator(), buffers));
+                                marshaller.write(MessageType.REQUEST_RECEIVE_FAILED.getId());
+                                // the REQUEST_RECEIVE_FAILED case below reads the request ID
+                                // immediately after the type byte, so it must be sent here
+                                marshaller.writeInt(requestId);
+                                // stack traces are stripped before the failure is sent back
+                                ex.setStackTrace(emptyStackTraceElements);
+                                final IOException ioe = new IOException("Request receive failed");
+                                ioe.initCause(ex);
+                                ioe.setStackTrace(emptyStackTraceElements);
+                                marshaller.writeObject(ioe);
+                                marshaller.finish();
+                                connection.doBlockingWrite(buffers);
+                            } finally {
+                                IoUtils.safeClose(marshaller);
+                            }
+                        } catch (IOException ioe) {
+                            log.warn("Failed to send notification of failure to unmarshal a request: %s", ioe);
+                        }
+                        break;
+                    }
+                    // request received OK
+                    final RequestHandler requestHandler = handle.getResource();
+                    requestHandler.receiveRequest(payload, new MultiplexReplyHandler(requestId, connection));
+                    break;
+                }
+                case REPLY: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = connection.removeRemoteRequest(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final Object payload;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            payload = unmarshaller.readObject();
+                            unmarshaller.finish();
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (Exception ex) {
+                        // IOException | ClassNotFoundException
+                        log.trace("Failed to unmarshal a reply (%s), sending a ReplyException", ex);
+                        SpiUtils.safeHandleException(replyHandler, new ReplyException("Unmarshal failed", ex));
+                        break;
+                    }
+                    SpiUtils.safeHandleReply(replyHandler, payload);
+                    break;
+                }
+                case CANCEL_REQUEST: {
+                    final int requestId = buffer.getInt();
+                    final RemoteRequestContext context = connection.getLocalRequest(requestId);
+                    if (context != null) {
+                        context.cancel();
+                    }
+                    break;
+                }
+                case CANCEL_ACK: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = connection.getRemoteRequest(requestId);
+                    if (replyHandler != null) {
+                        SpiUtils.safeHandleCancellation(replyHandler);
+                    }
+                    break;
+                }
+                case REQUEST_RECEIVE_FAILED: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = connection.removeRemoteRequest(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final IOException cause;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            cause = (IOException) unmarshaller.readObject();
+                            unmarshaller.finish();
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException e) {
+                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+                        break;
+                    } catch (ClassNotFoundException e) {
+                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+                        break;
+                    }
+                    SpiUtils.safeHandleException(replyHandler, cause);
+                    break;
+                }
+                case REQUEST_FAILED: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = connection.removeRemoteRequest(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final IOException cause;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            try {
+                                cause = (IOException) unmarshaller.readObject();
+                            } catch (ClassNotFoundException e) {
+                                SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an ClassNotFoundException occurred when attempting to unmarshal the cause)"));
+                                log.trace(e, "Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+                                break;
+                            } catch (ClassCastException e) {
+                                SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an ClassCastException occurred when attempting to unmarshal the cause)"));
+                                log.trace(e, "Class cast exception in exception reply to request ID %d", Integer.valueOf(requestId));
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        // pass the exception so that the %s placeholder is actually filled in
+                        log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception", ex);
+                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to read the cause)"));
+                        break;
+                    }
+                    SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote execution failed", cause));
+                    break;
+                }
+                case CLIENT_CLOSE: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandler> handle = connection.removeForwardedClient(clientId);
+                    if (handle == null) {
+                        log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    IoUtils.safeClose(handle);
+                    break;
+                }
+                case CLIENT_OPEN: {
+                    final int serviceId = buffer.getInt();
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandlerSource> handle = connection.getForwardedService(serviceId);
+                    if (handle == null) {
+                        log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    try {
+                        final RequestHandlerSource requestHandlerSource = handle.getResource();
+                        final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
+                        log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
+                        connection.addForwardedClient(clientId, clientHandle);
+                    } catch (IOException ex) {
+                        log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
+                        break;
+                    } finally {
+                        // NOTE(review): this closes the forwarded service handle after every
+                        // client open while it stays registered; confirm this is intended.
+                        IoUtils.safeClose(handle);
+                    }
+                    break;
+                }
+                case SERVICE_OPEN_REQUEST: {
+                    final int serviceId = buffer.getInt();
+                    final QualifiedName qualifiedName = MultiplexConnection.getQualifiedName(buffer);
+                    final Handle<RequestHandlerSource> service = connection.getService(qualifiedName);
+                    if (service == null) {
+                        final ByteBuffer replyBuffer = createServiceReply(MessageType.SERVICE_OPEN_NOT_FOUND, serviceId);
+                        try {
+                            connection.doBlockingWrite(replyBuffer);
+                        } catch (IOException e) {
+                            log.error(e, "Failed to send an error reply to an invalid service open request");
+                        }
+                        break;
+                    }
+                    final Handle<RequestHandlerSource> ourHandle;
+                    try {
+                        ourHandle = service.getResource().getHandle();
+                    } catch (IOException e) {
+                        log.error("Failed to acquire a handle to registered service: %s", e);
+                        final ByteBuffer replyBuffer = createServiceReply(MessageType.SERVICE_OPEN_FAILED, serviceId);
+                        try {
+                            connection.doBlockingWrite(replyBuffer);
+                        } catch (IOException e2) {
+                            // log the write failure (e2), not the handle acquisition failure (e)
+                            log.trace(e2, "Failed to send an exception reply to a service open request");
+                        }
+                        break;
+                    }
+                    connection.addForwadedService(serviceId, ourHandle);
+                    final ByteBuffer replyBuffer = createServiceReply(MessageType.SERVICE_OPEN_REPLY, serviceId);
+                    try {
+                        connection.doBlockingWrite(replyBuffer);
+                    } catch (IOException e) {
+                        log.trace(e, "Failed to send a reply to a service open request");
+                    }
+                    break;
+                }
+                case SERVICE_OPEN_FAILED:
+                case SERVICE_OPEN_NOT_FOUND:
+                case SERVICE_OPEN_FORBIDDEN: {
+                    final int serviceId = buffer.getInt();
+                    final FutureRemoteRequestHandlerSource future = connection.removeFutureRemoteService(serviceId);
+                    if (future == null) {
+                        log.trace("Service open failure reply received for unknown service ID %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    future.setException(
+                            msgType == MessageType.SERVICE_OPEN_NOT_FOUND ? new ServiceRegistrationException("Service not found") :
+                            msgType == MessageType.SERVICE_OPEN_FORBIDDEN ? new ServiceRegistrationException("Service open forbidden") :
+                            new ServiceRegistrationException("Service open failed")
+                    );
+                    break;
+                }
+                case SERVICE_OPEN_REPLY: {
+                    final int serviceId = buffer.getInt();
+                    final FutureRemoteRequestHandlerSource future = connection.getFutureRemoteService(serviceId);
+                    if (future == null) {
+                        log.trace("Service open reply received for unknown service ID %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    final MultiplexRequestHandlerSource requestHandlerSource = new MultiplexRequestHandlerSource(serviceId, connection);
+                    future.setResult(requestHandlerSource);
+                    break;
+                }
+                case SERVICE_CLOSE_NOTIFY: {
+                    final int serviceId = buffer.getInt();
+                    final FutureRemoteRequestHandlerSource future = connection.removeFutureRemoteService(serviceId);
+                    if (future == null) {
+                        // an unknown ID must not cause a NullPointerException in the read handler
+                        log.trace("Service close notification received for unknown service ID %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    future.addNotifier(new IoFuture.HandlingNotifier<RequestHandlerSource>() {
+                        public void handleDone(final RequestHandlerSource result) {
+                            IoUtils.safeClose(result);
+                        }
+                    });
+                    break;
+                }
+                case SERVICE_CLOSE_REQUEST: {
+                    final int serviceId = buffer.getInt();
+                    final Handle<RequestHandlerSource> handle = connection.removeForwardedService(serviceId);
+                    if (handle == null) {
+                        log.trace("Received service close request on unknown ID %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    IoUtils.safeClose(handle);
+                    break;
+                }
+                case CONNECTION_CLOSE: {
+                    // no action is taken for this message type here
+                    break;
+                }
+                default: {
+                    log.error("Malformed packet received (invalid message type %s)", msgType);
+                    break;
+                }
+            }
+        } catch (BufferUnderflowException e) {
+            // the message was shorter than its type requires; skip it and keep reading
+            log.error(e, "Malformed packet received (buffer underflow)");
+        }
+    }
+}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.ByteOutput;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Reply handler that sends the outcome of request {@code requestId} (reply, failure or cancel acknowledgement) over the connection.
+ */
+final class MultiplexReplyHandler implements ReplyHandler {
+
+ private final int requestId;
+ private final MultiplexConnection connection;
+
+ MultiplexReplyHandler(final int requestId, final MultiplexConnection connection) {
+ this.requestId = requestId;
+ this.connection = connection;
+ }
+
+ public void handleReply(final Object reply) throws IOException {
+ final MultiplexConnection connection = this.connection;
+ final Marshaller marshaller =
connection.getMarshallerFactory().createMarshaller(connection.getMarshallingConfiguration());
+ try {
+ final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+ final ByteOutput output = new BufferByteOutput(connection.getAllocator(),
bufferList);
+ try {
+ marshaller.start(output);
+ marshaller.write(MessageType.REPLY.getId()); // message layout: type id, request id, marshalled reply object
+ marshaller.writeInt(requestId);
+ marshaller.writeObject(reply);
+ marshaller.close(); // closed before the write; presumably this flushes pending bytes into bufferList - TODO confirm
+ output.close();
+ connection.doBlockingWrite(bufferList);
+ } finally {
+ IoUtils.safeClose(output); // runs on success as well, so output is closed a second time on the normal path
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
+ }
+
+ public void handleException(final IOException exception) throws IOException {
+ final MultiplexConnection connection = this.connection;
+ final Marshaller marshaller =
connection.getMarshallerFactory().createMarshaller(connection.getMarshallingConfiguration());
+ try {
+ final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+ final ByteOutput output = new BufferByteOutput(connection.getAllocator(),
bufferList);
+ try {
+ marshaller.start(output);
+ marshaller.write(MessageType.REQUEST_FAILED.getId()); // same layout as handleReply, with the exception as the payload
+ marshaller.writeInt(requestId);
+ marshaller.writeObject(exception);
+ marshaller.close();
+ output.close();
+ connection.doBlockingWrite(bufferList);
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
+ }
+
+ public void handleCancellation() throws IOException {
+ final ByteBuffer buffer = ByteBuffer.allocate(5); // 1 byte message type + 4 byte request id
+ buffer.put((byte) MessageType.CANCEL_ACK.getId());
+ buffer.putInt(requestId);
+ buffer.flip(); // switch the buffer from filling to draining before it is written
+ connection.doBlockingWrite(buffer);
+ }
+}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -0,0 +1,136 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.ByteOutput;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.IOException;
+
+/**
+ * Request handler that marshals each request into a REQUEST message for its client identifier and sends CLIENT_CLOSE when closed.
+ */
+final class MultiplexRequestHandler extends AbstractAutoCloseable<RequestHandler>
implements RequestHandler {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.multiplex.request-handler");
+
+ private final int identifier;
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final MultiplexConnection connection;
+
+ public MultiplexRequestHandler(final int identifier, final MultiplexConnection
connection) {
+ super(connection.getExecutor());
+ this.connection = connection;
+ this.identifier = identifier;
+ allocator = connection.getAllocator();
+ }
+
+ @Override
+ protected void closeAction() throws IOException {
+ connection.removeRemoteClient(identifier);
+ ByteBuffer buffer = allocator.allocate(); // NOTE(review): taken from the allocator and never freed in this method - confirm doBlockingWrite releases it
+ buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
+ buffer.putInt(identifier);
+ buffer.flip();
+ connection.doBlockingWrite(buffer);
+ }
+
+ public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler
handler) {
+ log.trace("Sending outbound request of type %s", request == null ?
"null" : request.getClass());
+ final List<ByteBuffer> bufferList;
+ final MultiplexConnection connection = this.connection;
+ try {
+ final Marshaller marshaller =
connection.getMarshallerFactory().createMarshaller(connection.getMarshallingConfiguration());
+ try {
+ bufferList = new ArrayList<ByteBuffer>();
+ final ByteOutput output = new BufferByteOutput(allocator, bufferList);
+ try {
+ marshaller.start(output);
+ marshaller.write(MessageType.REQUEST.getId()); // message layout: type id, client id, request id, marshalled request
+ marshaller.writeInt(identifier);
+ final int id = connection.nextRequest();
+ connection.addRemoteRequest(id, handler); // the reply handler is registered before the message is written
+ marshaller.writeInt(id);
+ marshaller.writeObject(request);
+ marshaller.close();
+ output.close();
+ connection.doBlockingWrite(bufferList);
+ log.trace("Sent request %s", request);
+ return new RemoteRequestContextImpl(id, connection);
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
+ } catch (final IOException t) {
+ log.trace(t, "receiveRequest failed with an exception");
+ SpiUtils.safeHandleException(handler, t); // NOTE(review): handler is not unregistered from the connection on this path - confirm cleanup elsewhere
+ return SpiUtils.getBlankRemoteRequestContext();
+ }
+ }
+
+ public String toString() {
+ return "forwarding request handler <" + Integer.toString(hashCode(),
16) + "> (id = " + identifier + ")";
+ }
+}
+
+final class RemoteRequestContextImpl implements RemoteRequestContext {
+
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.multiplex.requesthandler.context");
+
+ private final int id;
+ private final MultiplexConnection connection;
+ private final AtomicBoolean cancelSent = new AtomicBoolean(); // set once a CANCEL_REQUEST send has been attempted
+
+ public RemoteRequestContextImpl(final int id, final MultiplexConnection connection)
{
+ this.id = id;
+ this.connection = connection;
+ }
+
+ public void cancel() {
+ if (! cancelSent.getAndSet(true)) try { // only the first cancel() call gets past this guard
+ final ByteBuffer buffer = ByteBuffer.allocate(5); // 1 byte message type + 4 byte request id
+ buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
+ buffer.putInt(id);
+ buffer.flip();
+ connection.doBlockingWrite(buffer);
+ } catch (Throwable t) { // a failed send is only logged; the flag stays set, so the send is not retried
+ log.warn("Sending cancel request failed: %s", t);
+ }
+ }
+
+ public String toString() {
+ return "remote request context (multiplex) <" +
Integer.toString(hashCode(), 16) + "> (id = " + id + ")";
+ }
+}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+/**
+ * Request handler source for one service identifier: sends CLIENT_OPEN to create handlers and SERVICE_CLOSE_REQUEST when closed.
+ */
+final class MultiplexRequestHandlerSource extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
+
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex.request-handler-source");
+
+    private final int identifier;
+    private final MultiplexConnection connection;
+
+    MultiplexRequestHandlerSource(final int identifier, final MultiplexConnection connection) {
+        super(connection.getExecutor());
+        this.connection = connection;
+        this.identifier = identifier;
+    }
+
+    @Override
+    protected void closeAction() throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(5); // 1 byte message type + 4 byte service identifier
+        buffer.put((byte) MessageType.SERVICE_CLOSE_REQUEST.getId());
+        buffer.putInt(identifier);
+        buffer.flip();
+        connection.doBlockingWrite(buffer);
+    }
+
+    public Handle<RequestHandler> createRequestHandler() throws IOException {
+        final int id = connection.nextRemoteClient();
+        final MultiplexRequestHandler requestHandler = new MultiplexRequestHandler(id, connection); // concrete type so getHandle() resolves as before
+        boolean ok = false; // flipped only once the handle has been obtained; drives both cleanup blocks below
+        try {
+            connection.addRemoteClient(id, requestHandler);
+            try {
+                final ByteBuffer buffer = ByteBuffer.allocate(9); // 1 byte message type + 4 byte service id + 4 byte client id
+                buffer.put((byte) MessageType.CLIENT_OPEN.getId());
+                buffer.putInt(identifier);
+                buffer.putInt(id);
+                buffer.flip();
+                connection.doBlockingWrite(buffer);
+                final Handle<RequestHandler> handlerHandle = requestHandler.getHandle(); // handle of the registered handler, not of a second unregistered instance
+                ok = true;
+                return handlerHandle;
+            } finally {
+                if (! ok) {
+                    connection.removeRemoteClient(id); // undo the registration when the open message could not be sent
+                }
+            }
+        } finally {
+            if (! ok) {
+                IoUtils.safeClose(requestHandler);
+            }
+        }
+    }
+
+    public String toString() {
+        return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+    }
+}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.DelegatingIoHandler;
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting.Endpoint;
+
+/**
+ * Channel handler that creates a {@link MultiplexConnection} when the channel opens and closes it when the channel closes.
+ */
+public final class SimpleMultiplexHandler extends
DelegatingIoHandler<AllocatedMessageChannel> {
+
+ private volatile MultiplexConnection connection; // assigned in handleOpened, closed in handleClosed
+ private final Endpoint endpoint;
+ private final MultiplexConfiguration configuration;
+ private final FutureConnection futureConnection = new FutureConnection();
+
+ public SimpleMultiplexHandler(final Endpoint endpoint, final MultiplexConfiguration
configuration) {
+ this.endpoint = endpoint;
+ this.configuration = configuration;
+ }
+
+ public void handleOpened(final AllocatedMessageChannel channel) {
+ connection = new MultiplexConnection(endpoint, channel, configuration);
+ futureConnection.setResult(connection); // completes the future returned by getConnection()
+ setReadHandler(new MultiplexReadHandler(connection));
+ channel.resumeReads(); // reads are resumed only after the read handler has been installed
+ }
+
+ public void handleClosed(final AllocatedMessageChannel channel) {
+ IoUtils.safeClose(connection); // NOTE(review): connection is still null if the channel never opened - presumably safeClose tolerates null, confirm
+ }
+
+ public IoFuture<MultiplexConnection> getConnection() {
+ return futureConnection;
+ }
+
+ public static final class FutureConnection extends
AbstractIoFuture<MultiplexConnection> {
+ public IoFuture<MultiplexConnection> cancel() {
+ return this; // cancel performs no action and hands back this same future
+ }
+
+ protected boolean setResult(final MultiplexConnection result) { // presumably re-declared so the enclosing handler can call it - confirm
+ return super.setResult(result);
+ }
+ }
+}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -1,84 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-final class SimpleWriteHandler implements WriteHandler {
- private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
-
- private final BufferAllocator<ByteBuffer> allocator;
- private final ByteBuffer[] buffers;
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final
List<ByteBuffer> buffers) {
- this.allocator = allocator;
- this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
- logBufferSize();
- }
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final
ByteBuffer[] buffers) {
- this.allocator = allocator;
- this.buffers = buffers;
- logBufferSize();
- }
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final
ByteBuffer buffer) {
- this.allocator = allocator;
- buffers = new ByteBuffer[] { buffer };
- logBufferSize();
- }
-
- private void logBufferSize() {
- if (log.isTrace()) {
- long t = 0L;
- for (ByteBuffer buf : buffers) {
- t += (long)buf.remaining();
- }
- log.trace("Writing a message of size %d", Long.valueOf(t));
- }
- }
-
- public boolean handleWrite(final WritableMessageChannel channel) {
- boolean done = true;
- try {
- return (done = channel.send(buffers));
- } catch (IOException e) {
- log.trace(e, "Write failed");
- return true;
- } finally {
- if (done) {
- for (ByteBuffer buffer : buffers) {
- allocator.free(buffer);
- }
- }
- }
- }
-}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -1,32 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-interface WriteHandler {
- boolean handleWrite(WritableMessageChannel channel);
-}
Modified:
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -29,12 +29,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;
-import java.net.URI;
import java.io.IOException;
import junit.framework.TestCase;
import org.jboss.remoting.core.EndpointImpl;
import org.jboss.remoting.test.support.LoggingHelper;
-import org.jboss.remoting.SimpleCloseable;
import org.jboss.remoting.LocalServiceConfiguration;
import org.jboss.remoting.RequestListener;
import org.jboss.remoting.ClientContext;
@@ -43,6 +41,10 @@
import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.ClientSource;
import org.jboss.remoting.Client;
+import org.jboss.remoting.util.QualifiedName;
+import org.jboss.remoting.spi.NamedServiceRegistry;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.Handle;
import org.jboss.xnio.BufferAllocator;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.Xnio;
@@ -91,12 +93,10 @@
configuration.setExecutor(closeableExecutor);
configuration.setLinkMetric(10);
configuration.setMarshallerFactory(new
RiverMarshallerFactory());
+ final NamedServiceRegistry registry = new
NamedServiceRegistry();
+ configuration.setNamedServiceRegistry(registry);
final MarshallingConfiguration marshallingConfiguration = new
MarshallingConfiguration();
configuration.setMarshallingConfiguration(marshallingConfiguration);
- final IoHandlerFactory<AllocatedMessageChannel>
handlerFactory = MultiplexProtocol.createServer(remoteEndpoint, configuration);
- final ChannelSource<AllocatedMessageChannel> channelSource
=
Channels.convertStreamToAllocatedMessage(xnio.createPipeServer(Channels.convertStreamToAllocatedMessage(handlerFactory,
16384, 16384)), 16384, 16384);
- final IoFuture<SimpleCloseable> future =
MultiplexProtocol.connect(endpoint, configuration, channelSource);
- future.get();
final LocalServiceConfiguration<Object, Object>
localServiceConfiguration = new LocalServiceConfiguration<Object, Object>(new
RequestListener<Object, Object>() {
public void handleClientOpen(final ClientContext context) {
log.debug("Client open");
@@ -125,15 +125,43 @@
localServiceConfiguration.setServiceType("connection.test");
localServiceConfiguration.setGroupName("testgroup");
localServiceConfiguration.setMetric(10);
- remoteEndpoint.registerService(localServiceConfiguration);
- final IoFuture<ClientSource<Object,Object>>
futureClientSource = endpoint.locateService(new URI("jrs:connection.test::"),
Object.class, Object.class);
- assertEquals(IoFuture.Status.DONE, futureClientSource.await(1L,
TimeUnit.SECONDS));
- final ClientSource<Object, Object> clientSource =
futureClientSource.get();
- final Client<Object,Object> client =
clientSource.createClient();
- final IoFuture<Object> futureReply = client.send(REQUEST);
- assertEquals(IoFuture.Status.DONE, futureReply.await(1L,
TimeUnit.SECONDS));
- assertEquals(REPLY, futureReply.get());
- assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ final Handle<RequestHandlerSource>
requestHandlerSourceHandle = remoteEndpoint.registerService(localServiceConfiguration);
+ try {
+
registry.registerService(QualifiedName.parse("/test/connectiontest"),
requestHandlerSourceHandle.getResource());
+ final IoHandlerFactory<AllocatedMessageChannel>
handlerFactory = MultiplexProtocol.createServer(remoteEndpoint, configuration);
+ final ChannelSource<AllocatedMessageChannel>
channelSource =
Channels.convertStreamToAllocatedMessage(xnio.createPipeServer(Channels.convertStreamToAllocatedMessage(handlerFactory,
16384, 16384)), 16384, 16384);
+ final IoFuture<MultiplexConnection> future =
MultiplexProtocol.connect(endpoint, configuration, channelSource);
+ final MultiplexConnection connection = future.get();
+ try {
+ final Handle<RequestHandlerSource>
remoteHandlerSource =
connection.openRemoteService(QualifiedName.parse("/test/connectiontest"));
+ try {
+ final ClientSource<Object, Object> clientSource
= endpoint.createClientSource(remoteHandlerSource.getResource(), Object.class,
Object.class);
+ try {
+ final Client<Object,Object> client =
clientSource.createClient();
+ try {
+ final IoFuture<Object> futureReply =
client.send(REQUEST);
+ assertEquals(IoFuture.Status.DONE,
futureReply.await(1L, TimeUnit.SECONDS));
+ assertEquals(REPLY, futureReply.get());
+ client.close();
+ clientSource.close();
+ remoteHandlerSource.close();
+ connection.close();
+ assertTrue(latch.await(1L,
TimeUnit.SECONDS));
+ } finally {
+ IoUtils.safeClose(client);
+ }
+ } finally {
+ IoUtils.safeClose(clientSource);
+ }
+ } finally {
+ IoUtils.safeClose(remoteHandlerSource);
+ }
+ } finally {
+ IoUtils.safeClose(connection);
+ }
+ } finally {
+ IoUtils.safeClose(requestHandlerSourceHandle);
+ }
} finally {
endpoint.stop();
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -363,7 +363,7 @@
return subject.substring(position, nextDelim);
}
} finally {
- position = nextDelim;
+ position = nextDelim == -1 ? -1 : nextDelim + 1;
}
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -100,9 +100,18 @@
public static QualifiedName parse(String path) {
List<String> decoded = new ArrayList<String>();
+ boolean first = true;
for (String segment : CollectionUtil.split("/", path)) {
- if (segment.length() == 0) {
- throw new IllegalArgumentException("Empty segment in path");
+ if (first) {
+ if (segment.length() > 0) {
+ throw new IllegalArgumentException("Relative paths are not
allowed");
+ }
+ first = false;
+ continue;
+ } else {
+ if (segment.length() == 0) {
+ throw new IllegalArgumentException("Empty segment in
path");
+ }
}
try {
decoded.add(URLDecoder.decode(segment, "utf-8"));
Modified:
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -15,7 +15,7 @@
monitor = this;
}
- protected SynchronizedCollection(final Collection<V> delegate, final Object
monitor) {
+ public SynchronizedCollection(final Collection<V> delegate, final Object
monitor) {
this.delegate = delegate;
this.monitor = monitor;
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java 2008-11-14
04:08:51 UTC (rev 4678)
+++
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java 2008-11-14
06:41:47 UTC (rev 4679)
@@ -11,7 +11,7 @@
super(delegate);
}
- protected SynchronizedSet(final Set<K> delegate, final Object monitor) {
+ public SynchronizedSet(final Set<K> delegate, final Object monitor) {
super(delegate, monitor);
}
}