Author: david.lloyd(a)jboss.com
Date: 2008-09-09 10:05:02 -0400 (Tue, 09 Sep 2008)
New Revision: 4565
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ReplyException.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConfigValue.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/RemotingChannelConfiguration.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentIntegerMap.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/EmulatedConcurrentIntegerHashMap.java
Removed:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/
remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/marshal/
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
remoting3/trunk/build.properties
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
Log:
Convert to new marshalling API
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ReplyException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ReplyException.java
(rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ReplyException.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting;
+
+/**
+ *
+ */
+public class ReplyException extends RemotingException {
+
+ private static final long serialVersionUID = 5562116026829381932L;
+
+ public ReplyException() {
+ }
+
+ public ReplyException(final String msg) {
+ super(msg);
+ }
+
+ public ReplyException(final Throwable cause) {
+ super(cause);
+ }
+
+ public ReplyException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java 2008-09-09
08:55:28 UTC (rev 4564)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -41,6 +41,7 @@
* @param msg the message
* @param cause the cause
*/
+ // TODO - change to accept a RemotingException instead?
void handleException(final String msg, Throwable cause);
/**
Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties 2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/build.properties 2008-09-09 14:05:02 UTC (rev 4565)
@@ -115,6 +115,14 @@
lib.jboss-managed.local=${local.repository}/${lib.jboss-managed.local-path}
lib.jboss-managed.remote=${remote.repository}/${lib.jboss-managed.remote-path}
+lib.jboss-marshalling-api.version=1.0.0.Alpha.20080903-314
+lib.jboss-marshalling-api.name=marshalling-api.jar
+lib.jboss-marshalling-api.license=lgpl
+lib.jboss-marshalling-api.dir=jboss/marshalling/${lib.jboss-marshalling-api.version}/lib
+lib.jboss-marshalling-api.path=${lib.jboss-marshalling-api.dir}/${lib.jboss-marshalling-api.name}
+lib.jboss-marshalling-api.local=${local.repository}/${lib.jboss-marshalling-api.path}
+lib.jboss-marshalling-api.remote=${remote.repository}/${lib.jboss-marshalling-api.path}
+
lib.jboss-serialization.version=1.1.0-snapshot-r358
lib.jboss-serialization.name=jboss-serialization.jar
lib.jboss-serialization.license=lgpl
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/build.xml 2008-09-09 14:05:02 UTC (rev 4565)
@@ -127,6 +127,18 @@
<get src="${remote.license.dir}/${lib.jboss-managed.license}.txt"
dest="${lib.jboss-managed.local}.license.txt" usetimestamp="true"
ignoreerrors="false"/>
</target>
+ <!-- External library: JBoss Marshalling -->
+
+ <target name="lib.jboss-marshalling-api-check">
+ <available property="lib.jboss-marshalling-api.exists"
file="${lib.jboss-marshalling-api.local}"/>
+ </target>
+
+ <target name="lib.jboss-marshalling-api"
depends="lib.jboss-marshalling-api-check"
unless="lib.jboss-marshalling-api.exists">
+ <mkdir
dir="${local.repository}/${lib.jboss-marshalling-api.dir}"/>
+ <get src="${lib.jboss-marshalling-api.remote}"
dest="${lib.jboss-marshalling-api.local}" usetimestamp="true"
ignoreerrors="false"/>
+ <get
src="${remote.license.dir}/${lib.jboss-marshalling-api.license}.txt"
dest="${lib.jboss-marshalling-api.local}.license.txt"
usetimestamp="true" ignoreerrors="false"/>
+ </target>
+
<!-- External library: JBoss Serialization -->
<target name="lib.jboss-serialization-check">
@@ -236,6 +248,7 @@
debug="true">
<compilerarg value="-Xlint:unchecked"/>
<classpath>
+ <pathelement
location="${lib.jboss-marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
<path refid="util.classpath"/>
</classpath>
@@ -267,6 +280,7 @@
<path refid="api.classpath"/>
<path refid="testing-support.classpath"/>
<pathelement location="${lib.junit.local}"/>
+ <pathelement
location="${lib.jboss-marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
@@ -319,7 +333,7 @@
<delete dir="api/target"/>
</target>
- <target name="api" description="Build the API module"
depends="lib.xnio-api,util,api.compile">
+ <target name="api" description="Build the API module"
depends="lib.jboss-marshalling-api-check,lib.xnio-api,util,api.compile">
<path id="api.classpath">
<pathelement location="api/target/main/classes"/>
</path>
@@ -390,7 +404,7 @@
<path refid="api.classpath"/>
<path refid="util.classpath"/>
<path refid="version.classpath"/>
- <pathelement
location="${lib.jboss-serialization.local}"/>
+ <pathelement
location="${lib.jboss-marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
@@ -422,6 +436,7 @@
<path refid="core.classpath"/>
<path refid="util.classpath"/>
<path refid="testing-support.classpath"/>
+ <pathelement
location="${lib.jboss-marshalling-api.local}"/>
<pathelement location="${lib.junit.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
@@ -476,7 +491,7 @@
<delete dir="core/target"/>
</target>
- <target name="core" description="Build the core module"
depends="lib.jboss-serialization,api,util,version,core.compile">
+ <target name="core" description="Build the core module"
depends="api,util,version,core.compile">
<path id="core.classpath">
<pathelement location="core/target/main/classes"/>
</path>
@@ -743,6 +758,7 @@
<!-- TODO: marshallers should be moved to their own module -->
<path refid="core.classpath"/>
<path refid="util.classpath"/>
+ <pathelement
location="${lib.jboss-marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
@@ -776,6 +792,7 @@
<path refid="util.classpath"/>
<path refid="testing-support.classpath"/>
<pathelement location="${lib.junit.local}"/>
+ <pathelement
location="${lib.jboss-marshalling-api.local}"/>
<pathelement location="${lib.xnio-standalone.local}"/>
</classpath>
</javac>
@@ -802,6 +819,7 @@
<path refid="util.classpath"/>
<pathelement
location="protocol/basic/target/test/classes"/>
<pathelement location="${lib.junit.local}"/>
+ <pathelement
location="${lib.jboss-marshalling-api.local}"/>
<pathelement location="${lib.xnio-standalone.local}"/>
</classpath>
<batchtest fork="yes"
todir="protocol/basic/target/test-results"
@@ -830,7 +848,7 @@
<delete dir="protocol.basic/target"/>
</target>
- <target name="protocol.basic" description="Build the protocol.basic
module" depends="lib.xnio-api,api,core,util,protocol.basic.compile">
+ <target name="protocol.basic" description="Build the protocol.basic
module"
depends="lib.jboss-marshalling-api,lib.xnio-api,api,core,util,protocol.basic.compile">
<path id="protocol.basic.classpath">
<pathelement location="protocol/basic/target/main/classes"/>
</path>
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java 2008-09-09
08:55:28 UTC (rev 4564)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -2,7 +2,6 @@
import java.io.IOException;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
import org.jboss.cx.remoting.stream.ObjectSource;
import org.jboss.xnio.channels.StreamChannel;
import org.jboss.xnio.channels.StreamSourceChannel;
@@ -12,6 +11,7 @@
import org.jboss.xnio.ChannelSource;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.log.Logger;
+import org.jboss.marshalling.MarshallerFactory;
/**
*
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-09-09
08:55:28 UTC (rev 4564)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -27,43 +27,30 @@
import org.jboss.xnio.BufferAllocator;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.log.Logger;
-import static org.jboss.xnio.Buffers.*;
import org.jboss.cx.remoting.spi.remote.RequestHandler;
import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
import org.jboss.cx.remoting.spi.remote.ReplyHandler;
import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
-import org.jboss.cx.remoting.spi.marshal.Marshaller;
-import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
-import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.cx.remoting.spi.marshal.IdentityResolver;
import org.jboss.cx.remoting.spi.SpiUtils;
import org.jboss.cx.remoting.spi.AbstractAutoCloseable;
-import static org.jboss.cx.remoting.util.CollectionUtil.concurrentMap;
-import static org.jboss.cx.remoting.util.CollectionUtil.arrayList;
+import static org.jboss.cx.remoting.util.CollectionUtil.concurrentIntegerMap;
import org.jboss.cx.remoting.util.CollectionUtil;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_ONEWAY;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.REPLY;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_CLOSE;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_OPEN;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_CLOSE;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.VERSION;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_ADVERTISE;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_UNADVERTISE;
+import org.jboss.cx.remoting.util.ConcurrentIntegerMap;
import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.Endpoint;
-import java.util.concurrent.ConcurrentMap;
+import org.jboss.cx.remoting.SimpleCloseable;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.List;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
import java.nio.ByteBuffer;
import java.nio.BufferUnderflowException;
import java.io.IOException;
@@ -74,96 +61,67 @@
public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
private static final Logger log = Logger.getLogger(BasicHandler.class);
- private static final int LOCAL_VERSION = 0x00000100;
+ //--== Connection configuration items ==--
+ private final MarshallerFactory marshallerFactory;
+ private final int linkMetric;
+ private final Executor executor;
+ private final ClassLoader classLoader;
+ // buffer allocator for outbound message assembly
+ private final BufferAllocator<ByteBuffer> allocator;
+
// running on remote node
- private final ConcurrentMap<Integer, ReplyHandler> outstandingRequests =
concurrentMap();
+ private final ConcurrentIntegerMap<ReplyHandler> remoteRequests =
concurrentIntegerMap();
+ // running on local node
+ private final ConcurrentIntegerMap<RemoteRequestContext> localRequests =
concurrentIntegerMap();
+ // sequence for remote requests
+ private final AtomicInteger requestSequence = new AtomicInteger();
// clients whose requests get forwarded to the remote side
- private final ConcurrentMap<Integer, RequestHandler> remoteClients =
concurrentMap();
+ // even #s were opened from services forwarded to us (our sequence)
+ // odd #s were forwarded directly to us (remote sequence)
+ private final ConcurrentIntegerMap<RequestHandler> remoteClients =
concurrentIntegerMap();
// forwarded to remote side (handled on this side)
- private final ConcurrentMap<Integer, Handle<RequestHandler>>
forwardedClients = concurrentMap();
+ private final ConcurrentIntegerMap<Handle<RequestHandler>>
forwardedClients = concurrentIntegerMap();
+ // sequence for forwarded clients
+ private final AtomicInteger forwardedClientSequence = new AtomicInteger();
+ // sequence for clients created from services forwarded to us
+ private final AtomicInteger remoteClientSequence = new AtomicInteger();
// services forwarded to us
- private final ConcurrentMap<Integer, RequestHandlerSource> remoteServices =
concurrentMap();
+ private final ConcurrentIntegerMap<RequestHandlerSource> remoteServices =
concurrentIntegerMap();
// forwarded to remote side (handled on this side)
- private final ConcurrentMap<Integer, Handle<RequestHandlerSource>>
forwardedServices = concurrentMap();
+ private final ConcurrentIntegerMap<Handle<RequestHandlerSource>>
forwardedServices = concurrentIntegerMap();
+ // sequence for forwarded services
+ private final AtomicInteger serviceSequence = new AtomicInteger();
- private final boolean server;
- private final BufferAllocator<ByteBuffer> allocator;
-
- private final AtomicBoolean isnew = new AtomicBoolean(true);
private volatile AllocatedMessageChannel channel;
- private volatile int remoteVersion;
- private final Executor executor;
- private final MarshallerFactory<ByteBuffer> marshallerFactory;
- private final ObjectResolver resolver;
- private final ClassLoader classLoader;
- private List<String> localMarshallerList =
Collections.singletonList("java-serialization");
- private volatile String marshallerType;
- private volatile int metric;
- private final AtomicBoolean initialized = new AtomicBoolean(false);
- public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer>
allocator, final Executor executor, final MarshallerFactory<ByteBuffer>
marshallerFactory) {
- this.server = server;
- this.allocator = allocator;
- this.executor = executor;
- final RequestHandlerImpl endpoint = new RequestHandlerImpl(0, allocator);
- remoteClients.put(Integer.valueOf(0), endpoint);
- this.marshallerFactory = marshallerFactory;
- // todo
- resolver = IdentityResolver.getInstance();
- classLoader = getClass().getClassLoader();
+ public BasicHandler(final RemotingChannelConfiguration configuration) {
+ allocator = configuration.getAllocator();
+ executor = configuration.getExecutor();
+ classLoader = configuration.getClassLoader();
+ marshallerFactory = configuration.getMarshallerFactory();
+ linkMetric = configuration.getLinkMetric();
}
- /**
- * Sequence number of requests originating locally.
- */
- private final AtomicInteger localRequestIdSeq = new AtomicInteger();
- /**
- * Sequence number of local clients forwarded to the remote side.
- */
- private final AtomicInteger localClientIdSeq = new AtomicInteger();
- /**
- * Sequence number of remote clients opened locally from services from the remote
side.
- */
- private final AtomicInteger remoteClientIdSeq = new AtomicInteger();
-
public void handleOpened(final AllocatedMessageChannel channel) {
- if (isnew.getAndSet(false)) {
- this.channel = channel;
- }
- final List<ByteBuffer> bufferList = arrayList();
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) VERSION);
- buffer.putInt(LOCAL_VERSION);
- String joinedList = CollectionUtil.join(",", localMarshallerList);
- for (;;) {
- int i = writeUTFZ(buffer, joinedList);
- if (i == -1) {
- break;
- }
- bufferList.add(flip(buffer));
- joinedList = joinedList.substring(i);
- buffer = allocator.allocate();
- }
- bufferList.add(flip(buffer));
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
- } catch (InterruptedException e) {
- log.error("Interrupted while sending intial version message");
- IoUtils.safeClose(channel);
- Thread.currentThread().interrupt();
- return;
- }
channel.resumeReads();
}
public void handleReadable(final AllocatedMessageChannel channel) {
for (;;) try {
- final ByteBuffer buffer = channel.receive();
+ final ByteBuffer buffer;
+ try {
+ buffer = channel.receive();
+ } catch (IOException e) {
+ log.error(e, "I/O error in protocol channel; closing
channel");
+ IoUtils.safeClose(channel);
+ return;
+ }
if (buffer == null) {
// todo release all handles...
+ // todo what if the write queue is not empty?
IoUtils.safeClose(channel);
return;
}
@@ -172,145 +130,188 @@
channel.resumeReads();
return;
}
- int msgType = buffer.get() & 0xff;
- if (initialized.getAndSet(true) != (msgType != 0)) {
- log.error("Expected a version message; closing connection");
- IoUtils.safeClose(channel);
+ final MessageType msgType;
+ try {
+ msgType = MessageType.getMessageType(buffer.get() & 0xff);
+ } catch (IllegalArgumentException ex) {
+ log.trace("Received invalid message type");
return;
}
- log.trace("Received message %s, type %d", buffer,
Integer.valueOf(msgType));
+ log.trace("Received message %s, type %s", buffer, msgType);
switch (msgType) {
- case VERSION: {
- // participants always choose the lowest version number
- // since we only support one version (0), we don't do anything
with the value
- buffer.getInt();
- // Select the client's most preferred marshaling method that the
server supports
- final String marshallerList = readUTFZ(buffer);
- final Iterable<String> remoteMarshallerList =
CollectionUtil.split(",", marshallerList);
- final Iterable<String> clientList = server ?
remoteMarshallerList : localMarshallerList;
- final Iterable<String> serverList = server ?
localMarshallerList : remoteMarshallerList;
- for (final String clientSuggestion : clientList) {
- for (final String serverSuggestion : serverList) {
- if (clientSuggestion.equals(serverSuggestion)) {
- marshallerType = clientSuggestion;
- log.trace("Chose marshaller type '%s'",
marshallerType);
- }
- }
- }
- if (marshallerType == null) {
- log.error("Could not agree on a marshaller type; closing
connection");
- IoUtils.safeClose(channel);
- return;
- }
- break;
- }
case REQUEST_ONEWAY: {
final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle =
getForwardedClient(clientId);
+ final Handle<RequestHandler> handle =
forwardedClients.get(clientId);
if (handle == null) {
log.trace("Request on invalid client ID %d",
Integer.valueOf(clientId));
return;
}
- final Unmarshaller<ByteBuffer> unmarshaller =
marshallerFactory.createUnmarshaller(resolver, classLoader);
- if (! unmarshaller.unmarshal(buffer)) {
- log.trace("Incomplete one-way request for client ID
%d", Integer.valueOf(clientId));
- break;
- }
final Object payload;
try {
- payload = unmarshaller.get();
- } catch (ClassNotFoundException e) {
- log.trace("Class not found in one-way request for client ID
%d", Integer.valueOf(clientId));
+ final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller();
+ try {
+ unmarshaller.start(createByteInput(buffer, true));
+ try {
+ payload = unmarshaller.readObject();
+ } catch (ClassNotFoundException e) {
+ log.trace("Class not found in one-way request for
client ID %d", Integer.valueOf(clientId));
+ break;
+ }
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (IOException ex) {
+ log.error(ex, "Failed to unmarshal a one-way
request");
break;
}
final RequestHandler requestHandler = handle.getResource();
- requestHandler.receiveRequest(payload);
+ try {
+ requestHandler.receiveRequest(payload);
+ } catch (Throwable t) {
+ log.error(t, "One-way request handler unexpectedly threw an
exception");
+ }
break;
}
case REQUEST: {
final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle =
getForwardedClient(clientId);
+ final Handle<RequestHandler> handle =
forwardedClients.get(clientId);
if (handle == null) {
log.trace("Request on invalid client ID %d",
Integer.valueOf(clientId));
break;
}
final int requestId = buffer.getInt();
- final Unmarshaller<ByteBuffer> unmarshaller =
marshallerFactory.createUnmarshaller(resolver, classLoader);
- if (! unmarshaller.unmarshal(buffer)) {
- log.trace("Incomplete request ID %d for client ID %d",
Integer.valueOf(requestId), Integer.valueOf(clientId));
- new ReplyHandlerImpl(channel, requestId,
allocator).handleException("Incomplete request", null);
- break;
- }
final Object payload;
try {
- payload = unmarshaller.get();
- } catch (ClassNotFoundException e) {
- log.trace("Class not found in request ID %d for client ID
%d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+ final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller();
+ try {
+ unmarshaller.start(createByteInput(buffer, true));
+ try {
+ payload = unmarshaller.readObject();
+ } catch (ClassNotFoundException e) {
+ log.trace("Class not found in request ID %d for
client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+ // todo - send request receive failed message
+ break;
+ }
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (IOException ex) {
+ log.trace("Failed to unmarshal a request (%s), sending
%s", ex, MessageType.REQUEST_RECEIVE_FAILED);
+ // todo send a request failure message
break;
}
final RequestHandler requestHandler = handle.getResource();
- requestHandler.receiveRequest(payload, (ReplyHandler) new
ReplyHandlerImpl(channel, requestId, allocator));
+ requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel,
requestId, allocator));
break;
}
case REPLY: {
final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
+ final ReplyHandler replyHandler = remoteRequests.remove(requestId);
if (replyHandler == null) {
log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
break;
}
- final Unmarshaller<ByteBuffer> unmarshaller =
marshallerFactory.createUnmarshaller(resolver, classLoader);
- if (! unmarshaller.unmarshal(buffer)) {
- replyHandler.handleException("Incomplete reply",
null);
- log.trace("Incomplete reply to request ID %d",
Integer.valueOf(requestId));
- break;
- }
final Object payload;
try {
- payload = unmarshaller.get();
- } catch (ClassNotFoundException e) {
- replyHandler.handleException("Reply unmarshalling
failed", e);
- log.trace("Class not found in reply to request ID %d",
Integer.valueOf(requestId));
+ final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller();
+ try {
+ unmarshaller.start(createByteInput(buffer, true));
+ try {
+ payload = unmarshaller.readObject();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException("Reply unmarshalling
failed", e);
+ log.trace("Class not found in reply to request ID
%d", Integer.valueOf(requestId));
+ break;
+ }
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (IOException ex) {
+ log.trace("Failed to unmarshal a reply (%s), sending a
ReplyException");
+ // todo
+ SpiUtils.safeHandleException(replyHandler, null, null);
break;
}
SpiUtils.safeHandleReply(replyHandler, payload);
break;
}
- case REQUEST_FAILED: {
+ case CANCEL_REQUEST: {
final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
+ final RemoteRequestContext context = localRequests.get(requestId);
+ if (context != null) {
+ context.cancel();
+ }
+ break;
+ }
+ case CANCEL_ACK: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler replyHandler = remoteRequests.get(requestId);
+ if (replyHandler != null) {
+ replyHandler.handleCancellation();
+ }
+ break;
+ }
+ case REQUEST_RECEIVE_FAILED: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler replyHandler = remoteRequests.remove(requestId);
if (replyHandler == null) {
log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
break;
}
- final Unmarshaller<ByteBuffer> unmarshaller =
marshallerFactory.createUnmarshaller(resolver, classLoader);
- if (! unmarshaller.unmarshal(buffer)) {
- replyHandler.handleException("Incomplete exception
reply", null);
- log.trace("Incomplete exception reply to request ID
%d", Integer.valueOf(requestId));
+ final String reason = readUTFZ(buffer);
+ // todo - throw a new ReplyException
+ break;
+ }
+ case REQUEST_FAILED: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
break;
}
- final Object message;
+ final Throwable cause;
try {
- message = unmarshaller.get();
- } catch (ClassNotFoundException e) {
- replyHandler.handleException("Exception reply unmarshalling
failed", e);
- log.trace("Class not found in exception reply to request ID
%d", Integer.valueOf(requestId));
+ final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller();
+ try {
+ unmarshaller.start(createByteInput(buffer, true));
+ try {
+ cause = (Throwable) unmarshaller.readObject();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException("Exception reply
unmarshalling failed", e);
+ log.trace("Class not found in exception reply to
request ID %d", Integer.valueOf(requestId));
+ break;
+ } catch (ClassCastException e) {
+ // todo - report a generic exception
+ SpiUtils.safeHandleException(replyHandler, null, null);
+ break;
+ }
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (IOException ex) {
+ log.trace("Failed to unmarshal an exception reply (%s),
sending a generic execution exception");
+ // todo
+ SpiUtils.safeHandleException(replyHandler, null, null);
break;
}
- final Object cause;
- try {
- cause = unmarshaller.get();
- } catch (ClassNotFoundException e) {
- replyHandler.handleException("Exception reply unmarshalling
failed", e);
- log.trace("Class not found in exception reply to request ID
%d", Integer.valueOf(requestId));
+ // todo - wrap with REE
+ SpiUtils.safeHandleException(replyHandler, null, cause);
+ break;
+ }
+ case REQUEST_OUTCOME_UNKNOWN: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
break;
}
- SpiUtils.safeHandleException(replyHandler, message == null ? null :
message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
+ final String reason = readUTFZ(buffer);
+ // todo - throw a new IndetermOutcomeEx
break;
}
case CLIENT_CLOSE: {
final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle =
takeForwardedClient(clientId);
+ final Handle<RequestHandler> handle =
forwardedClients.remove(clientId);
if (handle == null) {
log.warn("Got client close message for unknown client
%d", Integer.valueOf(clientId));
break;
@@ -321,7 +322,7 @@
case CLIENT_OPEN: {
final int serviceId = buffer.getInt();
final int clientId = buffer.getInt();
- final Handle<RequestHandlerSource> handle =
registry.lookup(serviceId);
+ final Handle<RequestHandlerSource> handle =
forwardedServices.get(serviceId);
if (handle == null) {
log.warn("Received client open message for unknown service
%d", Integer.valueOf(serviceId));
break;
@@ -332,14 +333,21 @@
// todo check for duplicate
// todo validate the client ID
log.trace("Opening client %d from service %d",
Integer.valueOf(clientId), Integer.valueOf(serviceId));
- forwardedClients.put(Integer.valueOf(clientId), clientHandle);
+ forwardedClients.put(clientId, clientHandle);
+ } catch (IOException ex) {
+ log.error(ex, "Failed to create a request handler for client
ID %d", Integer.valueOf(clientId));
+ break;
} finally {
IoUtils.safeClose(handle);
}
break;
}
case SERVICE_CLOSE: {
- registry.unbind(buffer.getInt());
+ final Handle<RequestHandlerSource> handle =
forwardedServices.remove(buffer.getInt());
+ if (handle == null) {
+ break;
+ }
+ IoUtils.safeClose(handle);
break;
}
case SERVICE_ADVERTISE: {
@@ -348,37 +356,31 @@
final String groupName = readUTFZ(buffer);
final String endpointName = readUTFZ(buffer);
final int baseMetric = buffer.getInt();
- Endpoint endpoint;
- int id;
+ Endpoint endpoint = null;
+ int id = -1;
final RequestHandlerSource handlerSource = new
RequestHandlerSourceImpl(allocator, id);
- final int calcMetric = baseMetric + metric;
+ final int calcMetric = baseMetric + linkMetric;
if (calcMetric > 0) {
- endpoint.registerRemoteService(serviceType, groupName,
endpointName, handlerSource, calcMetric);
+ try {
+ final SimpleCloseable closeable =
endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource,
calcMetric);
+ // todo - something with that closeable
+ } catch (IOException e) {
+ log.error(e, "Unable to register remote service");
+ }
}
break;
}
case SERVICE_UNADVERTISE: {
final int serviceId = buffer.getInt();
- IoUtils.safeClose(remoteServices.get(Integer.valueOf(serviceId)));
+ IoUtils.safeClose(remoteServices.get(serviceId));
break;
}
default: {
- log.trace("Received invalid message type %d",
Integer.valueOf(msgType));
+ log.trace("Received invalid message type %s", msgType);
}
}
- } catch (IOException e) {
- log.error(e, "I/O error in protocol channel");
- IoUtils.safeClose(channel);
- return;
} catch (BufferUnderflowException e) {
log.error(e, "Malformed packet");
-// } catch (InterruptedException e) {
-// log.error(e, "Read thread interrupted, closing channel");
-// IoUtils.safeClose(channel);
-// Thread.currentThread().interrupt();
-// return;
- } catch (Throwable t) {
- log.error(t, "Handler failed");
}
}
@@ -407,10 +409,6 @@
public void handleClosed(final AllocatedMessageChannel channel) {
}
- RequestHandler getRemoteClient(final int i) {
- return remoteClients.get(Integer.valueOf(i));
- }
-
RequestHandlerSource getRemoteService(final int id) {
return new RequestHandlerSourceImpl(allocator, id);
}
@@ -435,56 +433,65 @@
public void handleReply(final Object reply) {
ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) REPLY);
+ buffer.put((byte) MessageType.REPLY.getId());
buffer.putInt(requestId);
try {
- final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(resolver);
- marshaller.start(reply);
- final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
- while (! marshaller.marshal(buffer)) {
- bufferList.add(flip(buffer));
- buffer = allocator.allocate();
+ final org.jboss.marshalling.Marshaller marshaller =
marshallerFactory.createMarshaller();
+ try {
+ final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
+ final ByteOutput output = createByteOutput(allocator, bufferList);
+ try {
+ marshaller.start(output);
+ marshaller.writeObject(reply);
+ marshaller.close();
+ output.close();
+ registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
}
- bufferList.add(flip(buffer));
- registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
} catch (IOException e) {
- // todo log
+ log.error(e, "Failed to send a reply to the remote side");
} catch (InterruptedException e) {
- // todo log
+ log.error(e, "Reply handler thread interrupted before a reply could
be sent");
Thread.currentThread().interrupt();
}
}
public void handleException(final String msg, final Throwable cause) {
ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) REQUEST_FAILED);
+ buffer.put((byte) MessageType.REQUEST_FAILED.getId());
buffer.putInt(requestId);
try {
- final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(resolver);
- final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
- marshaller.start(msg);
- while (! marshaller.marshal(buffer)) {
- bufferList.add(flip(buffer));
- buffer = allocator.allocate();
+ final org.jboss.marshalling.Marshaller marshaller =
marshallerFactory.createMarshaller();
+ try {
+ final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
+ final ByteOutput output = createByteOutput(allocator, bufferList);
+ try {
+ marshaller.start(output);
+ marshaller.writeObject(cause);
+ marshaller.close();
+ output.close();
+ registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
}
- marshaller.start(cause);
- while (! marshaller.marshal(buffer)) {
- bufferList.add(flip(buffer));
- buffer = allocator.allocate();
- }
- bufferList.add(flip(buffer));
- registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
} catch (IOException e) {
- // todo log
+ log.error(e, "Failed to send an exception to the remote
side");
} catch (InterruptedException e) {
- // todo log
+ log.error(e, "Reply handler thread interrupted before an exception
could be sent");
Thread.currentThread().interrupt();
}
}
public void handleCancellation() {
final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) CANCEL_ACK);
+ buffer.put((byte) MessageType.CANCEL_ACK.getId());
buffer.putInt(requestId);
buffer.flip();
try {
@@ -496,45 +503,6 @@
}
}
- // Session mgmt
-
- public int openRequest(ReplyHandler handler) {
- int id;
- do {
- id = localRequestIdSeq.getAndIncrement();
- } while (outstandingRequests.putIfAbsent(Integer.valueOf(id), handler) != null);
- return id;
- }
-
- public int openClientFromService() {
- int id;
- do {
- id = remoteClientIdSeq.getAndIncrement() << 1 | (server ? 1 : 0);
- } while (remoteClients.putIfAbsent(Integer.valueOf(id), new
RequestHandlerImpl(id, allocator)) != null);
- return id;
- }
-
- public void openClientForForwardedService(int id, RequestHandler clientEndpoint) {
- try {
- forwardedClients.put(Integer.valueOf(id), clientEndpoint.getHandle());
- } catch (IOException e) {
- // TODO fix
- e.printStackTrace();
- }
- }
-
- public Handle<RequestHandler> getForwardedClient(int id) {
- return forwardedClients.get(Integer.valueOf(id));
- }
-
- private Handle<RequestHandler> takeForwardedClient(final int id) {
- return forwardedClients.remove(Integer.valueOf(id));
- }
-
- public ReplyHandler takeOutstandingReqeust(int id) {
- return outstandingRequests.remove(Integer.valueOf(id));
- }
-
// Writer members
private final BlockingQueue<WriteHandler> outputQueue =
CollectionUtil.blockingQueue(64);
@@ -655,8 +623,9 @@
this.allocator = allocator;
addCloseHandler(new CloseHandler<RequestHandler>() {
public void handleClose(final RequestHandler closed) {
+ remoteClients.remove(identifier, this);
ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CLIENT_CLOSE);
+ buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
buffer.putInt(identifier);
buffer.flip();
try {
@@ -671,17 +640,23 @@
public void receiveRequest(final Object request) {
log.trace("Sending outbound one-way request of type %s", request ==
null ? "null" : request.getClass());
try {
- final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(null);
- final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.REQUEST_ONEWAY);
- buffer.putInt(identifier);
- marshaller.start(request);
- while (! marshaller.marshal(buffer)) {
- bufferList.add(flip(buffer));
- buffer = allocator.allocate();
+ final List<ByteBuffer> bufferList;
+ final Marshaller marshaller = marshallerFactory.createMarshaller();
+ try {
+ bufferList = new ArrayList<ByteBuffer>();
+ final ByteOutput output = createByteOutput(allocator, bufferList);
+ try {
+ marshaller.write(MessageType.REQUEST_ONEWAY.getId());
+ marshaller.writeInt(identifier);
+ marshaller.writeObject(request);
+ marshaller.close();
+ output.close();
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
}
- bufferList.add(flip(buffer));
try {
registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
} catch (InterruptedException e) {
@@ -699,33 +674,43 @@
public RemoteRequestContext receiveRequest(final Object request, final
ReplyHandler handler) {
log.trace("Sending outbound request of type %s", request == null ?
"null" : request.getClass());
try {
- final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(null);
- final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.REQUEST);
- buffer.putInt(identifier);
- final int id = openRequest(handler);
- buffer.putInt(id);
- marshaller.start(request);
- while (! marshaller.marshal(buffer)) {
- bufferList.add(flip(buffer));
- buffer = allocator.allocate();
- }
- bufferList.add(flip(buffer));
+ final List<ByteBuffer> bufferList;
+ final Marshaller marshaller = marshallerFactory.createMarshaller();
try {
- registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- executor.execute(new Runnable() {
- public void run() {
- SpiUtils.safeHandleCancellation(handler);
+ bufferList = new ArrayList<ByteBuffer>();
+ final ByteOutput output = createByteOutput(allocator, bufferList);
+ try {
+ marshaller.write(MessageType.REQUEST.getId());
+ marshaller.writeInt(identifier);
+
+ int id;
+ do {
+ id = requestSequence.getAndIncrement();
+ } while (remoteRequests.putIfAbsent(id, handler) != null);
+ marshaller.writeInt(id);
+ marshaller.writeObject(request);
+ marshaller.close();
+ output.close();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ executor.execute(new Runnable() {
+ public void run() {
+ SpiUtils.safeHandleCancellation(handler);
+ }
+ });
+ return SpiUtils.getBlankRemoteRequestContext();
}
- });
- return SpiUtils.getBlankRemoteRequestContext();
+ log.trace("Sent request %s", request);
+ return new RemoteRequestContextImpl(id, allocator, channel);
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
}
- log.trace("Sent request %s", request);
- return new RemoteRequestContextImpl(id, allocator, channel);
- } catch (final Throwable t) {
+ } catch (final IOException t) {
log.trace(t, "receiveRequest failed with an exception");
executor.execute(new Runnable() {
public void run() {
@@ -756,7 +741,7 @@
public void cancel() {
try {
final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CANCEL_REQUEST);
+ buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
buffer.putInt(id);
buffer.flip();
registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
@@ -781,7 +766,7 @@
addCloseHandler(new CloseHandler<RequestHandlerSource>() {
public void handleClose(final RequestHandlerSource closed) {
ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.SERVICE_CLOSE);
+ buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
buffer.putInt(identifier);
buffer.flip();
try {
@@ -794,9 +779,13 @@
}
public Handle<RequestHandler> createRequestHandler() throws IOException {
- final int clientId = openClientFromService();
+ int id;
+ do {
+ id = remoteClientSequence.getAndIncrement() << 1;
+ } while (remoteClients.putIfAbsent(id, new RequestHandlerImpl(id,
BasicHandler.this.allocator)) != null);
+ final int clientId = id;
final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CLIENT_OPEN);
+ buffer.put((byte) MessageType.CLIENT_OPEN.getId());
buffer.putInt(identifier);
buffer.putInt(clientId);
buffer.flip();
@@ -822,4 +811,105 @@
return "forwarding request handler source <" +
Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
}
}
+
+ public static ByteInput createByteInput(final ByteBuffer buffer, final boolean eof)
{
+ return new ByteInput() {
+ public int read() throws IOException {
+ if (buffer.hasRemaining()) {
+ return buffer.get() & 0xff;
+ } else {
+ return eof ? -1 : 0;
+ }
+ }
+
+ public int read(final byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ public int read(final byte[] b, final int off, final int len) throws
IOException {
+ int r = Math.min(buffer.remaining(), len);
+ if (r > 0) {
+ buffer.get(b, off, r);
+ return r;
+ } else {
+ return eof ? -1 : 0;
+ }
+ }
+
+ public int available() throws IOException {
+ return buffer.remaining();
+ }
+
+ public long skip(final long n) throws IOException {
+ final int cnt = n > (long) Integer.MAX_VALUE ? Integer.MAX_VALUE :
(int) n;
+ int r = Math.min(buffer.remaining(), cnt);
+ if (r > 0) {
+ final int oldPos = buffer.position();
+ final int newPos = oldPos + r;
+ if (newPos < 0) {
+ final int lim = buffer.limit();
+ buffer.position(lim);
+ return lim - oldPos;
+ }
+ }
+ return r;
+ }
+
+ public void close() {
+ }
+ };
+ }
+
+ public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer>
allocator, final Collection<ByteBuffer> target) {
+ return new ByteOutput() {
+ private ByteBuffer current;
+
+ private ByteBuffer getCurrent() {
+ final ByteBuffer buffer = current;
+ return buffer == null ? (current = allocator.allocate()) : buffer;
+ }
+
+ public void write(final int i) throws IOException {
+ final ByteBuffer buffer = getCurrent();
+ buffer.put((byte) i);
+ if (! buffer.hasRemaining()) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+
+ public void write(final byte[] bytes) throws IOException {
+ write(bytes, 0, bytes.length);
+ }
+
+ public void write(final byte[] bytes, int offs, int len) throws IOException
{
+ while (len > 0) {
+ final ByteBuffer buffer = getCurrent();
+ final int c = Math.min(len, buffer.remaining());
+ buffer.put(bytes, offs, c);
+ offs += c;
+ len -= c;
+ if (! buffer.hasRemaining()) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ flush();
+ }
+
+ public void flush() throws IOException {
+ final ByteBuffer buffer = current;
+ if (buffer != null) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+ };
+ }
}
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-09-09
08:55:28 UTC (rev 4564)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -26,7 +26,6 @@
import org.jboss.cx.remoting.SimpleCloseable;
import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
import org.jboss.xnio.IoHandlerFactory;
import org.jboss.xnio.ChannelSource;
import org.jboss.xnio.IoFuture;
@@ -59,7 +58,11 @@
public static IoHandlerFactory<AllocatedMessageChannel> createServer(final
Executor executor, final BufferAllocator<ByteBuffer> allocator) {
return new IoHandlerFactory<AllocatedMessageChannel>() {
public IoHandler<? super AllocatedMessageChannel> createHandler() {
- return new BasicHandler(true, allocator, executor, new
JavaSerializationMarshallerFactory(executor));
+ final RemotingChannelConfiguration configuration = new
RemotingChannelConfiguration();
+ configuration.setAllocator(allocator);
+ configuration.setExecutor(executor);
+ // todo marshaller factory... etc
+ return new BasicHandler(configuration);
}
};
}
@@ -74,7 +77,11 @@
* @throws IOException if an error occurs
*/
public static IoFuture<SimpleCloseable> connect(final Executor executor, final
ChannelSource<AllocatedMessageChannel> channelSource, final
BufferAllocator<ByteBuffer> allocator) throws IOException {
- final BasicHandler basicHandler = new BasicHandler(false, allocator, executor,
new JavaSerializationMarshallerFactory(executor));
+ final RemotingChannelConfiguration configuration = new
RemotingChannelConfiguration();
+ configuration.setAllocator(allocator);
+ configuration.setExecutor(executor);
+ // todo marshaller factory... etc
+ final BasicHandler basicHandler = new BasicHandler(configuration);
final IoFuture<AllocatedMessageChannel> futureChannel =
channelSource.open(basicHandler);
return new AbstractConvertingIoFuture<SimpleCloseable,
AllocatedMessageChannel>(futureChannel) {
protected SimpleCloseable convert(final AllocatedMessageChannel channel)
throws RemotingException {
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConfigValue.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConfigValue.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConfigValue.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+/**
+ *
+ */
+public enum ConfigValue {
+
+ /**
+ * The protocol version to use. Value type is {@code int}.
+ */
+ PROTOCOL_VERSION(0),
+ /**
+ * The name of the marshaller to use. Value type is {@code String}.
+ */
+ MARSHALLER_NAME(1),
+ ;
+ private final int id;
+
+ private ConfigValue(final int id) {
+ this.id = id;
+ }
+
+ /**
+ * Get the integer ID for this config value.
+ *
+ * @return the integer ID
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Get the config value for an integer ID.
+ *
+ * @param id the integer ID
+ * @return the config value instance
+ */
+ public static ConfigValue getConfigValue(final int id) {
+ switch (id) {
+ case 0: return PROTOCOL_VERSION;
+ case 1: return MARSHALLER_NAME;
+ default: throw new IllegalArgumentException("Invalid config value
ID");
+ }
+ }
+}
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java 2008-09-09
08:55:28 UTC (rev 4564)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -23,34 +23,69 @@
package org.jboss.cx.remoting.protocol.basic;
/**
- *
+ * The type of a protocol message.
*/
-public final class MessageType {
- // Initial version & marshaller negotiation
- public static final int VERSION = 0;
+public enum MessageType {
+
// One-way request, no return value may be sent
- public static final int REQUEST_ONEWAY = 1;
+ REQUEST_ONEWAY(1),
// Two-way request, return value is expected
- public static final int REQUEST = 2;
+ REQUEST(2),
// Reply
- public static final int REPLY = 3;
+ REPLY(3),
// Attempt to cancel a request
- public static final int CANCEL_REQUEST = 4;
+ CANCEL_REQUEST(4),
// Acknowledge that a request was cancelled
- public static final int CANCEL_ACK = 5;
+ CANCEL_ACK(5),
+ // Request failed due to protocol or unmarshalling problem
+ REQUEST_RECEIVE_FAILED(6),
// Request failed due to exception
- public static final int REQUEST_FAILED = 6;
+ REQUEST_FAILED(7),
+ // Request completed but no reply or exception was sent
+ REQUEST_OUTCOME_UNKNOWN(8),
// Remote side called .close() on a forwarded RequestHandler
- public static final int CLIENT_CLOSE = 7;
+ CLIENT_CLOSE(9),
// Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
- public static final int CLIENT_OPEN = 8;
+ CLIENT_OPEN(10),
// Remote side called .close() on a forwarded RequestHandlerSource
- public static final int SERVICE_CLOSE = 9;
+ SERVICE_CLOSE(11),
// Remote side brought a new service online
- public static final int SERVICE_ADVERTISE = 10;
+ SERVICE_ADVERTISE(12),
// Remote side's service is no longer available
- public static final int SERVICE_UNADVERTISE= 11;
+ SERVICE_UNADVERTISE(13),
+ ;
+ private final int id;
- private MessageType() {
+ private MessageType(int id) {
+ this.id = id;
}
+
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Get the message type for an integer ID.
+ *
+ * @param id the integer ID
+ * @return the message type instance
+ */
+ public static MessageType getMessageType(final int id) {
+ switch (id) {
+ case 1: return REQUEST_ONEWAY;
+ case 2: return REQUEST;
+ case 3: return REPLY;
+ case 4: return CANCEL_REQUEST;
+ case 5: return CANCEL_ACK;
+ case 6: return REQUEST_RECEIVE_FAILED;
+ case 7: return REQUEST_FAILED;
+ case 8: return REQUEST_OUTCOME_UNKNOWN;
+ case 9: return CLIENT_CLOSE;
+ case 10: return CLIENT_OPEN;
+ case 11: return SERVICE_CLOSE;
+ case 12: return SERVICE_ADVERTISE;
+ case 13: return SERVICE_UNADVERTISE;
+ default: throw new IllegalArgumentException("Invalid message type
ID");
+ }
+ }
}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/RemotingChannelConfiguration.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/RemotingChannelConfiguration.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/RemotingChannelConfiguration.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import java.util.concurrent.Executor;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.marshalling.MarshallerFactory;
+
+/**
+ *
+ */
+public final class RemotingChannelConfiguration {
+ private MarshallerFactory marshallerFactory;
+ private int linkMetric;
+ private Executor executor;
+ private ClassLoader classLoader;
+ private BufferAllocator<ByteBuffer> allocator;
+
+ public RemotingChannelConfiguration() {
+ }
+
+ public MarshallerFactory getMarshallerFactory() {
+ return marshallerFactory;
+ }
+
+ public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+ this.marshallerFactory = marshallerFactory;
+ }
+
+ public int getLinkMetric() {
+ return linkMetric;
+ }
+
+ public void setLinkMetric(final int linkMetric) {
+ this.linkMetric = linkMetric;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
+ public void setClassLoader(final ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
+ public BufferAllocator<ByteBuffer> getAllocator() {
+ return allocator;
+ }
+
+ public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+ this.allocator = allocator;
+ }
+}
Modified:
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
---
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-09-09
08:55:28 UTC (rev 4564)
+++
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -84,73 +84,6 @@
endpoint.setExecutor(closeableExecutor);
endpoint.start();
try {
- try {
- final Handle<RequestHandlerSource>
requestHandlerSourceHandle = endpoint.createRequestHandlerSource(new
AbstractRequestListener<Object, Object>() {
- public void handleRequest(final RequestContext<Object>
context, final Object request) throws RemoteExecutionException {
- try {
- context.sendReply(REPLY);
- } catch (IOException e) {
- problems.add(e);
- }
- }
- }, null, null);
- try {
-
serviceRegistry.bind(requestHandlerSourceHandle.getResource(), 13);
- final IoHandlerFactory<AllocatedMessageChannel>
handlerFactory = BasicProtocol.createServer(closeableExecutor, allocator);
- final IoHandlerFactory<StreamChannel> newHandlerFactory
= Channels.convertStreamToAllocatedMessage(handlerFactory, 32768, 32768);
- final Closeable tcpServerCloseable =
xnio.createTcpServer(newHandlerFactory, new InetSocketAddress(12345)).create();
- try {
- final CloseableTcpConnector connector =
xnio.createTcpConnector().create();
- try {
- final TcpClient tcpClient =
connector.createChannelSource(new InetSocketAddress("localhost", 12345));
- final ChannelSource<AllocatedMessageChannel>
channelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
- final IoFuture<SimpleCloseable> futureCloseable
= BasicProtocol.connect(closeableExecutor, channelSource, allocator);
- final SimpleCloseable connection =
futureCloseable.get();
- try {
- final Handle<RequestHandlerSource>
handleThirteen = connection.getServiceForId(13);
- try {
- final ClientSource<Object,Object>
clientSource = endpoint.createClientSource(handleThirteen.getResource());
- try {
- final Client<Object,Object> client
= clientSource.createClient();
- try {
- final IoFuture<Object> future =
client.send(REQUEST);
- assertEquals(IoFuture.Status.DONE,
future.await(TimeUnit.MILLISECONDS, 500L));
- assertEquals(REPLY, future.get());
- client.close();
- clientSource.close();
- handleThirteen.close();
- connection.close();
- connector.close();
- tcpServerCloseable.close();
- requestHandlerSourceHandle.close();
- serviceRegistry.clear();
- endpoint.stop();
- xnio.close();
- closeableExecutor.close();
- } finally {
- IoUtils.safeClose(client);
- }
- } finally {
- IoUtils.safeClose(clientSource);
- }
- } finally {
- IoUtils.safeClose(handleThirteen);
- }
- } finally {
- IoUtils.safeClose(connection);
- }
- } finally {
- IoUtils.safeClose(connector);
- }
- } finally {
- IoUtils.safeClose(tcpServerCloseable);
- }
- } finally {
- IoUtils.safeClose(requestHandlerSourceHandle);
- }
- } finally {
- serviceRegistry.clear();
- }
} finally {
endpoint.stop();
}
Modified:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java 2008-09-09
08:55:28 UTC (rev 4564)
+++
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -63,6 +63,10 @@
return new ConcurrentReferenceHashMap<K, V>(16, STRONG, WEAK);
}
+ public static <V> ConcurrentIntegerMap<V> concurrentIntegerMap() {
+ return new EmulatedConcurrentIntegerHashMap<V>(CollectionUtil.<Integer,
V>concurrentMap());
+ }
+
/**
* Create a synchronized map that obeys the contract for {@code ConcurrentMap}.
*
Added:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentIntegerMap.java
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentIntegerMap.java
(rev 0)
+++
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentIntegerMap.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.util;
+
+import java.util.Set;
+import java.util.Collection;
+
+/**
+ *
+ */
+public interface ConcurrentIntegerMap<V> {
+ boolean containsKey(int key);
+
+ boolean containsValue(Object value);
+
+ V get(int key);
+
+ V put(int key, V value);
+
+ V putIfAbsent(int key, V value);
+
+ void putAll(ConcurrentIntegerMap<? extends V> m);
+
+ V remove(int key);
+
+ boolean remove(int key, Object oldValue);
+
+ V replace(int key, V value);
+
+ boolean replace(int key, V oldValue, V newValue);
+
+ void clear();
+
+ int size();
+
+ boolean isEmpty();
+
+ Set<Entry<V>> entrySet();
+
+ Collection<V> values();
+
+ boolean equals(Object other);
+
+ int hashCode();
+
+ interface Entry<V> {
+ int getKey();
+
+ V getValue();
+
+ V setValue(V value);
+
+ int hashCode();
+
+ boolean equals(Object other);
+ }
+}
Added:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/EmulatedConcurrentIntegerHashMap.java
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/EmulatedConcurrentIntegerHashMap.java
(rev 0)
+++
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/EmulatedConcurrentIntegerHashMap.java 2008-09-09
14:05:02 UTC (rev 4565)
@@ -0,0 +1,197 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.util;
+
+import java.util.Set;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ *
+ */
+public final class EmulatedConcurrentIntegerHashMap<V> implements
ConcurrentIntegerMap<V> {
+
+ private final ConcurrentMap<Integer, V> delegate;
+
+ public EmulatedConcurrentIntegerHashMap(final ConcurrentMap<Integer, V>
delegate) {
+ this.delegate = delegate;
+ }
+
+ public boolean containsKey(final int key) {
+ return delegate.containsKey(Integer.valueOf(key));
+ }
+
+ public boolean containsValue(final Object value) {
+ return delegate.containsValue(value);
+ }
+
+ public V get(final int key) {
+ return delegate.get(Integer.valueOf(key));
+ }
+
+ public V put(final int key, final V value) {
+ return delegate.put(Integer.valueOf(key), value);
+ }
+
+ public V putIfAbsent(final int key, final V value) {
+ return delegate.putIfAbsent(Integer.valueOf(key), value);
+ }
+
+ public void putAll(final ConcurrentIntegerMap<? extends V> m) {
+ throw new UnsupportedOperationException("maybe later");
+ }
+
+ public V remove(final int key) {
+ return delegate.remove(Integer.valueOf(key));
+ }
+
+ public boolean remove(final int key, final Object oldValue) {
+ return delegate.remove(Integer.valueOf(key), oldValue);
+ }
+
+ public V replace(final int key, final V value) {
+ return delegate.replace(Integer.valueOf(key), value);
+ }
+
+ public boolean replace(final int key, final V oldValue, final V newValue) {
+ return delegate.replace(Integer.valueOf(key), oldValue, newValue);
+ }
+
+ public void clear() {
+ delegate.clear();
+ }
+
+ public int size() {
+ return delegate.size();
+ }
+
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ public Set<Entry<V>> entrySet() {
+ return new EmulatedEntrySet<V>(delegate.entrySet());
+ }
+
+ public Collection<V> values() {
+ return delegate.values();
+ }
+
+ public boolean equals(final Object obj) {
+ return super.equals(obj);
+ }
+
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ private static class EmulatedEntrySet<V> implements Set<Entry<V>>
{
+
+ private final Set<Map.Entry<Integer, V>> delegate;
+
+ public EmulatedEntrySet(final Set<Map.Entry<Integer,V>> delegate) {
+ this.delegate = delegate;
+ }
+
+ public int size() {
+ return delegate.size();
+ }
+
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ public boolean contains(final Object o) {
+ // todo
+ return false;
+ }
+
+ public Iterator<Entry<V>> iterator() {
+ final Iterator<Map.Entry<Integer, V>> i = delegate.iterator();
+ return new Iterator<Entry<V>>() {
+ public boolean hasNext() {
+ return i.hasNext();
+ }
+
+ public Entry<V> next() {
+ final Map.Entry<Integer, V> n = i.next();
+ return new Entry<V>() {
+ public int getKey() {
+ return n.getKey().intValue();
+ }
+
+ public V getValue() {
+ return n.getValue();
+ }
+
+ public V setValue(final V value) {
+ return n.setValue(value);
+ }
+ };
+ }
+
+ public void remove() {
+ i.remove();
+ }
+ };
+ }
+
+ public Object[] toArray() {
+ throw new UnsupportedOperationException("maybe later");
+ }
+
+ public <T> T[] toArray(final T[] a) {
+ throw new UnsupportedOperationException("maybe later");
+ }
+
+ public boolean add(final Entry<V> o) {
+ throw new UnsupportedOperationException("add() not supported");
+ }
+
+ public boolean remove(final Object o) {
+ throw new UnsupportedOperationException("maybe later");
+ }
+
+ public boolean containsAll(final Collection<?> c) {
+ throw new UnsupportedOperationException("maybe later");
+ }
+
+ public boolean addAll(final Collection<? extends Entry<V>> c) {
+ throw new UnsupportedOperationException("addAll() not supported");
+ }
+
+ public boolean retainAll(final Collection<?> c) {
+ throw new UnsupportedOperationException("maybe later");
+ }
+
+ public boolean removeAll(final Collection<?> c) {
+ throw new UnsupportedOperationException("maybe later");
+ }
+
+ public void clear() {
+ delegate.clear();
+ }
+ }
+}