[jboss-remoting-commits] JBoss Remoting SVN: r4417 - in remoting3/trunk: util/src/main/java/org/jboss/cx/remoting/util and 1 other directory.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Mon Jul 21 12:29:04 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-07-21 12:29:04 -0400 (Mon, 21 Jul 2008)
New Revision: 4417

Modified:
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
   remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
Log:
Add a VERSION message for protocol forward compatibility

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-07-21 14:10:27 UTC (rev 4416)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-07-21 16:29:04 UTC (rev 4417)
@@ -50,6 +50,7 @@
 import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_CLOSE;
 import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
 import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.VERSION;
 import org.jboss.cx.remoting.RemotingException;
 import org.jboss.cx.remoting.CloseHandler;
 import java.util.concurrent.ConcurrentMap;
@@ -59,6 +60,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.nio.ByteBuffer;
 import java.nio.BufferUnderflowException;
 import java.io.IOException;
@@ -69,6 +71,7 @@
 public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
 
     private static final Logger log = Logger.getLogger(BasicHandler.class);
+    private static final int LOCAL_VERSION = 1;
 
     // clients whose requests get forwarded to the remote side
     private final ConcurrentMap<Integer, RemoteClientEndpoint> remoteClients = concurrentMap();
@@ -84,10 +87,14 @@
 
     private final AtomicBoolean isnew = new AtomicBoolean(true);
     private volatile AllocatedMessageChannel channel;
+    private volatile int remoteVersion;
     private final Executor executor;
     private final MarshallerFactory<ByteBuffer> marshallerFactory;
     private final ObjectResolver resolver;
     private final ClassLoader classLoader;
+    private List<String> localMarshallerList = Collections.singletonList("java-serialization");
+    private volatile String marshallerType;
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
 
     public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final Executor executor, final MarshallerFactory<ByteBuffer> marshallerFactory, final ServiceRegistry registry) {
         this.server = server;
@@ -119,6 +126,19 @@
         if (isnew.getAndSet(false)) {
             this.channel = channel;
         }
+        final ByteBuffer buffer = allocator.allocate();
+        buffer.put((byte) VERSION);
+        buffer.putInt(LOCAL_VERSION);
+        writeUTFZ(buffer, CollectionUtil.join(",", localMarshallerList));
+        buffer.flip();
+        try {
+            registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+        } catch (InterruptedException e) {
+            log.error("Interrupted while sending intial version message");
+            IoUtils.safeClose(channel);
+            Thread.currentThread().interrupt();
+            return;
+        }
         channel.resumeReads();
     }
 
@@ -136,8 +156,37 @@
                 return;
             }
             int msgType = buffer.get() & 0xff;
+            if (initialized.getAndSet(true) != (msgType != 0)) {
+                log.error("Expected a version message; closing connection");
+                IoUtils.safeClose(channel);
+                return;
+            }
             log.trace("Received message %s, type %d", buffer, Integer.valueOf(msgType));
             switch (msgType) {
+                case VERSION: {
+                    // participants always choose the lowest version number
+                    // since we only support one version (0), we don't do anything with the value
+                    buffer.getInt();
+                    // Select the client's most preferred marshaling method that the server supports
+                    final String marshallerList = readUTFZ(buffer);
+                    final Iterable<String> remoteMarshallerList = CollectionUtil.split(",", marshallerList);
+                    final Iterable<String> clientList = server ? remoteMarshallerList : localMarshallerList;
+                    final Iterable<String> serverList = server ? localMarshallerList : remoteMarshallerList;
+                    for (final String clientSuggestion : clientList) {
+                        for (final String serverSuggestion : serverList) {
+                            if (clientSuggestion.equals(serverSuggestion)) {
+                                marshallerType = clientSuggestion;
+                                log.trace("Chose marshaller type '%s'", marshallerType);
+                            }
+                        }
+                    }
+                    if (marshallerType == null) {
+                        log.error("Could not agree on a marshaller type; closing connection");
+                        IoUtils.safeClose(channel);
+                        return;
+                    }
+                    break;
+                }
                 case REQUEST_ONEWAY: {
                     final int clientId = buffer.getInt();
                     final Handle<RemoteClientEndpoint> handle = getForwardedClient(clientId);
@@ -286,6 +335,11 @@
             return;
         } catch (BufferUnderflowException e) {
             log.error(e, "Malformed packet");
+//        } catch (InterruptedException e) {
+//            log.error(e, "Read thread interrupted, closing channel");
+//            IoUtils.safeClose(channel);
+//            Thread.currentThread().interrupt();
+//            return;
         } catch (Throwable t) {
             log.error(t, "Handler failed");
         }
@@ -456,6 +510,98 @@
         }
     }
 
+    private int writeUTFZ(ByteBuffer buffer, CharSequence s) {
+        final int len = s.length();
+        for (int i = 0; i < len; i++) {
+            char c = s.charAt(i);
+            if (1 <= c && c < 0x80) {
+                if (buffer.hasRemaining()) {
+                    buffer.put((byte) c);
+                } else {
+                    return i;
+                }
+            } else if (c < 0x0800) {
+                if (buffer.remaining() >= 2) {
+                    buffer.put((byte) (0xc0 | (c >> 6)));
+                    buffer.put((byte) (0x80 | (c & 0x3f)));
+                } else {
+                    return i;
+                }
+            } else {
+                if (buffer.remaining() >= 3) {
+                    buffer.put((byte) (0xe0 | (c >> 12)));
+                    buffer.put((byte) (0x80 | ((c >> 6) & 0x3f)));
+                    buffer.put((byte) (0x80 | (c & 0x3f)));
+                } else {
+                    return i;
+                }
+            }
+        }
+        if (buffer.hasRemaining()) {
+            buffer.put((byte) 0);
+            return -1;
+        } else {
+            return len;
+        }
+    }
+
+    // Reader utils
+
+    private String readUTFZ(ByteBuffer buffer) {
+        StringBuilder builder = new StringBuilder();
+        int state = 0, a = 0;
+        while (buffer.hasRemaining()) {
+            final int v = buffer.get() & 0xff;
+            switch (state) {
+                case 0: {
+                    if (v == 0) {
+                        return builder.toString();
+                    } else if (v < 128) {
+                        builder.append((char) v);
+                    } else if (192 <= v && v < 224) {
+                        a = v << 6;
+                        state = 1;
+                    } else if (224 <= v && v < 232) {
+                        a = v << 12;
+                        state = 2;
+                    } else {
+                        builder.append('?');
+                    }
+                    break;
+                }
+                case 1: {
+                    if (v == 0) {
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (128 <= v && v < 192) {
+                        a |= v & 0x3f;
+                        builder.append((char) a);
+                    } else {
+                        builder.append('?');
+                    }
+                    state = 0;
+                    break;
+                }
+                case 2: {
+                    if (v == 0) {
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (128 <= v && v < 192) {
+                        a |= (v & 0x3f) << 6;
+                        state = 1;
+                    } else {
+                        builder.append('?');
+                        state = 0;
+                    }
+                    break;
+                }
+                default:
+                    throw new IllegalStateException("wrong state");
+            }
+        }
+        return builder.toString();
+    }
+
     // client endpoint
 
     private final class RemoteClientEndpointImpl extends AbstractAutoCloseable<RemoteClientEndpoint> implements RemoteClientEndpoint {

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	2008-07-21 14:10:27 UTC (rev 4416)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	2008-07-21 16:29:04 UTC (rev 4417)
@@ -26,18 +26,18 @@
  *
  */
 public final class MessageType {
-    //
-    public static final int REQUEST_ONEWAY     = 0;
-    public static final int REQUEST            = 1;
-    public static final int REPLY              = 2;
-    public static final int CANCEL_REQUEST     = 3;
-    public static final int CANCEL_ACK         = 4;
-    public static final int REQUEST_FAILED     = 5;
+    public static final int VERSION            = 0;
+    public static final int REQUEST_ONEWAY     = 1;
+    public static final int REQUEST            = 2;
+    public static final int REPLY              = 3;
+    public static final int CANCEL_REQUEST     = 4;
+    public static final int CANCEL_ACK         = 5;
+    public static final int REQUEST_FAILED     = 6;
     // Remote side called .close() on a forwarded RemoteClientEndpoint
-    public static final int CLIENT_CLOSE       = 6;
+    public static final int CLIENT_CLOSE       = 7;
     // Remote side called .close() on a forwarded RemoteClientEndpoint
-    public static final int CLIENT_OPEN        = 7;
-    public static final int SERVICE_CLOSE      = 8;
+    public static final int CLIENT_OPEN        = 8;
+    public static final int SERVICE_CLOSE      = 9;
 
     private MessageType() {
     }

Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java	2008-07-21 14:10:27 UTC (rev 4416)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java	2008-07-21 16:29:04 UTC (rev 4417)
@@ -395,6 +395,26 @@
         };
     }
 
+    /**
+     * Join a series of character sequences using a delimiter.
+     *
+     * @param delimiter the delimiter
+     * @param strings the strings
+     * @return a joined string
+     */
+    public static String join(final CharSequence delimiter, final Iterable<? extends CharSequence> strings) {
+        StringBuilder builder = new StringBuilder();
+        Iterator<? extends CharSequence> it = strings.iterator();
+        while (it.hasNext()) {
+            CharSequence s = it.next();
+            builder.append(s);
+            if (it.hasNext()) {
+                builder.append(delimiter);
+            }
+        }
+        return builder.toString();
+    }
+
     @SuppressWarnings({ "unchecked" })
     private static <T> T[] unroll(final Iterator<? extends T> iterator, final Class<T> type, final int c) {
         if (iterator.hasNext()) {




More information about the jboss-remoting-commits mailing list