Author: david.lloyd(a)jboss.com
Date: 2008-07-21 12:29:04 -0400 (Mon, 21 Jul 2008)
New Revision: 4417
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
Log:
Add a VERSION message for protocol forward compatibility
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-21
14:10:27 UTC (rev 4416)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-21
16:29:04 UTC (rev 4417)
@@ -50,6 +50,7 @@
import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_CLOSE;
import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.VERSION;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
import java.util.concurrent.ConcurrentMap;
@@ -59,6 +60,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.List;
import java.util.ArrayList;
+import java.util.Collections;
import java.nio.ByteBuffer;
import java.nio.BufferUnderflowException;
import java.io.IOException;
@@ -69,6 +71,7 @@
public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
private static final Logger log = Logger.getLogger(BasicHandler.class);
+ private static final int LOCAL_VERSION = 1;
// clients whose requests get forwarded to the remote side
private final ConcurrentMap<Integer, RemoteClientEndpoint> remoteClients =
concurrentMap();
@@ -84,10 +87,14 @@
private final AtomicBoolean isnew = new AtomicBoolean(true);
private volatile AllocatedMessageChannel channel;
+ private volatile int remoteVersion;
private final Executor executor;
private final MarshallerFactory<ByteBuffer> marshallerFactory;
private final ObjectResolver resolver;
private final ClassLoader classLoader;
+ private List<String> localMarshallerList =
Collections.singletonList("java-serialization");
+ private volatile String marshallerType;
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer>
allocator, final Executor executor, final MarshallerFactory<ByteBuffer>
marshallerFactory, final ServiceRegistry registry) {
this.server = server;
@@ -119,6 +126,19 @@
if (isnew.getAndSet(false)) {
this.channel = channel;
}
+        // Announce ourselves before any other traffic: message type, protocol version,
+        // then the comma-separated list of marshallers we can use (zero-terminated).
+        final ByteBuffer buffer = allocator.allocate();
+        buffer.put((byte) VERSION);
+        buffer.putInt(LOCAL_VERSION);
+        writeUTFZ(buffer, CollectionUtil.join(",", localMarshallerList));
+        buffer.flip();
+        try {
+            registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+        } catch (InterruptedException e) {
+            // Pass the exception to the logger like the other handlers in this class do,
+            // close the channel, and restore the interrupt flag for the caller.
+            log.error(e, "Interrupted while sending initial version message");
+            IoUtils.safeClose(channel);
+            Thread.currentThread().interrupt();
+            return;
+        }
channel.resumeReads();
}
@@ -136,8 +156,37 @@
return;
}
int msgType = buffer.get() & 0xff;
+            // The very first message on a connection must be VERSION, and VERSION must not
+            // show up again afterwards; either mismatch is treated as a protocol violation.
+            final boolean firstMessage = !initialized.getAndSet(true);
+            final boolean versionMessage = msgType == VERSION;
+            if (firstMessage != versionMessage) {
+                log.error("Expected a version message; closing connection");
+                IoUtils.safeClose(channel);
+                return;
+            }
log.trace("Received message %s, type %d", buffer,
Integer.valueOf(msgType));
switch (msgType) {
+                case VERSION: {
+                    // Participants always choose the lowest version number.  Only one
+                    // protocol version (LOCAL_VERSION) is supported, so the remote value
+                    // is consumed from the buffer and otherwise ignored.
+                    buffer.getInt();
+                    // Select the client's most preferred marshalling method that the
+                    // server also supports.  The client's list is assumed to be ordered
+                    // from most to least preferred, so the first common entry wins.
+                    final String marshallerList = readUTFZ(buffer);
+                    final Iterable<String> remoteMarshallerList = CollectionUtil.split(",", marshallerList);
+                    final Iterable<String> clientList = server ? remoteMarshallerList : localMarshallerList;
+                    final Iterable<String> serverList = server ? localMarshallerList : remoteMarshallerList;
+                    String chosen = null;
+                    search:
+                    for (final String clientSuggestion : clientList) {
+                        for (final String serverSuggestion : serverList) {
+                            if (clientSuggestion.equals(serverSuggestion)) {
+                                chosen = clientSuggestion;
+                                // Stop at the first match; continuing would let a less
+                                // preferred entry overwrite the choice.
+                                break search;
+                            }
+                        }
+                    }
+                    if (chosen == null) {
+                        log.error("Could not agree on a marshaller type; closing connection");
+                        IoUtils.safeClose(channel);
+                        return;
+                    }
+                    marshallerType = chosen;
+                    log.trace("Chose marshaller type '%s'", chosen);
+                    break;
+                }
case REQUEST_ONEWAY: {
final int clientId = buffer.getInt();
final Handle<RemoteClientEndpoint> handle =
getForwardedClient(clientId);
@@ -286,6 +335,11 @@
return;
} catch (BufferUnderflowException e) {
log.error(e, "Malformed packet");
+// } catch (InterruptedException e) {
+// log.error(e, "Read thread interrupted, closing channel");
+// IoUtils.safeClose(channel);
+// Thread.currentThread().interrupt();
+// return;
} catch (Throwable t) {
log.error(t, "Handler failed");
}
@@ -456,6 +510,98 @@
}
}
+    /**
+     * Write a character sequence to the buffer as a zero-terminated string in
+     * modified UTF-8: chars 0x01..0x7f take one byte, 0x00 and 0x80..0x7ff take two
+     * bytes, everything else takes three.  A character is only written when all of
+     * its bytes fit.
+     *
+     * @param buffer the buffer to write to
+     * @param s the characters to encode
+     * @return {@code -1} if the whole string and its terminator were written; the index
+     *      of the first character that did not fit; or {@code s.length()} if every
+     *      character fit but the terminating zero byte did not
+     */
+    private int writeUTFZ(ByteBuffer buffer, CharSequence s) {
+        final int length = s.length();
+        int index = 0;
+        while (index < length) {
+            final char ch = s.charAt(index);
+            // Work out the encoded size first so the space check happens in one place.
+            final int needed;
+            if (ch >= 1 && ch <= 0x7f) {
+                needed = 1;
+            } else if (ch <= 0x07ff) {
+                needed = 2;
+            } else {
+                needed = 3;
+            }
+            if (buffer.remaining() < needed) {
+                return index;
+            }
+            switch (needed) {
+                case 1:
+                    buffer.put((byte) ch);
+                    break;
+                case 2:
+                    buffer.put((byte) (0xc0 | (ch >> 6)));
+                    buffer.put((byte) (0x80 | (ch & 0x3f)));
+                    break;
+                default:
+                    buffer.put((byte) (0xe0 | (ch >> 12)));
+                    buffer.put((byte) (0x80 | ((ch >> 6) & 0x3f)));
+                    buffer.put((byte) (0x80 | (ch & 0x3f)));
+                    break;
+            }
+            index++;
+        }
+        if (!buffer.hasRemaining()) {
+            return length;
+        }
+        buffer.put((byte) 0);
+        return -1;
+    }
+
+ // Reader utils
+
+    /**
+     * Read a zero-terminated modified UTF-8 string (the format written by
+     * {@code writeUTFZ}) from the buffer.  Reading stops after the terminating zero
+     * byte, or when the buffer runs out.  Bytes that do not form a valid one-, two-
+     * or three-byte sequence are replaced with {@code '?'}.
+     *
+     * @param buffer the buffer to read from
+     * @return the decoded string, without the terminator
+     */
+    private String readUTFZ(ByteBuffer buffer) {
+        final StringBuilder builder = new StringBuilder();
+        // state: number of continuation bytes still expected; a: char being assembled
+        int state = 0, a = 0;
+        while (buffer.hasRemaining()) {
+            final int v = buffer.get() & 0xff;
+            switch (state) {
+                case 0: {
+                    if (v == 0) {
+                        return builder.toString();
+                    } else if (v < 0x80) {
+                        builder.append((char) v);
+                    } else if (0xc0 <= v && v < 0xe0) {
+                        // Two-byte lead (110xxxxx): keep only the five payload bits,
+                        // otherwise the prefix bits leak into the decoded char.
+                        a = (v & 0x1f) << 6;
+                        state = 1;
+                    } else if (0xe0 <= v && v < 0xf0) {
+                        // Three-byte lead (1110xxxx): covers the full 0xe0..0xef range
+                        // so that chars up to U+FFFF round-trip.
+                        a = (v & 0x0f) << 12;
+                        state = 2;
+                    } else {
+                        // Stray continuation byte or unsupported lead byte.
+                        builder.append('?');
+                    }
+                    break;
+                }
+                case 1: {
+                    // Expecting the final continuation byte of the sequence.
+                    if (v == 0) {
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (0x80 <= v && v < 0xc0) {
+                        a |= v & 0x3f;
+                        builder.append((char) a);
+                    } else {
+                        builder.append('?');
+                    }
+                    state = 0;
+                    break;
+                }
+                case 2: {
+                    // Expecting the middle byte of a three-byte sequence.
+                    if (v == 0) {
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (0x80 <= v && v < 0xc0) {
+                        a |= (v & 0x3f) << 6;
+                        state = 1;
+                    } else {
+                        builder.append('?');
+                        state = 0;
+                    }
+                    break;
+                }
+                default:
+                    throw new IllegalStateException("wrong state");
+            }
+        }
+        // No terminator found: return what was decoded so far.
+        return builder.toString();
+    }
+
// client endpoint
private final class RemoteClientEndpointImpl extends
AbstractAutoCloseable<RemoteClientEndpoint> implements RemoteClientEndpoint {
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java 2008-07-21
14:10:27 UTC (rev 4416)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java 2008-07-21
16:29:04 UTC (rev 4417)
@@ -26,18 +26,18 @@
*
*/
public final class MessageType {
- //
- public static final int REQUEST_ONEWAY = 0;
- public static final int REQUEST = 1;
- public static final int REPLY = 2;
- public static final int CANCEL_REQUEST = 3;
- public static final int CANCEL_ACK = 4;
- public static final int REQUEST_FAILED = 5;
+ public static final int VERSION = 0;
+ public static final int REQUEST_ONEWAY = 1;
+ public static final int REQUEST = 2;
+ public static final int REPLY = 3;
+ public static final int CANCEL_REQUEST = 4;
+ public static final int CANCEL_ACK = 5;
+ public static final int REQUEST_FAILED = 6;
// Remote side called .close() on a forwarded RemoteClientEndpoint
- public static final int CLIENT_CLOSE = 6;
+ public static final int CLIENT_CLOSE = 7;
// Remote side called .close() on a forwarded RemoteClientEndpoint
- public static final int CLIENT_OPEN = 7;
- public static final int SERVICE_CLOSE = 8;
+ public static final int CLIENT_OPEN = 8;
+ public static final int SERVICE_CLOSE = 9;
private MessageType() {
}
Modified:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java 2008-07-21
14:10:27 UTC (rev 4416)
+++
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java 2008-07-21
16:29:04 UTC (rev 4417)
@@ -395,6 +395,26 @@
};
}
+    /**
+     * Concatenate character sequences, placing the delimiter between each
+     * adjacent pair.  An empty input yields an empty string.
+     *
+     * @param delimiter the text inserted between consecutive elements
+     * @param strings the elements to concatenate, in iteration order
+     * @return the concatenated string
+     */
+    public static String join(final CharSequence delimiter, final Iterable<? extends CharSequence> strings) {
+        final StringBuilder result = new StringBuilder();
+        boolean first = true;
+        for (final CharSequence piece : strings) {
+            if (first) {
+                first = false;
+            } else {
+                result.append(delimiter);
+            }
+            result.append(piece);
+        }
+        return result.toString();
+    }
+
@SuppressWarnings({ "unchecked" })
private static <T> T[] unroll(final Iterator<? extends T> iterator, final
Class<T> type, final int c) {
if (iterator.hasNext()) {