[jboss-remoting-commits] JBoss Remoting SVN: r5501 - in remoting3-multiplex/trunk: src and 6 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed Sep 16 13:44:20 EDT 2009


Author: david.lloyd at jboss.com
Date: 2009-09-16 13:44:19 -0400 (Wed, 16 Sep 2009)
New Revision: 5501

Added:
   remoting3-multiplex/trunk/src/
   remoting3-multiplex/trunk/src/main/
   remoting3-multiplex/trunk/src/main/java/
   remoting3-multiplex/trunk/src/main/java/org/
   remoting3-multiplex/trunk/src/main/java/org/jboss/
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/AtomicArrayReferenceArray.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java
   remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/package-info.java
Log:
Initial import of new impl

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/AtomicArrayReferenceArray.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/AtomicArrayReferenceArray.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/AtomicArrayReferenceArray.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,246 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.lang.reflect.Array;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+/**
+ * An {@link AtomicReferenceArray} whose slots each hold an immutable snapshot array of
+ * {@code V}.  Every mutator copies the slot's current array, modifies the copy and installs
+ * it with {@code compareAndSet}, retrying when another thread replaced the slot meanwhile.
+ *
+ * @param <V> the element type of the per-slot arrays
+ */
+final class AtomicArrayReferenceArray<V> extends AtomicReferenceArray<V[]> {
+
+    private static final long serialVersionUID = -4748109403436000080L;
+
+    /** Shared zero-length array stored in slots that hold no elements. */
+    private final V[] emptyArray;
+    /** Runtime component type, needed to allocate correctly typed arrays. */
+    private final Class<V> componentType;
+
+    /**
+     * Construct a new instance in which every slot initially holds the empty array.
+     *
+     * @param componentType the runtime element type of the per-slot arrays
+     * @param length the number of slots
+     */
+    private AtomicArrayReferenceArray(Class<V> componentType, int length) {
+        super(length);
+        this.componentType = componentType;
+        emptyArray = newInstance(componentType, 0);
+        // The superclass leaves every slot null, but the mutators dereference the slot value;
+        // seed each slot with the empty array so they never see null.
+        for (int i = 0; i < length; i++) {
+            set(i, emptyArray);
+        }
+    }
+
+    /**
+     * Create a new instance with as many slots as {@code array}.  Non-null slot values of
+     * {@code array} are copied into the new instance; null slots become the empty array.
+     *
+     * @param array the array supplying the slot count and the initial slot contents
+     * @param componentType the runtime element type of the per-slot arrays
+     * @param <V> the element type
+     * @return the new instance
+     */
+    public static <V> AtomicArrayReferenceArray<V> create(AtomicReferenceArray<V[]> array, Class<V> componentType) {
+        // NOTE(review): this previously invoked a nonexistent (array, componentType) constructor;
+        // taking length and initial contents from "array" is the assumed intent - confirm.
+        final int length = array.length();
+        final AtomicArrayReferenceArray<V> result = new AtomicArrayReferenceArray<V>(componentType, length);
+        for (int i = 0; i < length; i++) {
+            final V[] initial = array.get(i);
+            if (initial != null) {
+                result.set(i, copyOf(componentType, initial, initial.length));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Allocate an array of the given component type and length.
+     *
+     * @param componentType the element type
+     * @param length the array length
+     * @param <V> the element type
+     * @return the new array
+     */
+    @SuppressWarnings({ "unchecked" })
+    private static <V> V[] newInstance(final Class<V> componentType, final int length) {
+        // Array.newInstance is declared to return Object, hence the unchecked cast.
+        return (V[]) Array.newInstance(componentType, length);
+    }
+
+    /**
+     * Copy {@code old} into a new array of length {@code newLen}, truncating or leaving the
+     * tail null as needed.
+     *
+     * @param componentType the element type
+     * @param old the array to copy from
+     * @param newLen the length of the copy
+     * @param <V> the element type
+     * @return the copy
+     */
+    private static <V> V[] copyOf(final Class<V> componentType, V[] old, int newLen) {
+        final V[] target = newInstance(componentType, newLen);
+        System.arraycopy(old, 0, target, 0, Math.min(old.length, newLen));
+        return target;
+    }
+
+    /**
+     * Find the first index at or after {@code from} whose element matches {@code value}.
+     *
+     * @param array the array to search
+     * @param value the value to look for (may be {@code null})
+     * @param identity {@code true} to compare with {@code ==}, {@code false} to use {@code equals}
+     * @param from the index to start searching at
+     * @param <V> the element type
+     * @return the matching index, or -1 if there is none
+     */
+    private static <V> int indexOf(final V[] array, final V value, final boolean identity, final int from) {
+        // equals() cannot be invoked on a null value, so null is always matched by identity
+        final boolean byIdentity = identity || value == null;
+        for (int i = from; i < array.length; i++) {
+            if (byIdentity ? array[i] == value : value.equals(array[i])) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Append {@code value} to the array held in slot {@code idx}.
+     *
+     * @param idx the slot index
+     * @param value the value to append
+     */
+    public void add(int idx, V value) {
+        for (;;) {
+            final V[] oldVal = get(idx);
+            final int oldLen = oldVal.length;
+            final V[] newVal = copyOf(componentType, oldVal, oldLen + 1);
+            newVal[oldLen] = value;
+            if (compareAndSet(idx, oldVal, newVal)) {
+                return;
+            }
+            // lost a race with another writer; retry against the new snapshot
+        }
+    }
+
+    /**
+     * Append {@code value} to the array held in slot {@code idx} unless it is already present.
+     *
+     * @param idx the slot index
+     * @param value the value to append
+     * @param identity {@code true} to test presence with {@code ==}, {@code false} to use {@code equals}
+     * @return {@code true} if the value was added, {@code false} if it was already present
+     */
+    public boolean addIfAbsent(int idx, V value, boolean identity) {
+        for (;;) {
+            final V[] oldVal = get(idx);
+            if (indexOf(oldVal, value, identity, 0) != -1) {
+                return false;
+            }
+            final int oldLen = oldVal.length;
+            final V[] newVal = copyOf(componentType, oldVal, oldLen + 1);
+            newVal[oldLen] = value;
+            if (compareAndSet(idx, oldVal, newVal)) {
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Remove the first occurrence of {@code value} from the array held in slot {@code idx}.
+     *
+     * @param idx the slot index
+     * @param value the value to remove
+     * @param identity {@code true} to match with {@code ==}, {@code false} to use {@code equals}
+     * @return {@code true} if an element was removed, {@code false} if no element matched
+     */
+    public boolean remove(int idx, V value, boolean identity) {
+        for (;;) {
+            final V[] oldVal = get(idx);
+            final int index = indexOf(oldVal, value, identity, 0);
+            if (index == -1) {
+                return false;
+            }
+            final int oldLen = oldVal.length;
+            final V[] newVal;
+            if (oldLen == 1) {
+                // removing the only element: reuse the shared empty array, as removeAll does
+                newVal = emptyArray;
+            } else {
+                newVal = newInstance(componentType, oldLen - 1);
+                System.arraycopy(oldVal, 0, newVal, 0, index);
+                System.arraycopy(oldVal, index + 1, newVal, index, oldLen - index - 1);
+            }
+            if (compareAndSet(idx, oldVal, newVal)) {
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Remove every occurrence of {@code value} from the array held in slot {@code idx}.
+     *
+     * @param idx the slot index
+     * @param value the value to remove
+     * @param identity {@code true} to match with {@code ==}, {@code false} to use {@code equals}
+     * @return the number of elements removed
+     */
+    public int removeAll(int idx, V value, boolean identity) {
+        for (;;) {
+            final V[] oldVal = get(idx);
+            final int oldLen = oldVal.length;
+            final boolean[] removeSlots = new boolean[oldLen];
+            int removeCount = 0;
+            for (int i = indexOf(oldVal, value, identity, 0); i != -1; i = indexOf(oldVal, value, identity, i + 1)) {
+                removeSlots[i] = true;
+                removeCount++;
+            }
+            if (removeCount == 0) {
+                return 0;
+            }
+            final int newLen = oldLen - removeCount;
+            final V[] newVal;
+            if (newLen == 0) {
+                newVal = emptyArray;
+            } else {
+                newVal = newInstance(componentType, newLen);
+                for (int i = 0, j = 0; i < oldLen; i ++) {
+                    if (! removeSlots[i]) {
+                        newVal[j++] = oldVal[i];
+                    }
+                }
+            }
+            if (compareAndSet(idx, oldVal, newVal)) {
+                return removeCount;
+            }
+        }
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,29 @@
+package org.jboss.remoting3.multiplex;
+
+/**
+ * Protocol message configuration parameters used in protocol negotiation phase.  Do not delete, insert, or change the order of these
+ * enum constants.  New constants must be added to the end.
+ * <p>
+ * The greeting parser ({@code InitialReadHandlerImpl}) uses the ID byte read from the wire as an index into
+ * {@code values()}, so each constant's position in this list is its wire ID.
+ */
+enum ConfigParam {
+    /** Maximum supported protocol version.  The greeting parser reads no value for this parameter. */
+    MAX_SUPPORTED_VERSION,
+    /** Peer's maximum receive size (int); caps the local maximum transmit size. */
+    MAX_RECEIVE_SIZE,
+    /** Peer's maximum transmit size (int); caps the local maximum receive size. */
+    MAX_TRANSMIT_SIZE,
+    /** Peer's maximum receive window size (int); caps the local transmit window size. */
+    MAX_RECEIVE_WINDOW_SIZE,
+    /** Peer's maximum transmit window size (int); caps the local receive window size. */
+    MAX_TRANSMIT_WINDOW_SIZE,
+    /** Peer's maximum outbound request count (int); caps the local maximum inbound requests. */
+    MAX_OUTBOUND_REQUESTS,
+    /** Peer's maximum inbound request count (int); caps the local maximum outbound requests. */
+    MAX_INBOUND_REQUESTS,
+    /** Peer's maximum outbound client count (int); caps the local maximum inbound clients. */
+    MAX_OUTBOUND_CLIENTS,
+    /** Peer's maximum inbound client count (int); caps the local maximum outbound clients. */
+    MAX_INBOUND_CLIENTS,
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,174 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+/**
+ * A mutable holder of the tunable settings for a multiplex protocol connection.  Every property
+ * is a plain {@code int}; the setters store their argument without validation.
+ */
+public final class ConnectionConfiguration implements Cloneable {
+
+    private int linkMetric = 100;
+    private int maximumReceiveSize = 512;
+    private int maximumTransmitSize = 512;
+    private int receiveWindowSize = 5;
+    private int transmitWindowSize = 5;
+    private int maximumOutboundClients = 16;
+    private int maximumInboundClients = 16;
+    private int maximumOutboundRequests = 16;
+    private int maximumInboundRequests = 16;
+    private int maximumOutboundStreams = 32;
+    private int maximumInboundStreams = 32;
+
+    /**
+     * Create a configuration populated with the default values.
+     */
+    public ConnectionConfiguration() {
+    }
+
+    /** @return the link metric to assign to this multiplex connection */
+    public int getLinkMetric() {
+        return linkMetric;
+    }
+
+    /** @param linkMetric the link metric to assign to this multiplex connection */
+    public void setLinkMetric(int linkMetric) {
+        this.linkMetric = linkMetric;
+    }
+
+    /** @return the maximum receive size */
+    public int getMaximumReceiveSize() {
+        return maximumReceiveSize;
+    }
+
+    /** @param maximumReceiveSize the maximum receive size */
+    public void setMaximumReceiveSize(int maximumReceiveSize) {
+        this.maximumReceiveSize = maximumReceiveSize;
+    }
+
+    /** @return the maximum transmit size */
+    public int getMaximumTransmitSize() {
+        return maximumTransmitSize;
+    }
+
+    /** @param maximumTransmitSize the maximum transmit size */
+    public void setMaximumTransmitSize(int maximumTransmitSize) {
+        this.maximumTransmitSize = maximumTransmitSize;
+    }
+
+    /** @return the receive window size */
+    public int getReceiveWindowSize() {
+        return receiveWindowSize;
+    }
+
+    /** @param receiveWindowSize the receive window size */
+    public void setReceiveWindowSize(int receiveWindowSize) {
+        this.receiveWindowSize = receiveWindowSize;
+    }
+
+    /** @return the transmit window size */
+    public int getTransmitWindowSize() {
+        return transmitWindowSize;
+    }
+
+    /** @param transmitWindowSize the transmit window size */
+    public void setTransmitWindowSize(int transmitWindowSize) {
+        this.transmitWindowSize = transmitWindowSize;
+    }
+
+    /** @return the maximum number of outbound clients */
+    public int getMaximumOutboundClients() {
+        return maximumOutboundClients;
+    }
+
+    /** @param maximumOutboundClients the maximum number of outbound clients */
+    public void setMaximumOutboundClients(int maximumOutboundClients) {
+        this.maximumOutboundClients = maximumOutboundClients;
+    }
+
+    /** @return the maximum number of inbound clients */
+    public int getMaximumInboundClients() {
+        return maximumInboundClients;
+    }
+
+    /** @param maximumInboundClients the maximum number of inbound clients */
+    public void setMaximumInboundClients(int maximumInboundClients) {
+        this.maximumInboundClients = maximumInboundClients;
+    }
+
+    /** @return the maximum number of outbound requests */
+    public int getMaximumOutboundRequests() {
+        return maximumOutboundRequests;
+    }
+
+    /** @param maximumOutboundRequests the maximum number of outbound requests */
+    public void setMaximumOutboundRequests(int maximumOutboundRequests) {
+        this.maximumOutboundRequests = maximumOutboundRequests;
+    }
+
+    /** @return the maximum number of inbound requests */
+    public int getMaximumInboundRequests() {
+        return maximumInboundRequests;
+    }
+
+    /** @param maximumInboundRequests the maximum number of inbound requests */
+    public void setMaximumInboundRequests(int maximumInboundRequests) {
+        this.maximumInboundRequests = maximumInboundRequests;
+    }
+
+    /** @return the maximum number of outbound streams */
+    public int getMaximumOutboundStreams() {
+        return maximumOutboundStreams;
+    }
+
+    /** @param maximumOutboundStreams the maximum number of outbound streams */
+    public void setMaximumOutboundStreams(int maximumOutboundStreams) {
+        this.maximumOutboundStreams = maximumOutboundStreams;
+    }
+
+    /** @return the maximum number of inbound streams */
+    public int getMaximumInboundStreams() {
+        return maximumInboundStreams;
+    }
+
+    /** @param maximumInboundStreams the maximum number of inbound streams */
+    public void setMaximumInboundStreams(int maximumInboundStreams) {
+        this.maximumInboundStreams = maximumInboundStreams;
+    }
+
+    /**
+     * Produce an independent copy of this configuration carrying the same property values.
+     *
+     * @return the copy
+     */
+    @Override
+    public ConnectionConfiguration clone() {
+        final Object copy;
+        try {
+            copy = super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e);
+        }
+        return (ConnectionConfiguration) copy;
+    }
+}
\ No newline at end of file

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,299 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.Iterator;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.IndeterminateOutcomeException;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.Cancellable;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.RemoteRequestContext;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.remoting3.spi.Result;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.Pool;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Holds the state of one multiplex connection: the settings taken from the connection
+ * configuration, the data channel, and the tables of requests and clients in flight in each
+ * direction.  Closing the connection closes the channel and fails, closes or cancels whatever is
+ * still registered in those tables (see {@link #closeAction()}).
+ */
+final class EstablishedConnection extends AbstractHandleableCloseable implements ConnectionHandler {
+    private static final Logger log = Loggers.MAIN_LOGGER;
+
+    //--== Connection configuration items ==--
+    private final Pool<Marshaller> marshallerPool;
+    private final Pool<Unmarshaller> unmarshallerPool;
+    private final int linkMetric;
+    private final Executor executor;
+    // buffer allocator for outbound message assembly
+    private final BufferAllocator<ByteBuffer> allocator;
+
+    // running on remote node
+    private final AtomicReferenceArray<ReplyHandler> remoteRequests;
+    // sequence for outbound remote requests
+    private final PermitManager requestPermits;
+
+    // running on local node (key comes from remote side)
+    private final AtomicReferenceArray<RemoteRequestContext> localRequests;
+
+    // clients whose requests get forwarded to the remote side
+    private final AtomicReferenceArray<RemoteClient> remoteClients;
+    // sequence for clients whose requests get forwarded to the remote side
+    // LOW BIT == 0
+    private final PermitManager remoteClientPermits;
+
+    // clients whose requests are handled on this side (key comes from remote side)
+    private final AtomicReferenceArray<RequestHandler> requestedClients;
+    // LOW BIT == 1;
+    private final PermitManager forwardedClientPermits;
+
+    // the data channel
+    private final AllocatedMessageChannel channel;
+    // the local connection handler
+    private final ConnectionHandler localConnectionHandler;
+
+    private final int sendWindowSize;
+
+    // the connection bound to the calling thread by setCurrent(), if any
+    private static final ThreadLocal<EstablishedConnection> currentConnection = new ThreadLocal<EstablishedConnection>();
+
+    /**
+     * Construct a new instance.
+     *
+     * @param channel the data channel
+     * @param configuration the configuration to read the connection settings from
+     * @param localConnectionHandler the local connection handler
+     * @throws NullPointerException if the configured executor is {@code null}
+     */
+    public EstablishedConnection(final AllocatedMessageChannel channel, final ConnectionConfiguration configuration, final ConnectionHandler localConnectionHandler) {
+        // NOTE(review): getExecutor(), getMarshallerPool() and getUnmarshallerPool() are not declared by the
+        // ConnectionConfiguration added in this revision - confirm where these settings should come from.
+        super(configuration.getExecutor());
+        this.channel = channel;
+        linkMetric = configuration.getLinkMetric();
+        executor = configuration.getExecutor();
+        if (executor == null) {
+            throw new NullPointerException("executor is null");
+        }
+        allocator = Buffers.createHeapByteBufferAllocator(configuration.getMaximumTransmitSize());
+        marshallerPool = configuration.getMarshallerPool();
+        unmarshallerPool = configuration.getUnmarshallerPool();
+        // Requests running on the remote node are this side's outbound requests, so their table and
+        // permits are sized by the outbound limit (these two limits were previously swapped).
+        final int maximumOutboundRequests = configuration.getMaximumOutboundRequests();
+        remoteRequests = new AtomicReferenceArray<ReplyHandler>(maximumOutboundRequests);
+        requestPermits = new PermitManager(maximumOutboundRequests);
+        // Requests running locally arrive from the remote side, so they are sized by the inbound limit.
+        final int maximumInboundRequests = configuration.getMaximumInboundRequests();
+        localRequests = new AtomicReferenceArray<RemoteRequestContext>(maximumInboundRequests);
+        final int maximumOutboundClients = configuration.getMaximumOutboundClients();
+        remoteClients = new AtomicReferenceArray<RemoteClient>(maximumOutboundClients);
+        remoteClientPermits = new PermitManager(maximumOutboundClients);
+        final int maximumInboundClients = configuration.getMaximumInboundClients();
+        requestedClients = new AtomicReferenceArray<RequestHandler>(maximumInboundClients);
+        forwardedClientPermits = new PermitManager(maximumInboundClients);
+        sendWindowSize = configuration.getTransmitWindowSize();
+        this.localConnectionHandler = localConnectionHandler;
+    }
+
+    /**
+     * Get the connection bound to the calling thread.
+     *
+     * @return the current connection, or {@code null} if none is bound
+     */
+    static EstablishedConnection getCurrent() {
+        return currentConnection.get();
+    }
+
+    /**
+     * Bind this connection to the calling thread.
+     *
+     * @throws IllegalStateException if a connection is already bound to the calling thread
+     */
+    void setCurrent() {
+        if (currentConnection.get() != null) {
+            throw new IllegalStateException("Reentrant setCurrent()");
+        }
+        currentConnection.set(this);
+    }
+
+    /**
+     * Unbind this connection from the calling thread.
+     *
+     * @throws IllegalStateException if this connection is not the one bound to the calling thread
+     */
+    void clearCurrent() {
+        if (currentConnection.get() != this) {
+            throw new IllegalStateException("clearCurrent() from wrong context");
+        }
+        // remove() rather than set(null), so that no stale map entry is left on the thread
+        currentConnection.remove();
+    }
+
+    // accessors
+
+    protected Executor getExecutor() {
+        return executor;
+    }
+
+    int getLinkMetric() {
+        return linkMetric;
+    }
+
+    BufferAllocator<ByteBuffer> getAllocator() {
+        return allocator;
+    }
+
+    AllocatedMessageChannel getChannel() {
+        return channel;
+    }
+
+    Pool<Marshaller> getMarshallerPool() {
+        return marshallerPool;
+    }
+
+    Pool<Unmarshaller> getUnmarshallerPool() {
+        return unmarshallerPool;
+    }
+
+    ConnectionHandler getLocalConnectionHandler() {
+        return localConnectionHandler;
+    }
+
+    // Relays the outcome of a request handler future to the Result attached to it.
+    private static final IoFuture.Notifier<RequestHandler, Result<RequestHandler>> RHS_RESULT_NOTIFIER =
+        new IoFuture.HandlingNotifier<RequestHandler, Result<RequestHandler>>() {
+            public void handleCancelled(final Result<RequestHandler> attachment) {
+                log.trace("Failed to open remote service (cancelled)");
+                attachment.setCancelled();
+            }
+
+            public void handleFailed(final IOException exception, final Result<RequestHandler> attachment) {
+                log.trace("Failed to open remote service: %s", exception);
+                attachment.setException(exception);
+            }
+
+            public void handleDone(final RequestHandler result, final Result<RequestHandler> attachment) {
+                log.trace("Opened %s", result);
+                attachment.setResult(result);
+            }
+        };
+
+    /**
+     * Close the channel, then drain the tracking tables: pending outbound requests are failed with an
+     * {@link IndeterminateOutcomeException}, remote clients' request handlers are closed, and requests
+     * running locally are cancelled.  Empty table slots are skipped.
+     */
+    protected void closeAction() {
+        // just to make sure...
+        IoUtils.safeClose(channel);
+        final IndeterminateOutcomeException ioe = new IndeterminateOutcomeException("The connection was closed");
+        // Things running remotely
+        for (ReplyHandler replyHandler : clearing(remoteRequests)) {
+            SpiUtils.safeHandleException(replyHandler, ioe);
+        }
+        for (RemoteClient x : clearing(remoteClients)) {
+            IoUtils.safeClose(x.getRequestHandler());
+        }
+        // Things running locally
+        for (RemoteRequestContext localRequest : clearing(localRequests)) {
+            localRequest.cancel();
+        }
+    }
+
+    public String toString() {
+        return "multiplex connection <" + Integer.toHexString(hashCode()) + "> via " + channel;
+    }
+
+    // NOTE(review): not implemented yet; always returns null.
+    public Cancellable open(final String serviceName, final String groupName, final Result<RequestHandler> result) {
+        return null;
+    }
+
+    // NOTE(review): not implemented yet; always returns null.
+    public RequestHandlerConnector createConnector(final RequestHandler localHandler) {
+        return null;
+    }
+
+    int getSendWindowSize() {
+        return sendWindowSize;
+    }
+
+    /**
+     * View {@code array} as a one-shot sequence of its non-null entries.  Iterating sets every
+     * slot visited to {@code null}, so each entry is handed out at most once.
+     *
+     * @param array the array to drain
+     * @param <T> the entry type
+     * @return an iterable that drains {@code array}
+     */
+    private static <T> Iterable<T> clearing(final AtomicReferenceArray<T> array) {
+        return new Iterable<T>() {
+
+            public Iterator<T> iterator() {
+                return new Iterator<T>() {
+                    private int idx;
+                    // entry already taken from the array but not yet returned by next()
+                    private T pending;
+
+                    public boolean hasNext() {
+                        // skip empty slots so that callers never receive null
+                        while (pending == null && idx < array.length()) {
+                            pending = array.getAndSet(idx++, null);
+                        }
+                        return pending != null;
+                    }
+
+                    public T next() {
+                        if (! hasNext()) {
+                            throw new java.util.NoSuchElementException();
+                        }
+                        final T result = pending;
+                        pending = null;
+                        return result;
+                    }
+
+                    public void remove() {
+                        // nothing to do: next() has already cleared the slot
+                    }
+                };
+            }
+        };
+    }
+}
\ No newline at end of file

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,155 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.xnio.IoReadHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting3.spi.Result;
+import org.jboss.remoting3.spi.ConnectionHandlerFactory;
+import org.jboss.remoting3.ProtocolException;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+
+/**
+ * Read handler for initial (negotiation) phase of connection.  It parses the peer's greeting
+ * message and lowers the limits in the shared {@link ConnectionConfiguration} to what the peer
+ * announces.
+ */
+final class InitialReadHandlerImpl implements IoReadHandler<AllocatedMessageChannel> {
+
+    private final Result<ConnectionHandlerFactory> result;
+
+    // magic number that must open the greeting message
+    private static final int GREETING = 0x484a524d;
+    private static final int MAX_VERSION = 0;
+
+    private final ConnectionConfiguration connectionConfiguration;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param result the result to notify if negotiation fails
+     * @param connectionConfiguration the configuration to adjust according to the peer's greeting
+     */
+    InitialReadHandlerImpl(final Result<ConnectionHandlerFactory> result, final ConnectionConfiguration connectionConfiguration) {
+        this.result = result;
+        this.connectionConfiguration = connectionConfiguration;
+    }
+
+    /**
+     * Receive and parse the greeting message: a 4-byte magic number, an unsigned 16-bit parameter
+     * count, then that many parameters, each introduced by a one-byte ID.  Every parameter except
+     * {@code MAX_SUPPORTED_VERSION} is followed by a 4-byte value.  An {@link IOException} or a
+     * truncated message closes the channel and is reported through the result.
+     *
+     * @param channel the channel that became readable
+     */
+    public void handleReadable(final AllocatedMessageChannel channel) {
+        try {
+            // NOTE(review): assumes receive() returned a message; confirm it cannot return null here
+            final ByteBuffer buffer = channel.receive();
+            if (buffer.getInt() != GREETING) {
+                throw new IOException("Incorrect greeting");
+            }
+            final ConfigParam[] paramIds = ConfigParam.values();
+            final ConnectionConfiguration conf = connectionConfiguration;
+            final int count = buffer.getShort() & 0xffff;
+            for (int i = 0; i < count; i ++) {
+                final int id = buffer.get() & 0xff;
+                // ">=" (not ">"): an ID equal to the number of known parameters is already out of range
+                // NOTE(review): an unknown parameter's value bytes are not skipped - confirm the wire format
+                if (id >= paramIds.length) {
+                    continue;
+                }
+                final ConfigParam configParam = paramIds[id];
+                switch (configParam) {
+                    case MAX_SUPPORTED_VERSION: {
+                        // since we're version 0, the remote side must support us, and we support nothing else, so...
+                        break;
+                    }
+                    case MAX_RECEIVE_SIZE: {
+                        conf.setMaximumTransmitSize(Math.min(validate(ConfigParam.MAX_RECEIVE_SIZE, buffer.getInt(), 0x100, 0x100000), conf.getMaximumTransmitSize()));
+                        break;
+                    }
+                    case MAX_TRANSMIT_SIZE: {
+                        conf.setMaximumReceiveSize(Math.min(validate(ConfigParam.MAX_TRANSMIT_SIZE, buffer.getInt(), 0x100, 0x100000), conf.getMaximumReceiveSize()));
+                        break;
+                    }
+                    case MAX_RECEIVE_WINDOW_SIZE: {
+                        conf.setTransmitWindowSize(Math.min(validate(ConfigParam.MAX_RECEIVE_WINDOW_SIZE, buffer.getInt(), 0x2, 0x400), conf.getTransmitWindowSize()));
+                        break;
+                    }
+                    case MAX_TRANSMIT_WINDOW_SIZE: {
+                        conf.setReceiveWindowSize(Math.min(validate(ConfigParam.MAX_TRANSMIT_WINDOW_SIZE, buffer.getInt(), 0x2, 0x400), conf.getReceiveWindowSize()));
+                        break;
+                    }
+                    case MAX_OUTBOUND_REQUESTS: {
+                        conf.setMaximumInboundRequests(Math.min(validate(ConfigParam.MAX_OUTBOUND_REQUESTS, buffer.getInt(), 0, 0x100000), conf.getMaximumInboundRequests()));
+                        break;
+                    }
+                    case MAX_INBOUND_REQUESTS: {
+                        conf.setMaximumOutboundRequests(Math.min(validate(ConfigParam.MAX_INBOUND_REQUESTS, buffer.getInt(), 0, 0x100000), conf.getMaximumOutboundRequests()));
+                        break;
+                    }
+                    case MAX_OUTBOUND_CLIENTS: {
+                        conf.setMaximumInboundClients(Math.min(validate(ConfigParam.MAX_OUTBOUND_CLIENTS, buffer.getInt(), 0, 0x100000), conf.getMaximumInboundClients()));
+                        break;
+                    }
+                    case MAX_INBOUND_CLIENTS: {
+                        conf.setMaximumOutboundClients(Math.min(validate(ConfigParam.MAX_INBOUND_CLIENTS, buffer.getInt(), 0, 0x100000), conf.getMaximumOutboundClients()));
+                        break;
+                    }
+                    default: {
+                        // should not be possible
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+        } catch (IOException e) {
+            IoUtils.safeClose(channel);
+            result.setException(e);
+        } catch (BufferUnderflowException e) {
+            IoUtils.safeClose(channel);
+            result.setException(new ProtocolException("Truncated greeting packet"));
+        }
+    }
+
+    /**
+     * Check that a received value lies within the inclusive range {@code [min, max]}.
+     *
+     * @param field the parameter being validated (used in the error message)
+     * @param val the received value
+     * @param min the smallest acceptable value
+     * @param max the largest acceptable value
+     * @return {@code val}
+     * @throws ProtocolException if {@code val} is out of range
+     */
+    private static int validate(Object field, int val, int min, int max) throws ProtocolException {
+        if (val >= min && val <= max) {
+            return val;
+        }
+        throw new ProtocolException("Value of " + field + " is out of range (expected " + min + " <= n <= " + max + ", got " + val + ")");
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.xnio.log.Logger;
+
+final class Loggers {
+    // Category names; the request-handler name is the main name plus a ".request-handler" suffix.
+    private static final String MAIN_LOGGER_NAME = "org.jboss.remoting.multiplex";
+    private static final String REQUEST_HANDLER_LOGGER_NAME = MAIN_LOGGER_NAME + ".request-handler";
+
+    // Package-wide logger instances, created once during class initialization.
+    static final Logger MAIN_LOGGER = Logger.getLogger(MAIN_LOGGER_NAME);
+    static final Logger REQUEST_HANDLER_LOGGER = Logger.getLogger(REQUEST_HANDLER_LOGGER_NAME);
+
+    private Loggers() {} // constant holder only, never instantiated
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+final class MessageType {
+    private MessageType() {}
+
+    // Single-bit values; presumably flags combined with a message type to mark the first and
+    // last packet of a multi-packet message -- TODO confirm against the packet reader/writer.
+    public static final int FIRST       = 0x80;
+    public static final int LAST        = 0x40;
+
+    // Stream message data types
+    public static final int DATA        = 0x00;
+    public static final int CANCEL      = 0x01;
+    public static final int FINAL       = 0x02;
+
+    /**
+     * Request - multi-packet.  These four 0x0n values share their low nibble with the
+     * correspondingly named 0x1n and 0x2n abort types declared further down.
+     */
+    public static final int REQUEST                     = 0x00;
+    public static final int REPLY                       = 0x01;
+    public static final int REQUEST_RECEIVE_FAILED      = 0x02;
+    public static final int REQUEST_FAILED              = 0x03;
+
+    /**
+     * Request receiver aborts transmission (each value is 0x10 plus the matching message type above).
+     */
+    public static final int REMOTE_REQUEST_ABORT               = 0x10;
+    public static final int REMOTE_REPLY_ABORT                 = 0x11;
+    public static final int REMOTE_REQUEST_RECEIVE_FAILED_ABORT = 0x12;
+    public static final int REMOTE_REQUEST_FAILED_ABORT        = 0x13;
+
+    /**
+     * Request transmitter aborts transmission (each value is 0x20 plus the matching message type above).
+     */
+    public static final int REQUEST_ABORT               = 0x20;
+    public static final int REPLY_ABORT                 = 0x21;
+    public static final int REQUEST_RECEIVE_FAILED_ABORT = 0x22;
+    public static final int REQUEST_FAILED_ABORT        = 0x23;
+
+    // Control messages for request cancellation and for client open/close handling.
+    public static final int REQUEST_CANCEL              = 0x30;
+    public static final int REQUEST_CANCEL_ACK          = 0x31;
+    public static final int CLIENT_OPEN                 = 0x32;
+    public static final int CLIENT_OPEN_CANCEL          = 0x33;
+    public static final int CLIENT_OPEN_ACK             = 0x34;
+    public static final int CLIENT_OPEN_CANCEL_ACK      = 0x35;
+    public static final int CLIENT_NOT_FOUND            = 0x36;
+    public static final int CLIENT_REMOTE_CLOSE         = 0x37;
+    public static final int CLIENT_CLOSE                = 0x38;
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,105 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.remoting3.spi.Cancellable;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.remoting3.spi.Result;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.log.Logger;
+
+final class MultiplexConnectionHandler implements ConnectionHandler {
+    private static final Logger log = Loggers.MAIN_LOGGER;
+    private static final Charset UTF_8 = Charset.forName("utf-8");
+    private final EstablishedConnection establishedConnection;
+
+    MultiplexConnectionHandler(final EstablishedConnection establishedConnection) {
+        this.establishedConnection = establishedConnection;
+    }
+
+    /**
+     * Registers a new outstanding client and sends CLIENT_OPEN (id, then the UTF-8 service and group names, each
+     * with a 16-bit length prefix).  A failed send is passed to {@code result}; cancelling sends CLIENT_OPEN_CANCEL.
+     * @throws IllegalArgumentException if either encoded name is longer than 65535 bytes
+     */
+    public Cancellable open(final String serviceName, final String groupName, final Result<RequestHandler> result) {
+        final byte[] serviceNameBytes = serviceName.getBytes(UTF_8);
+        final byte[] groupNameBytes = groupName.getBytes(UTF_8);
+        if (serviceNameBytes.length > 0xFFFF || groupNameBytes.length > 0xFFFF) { // checked before any state changes
+            throw new IllegalArgumentException("Service and group names must each encode to at most 65535 bytes");
+        }
+        final int id = establishedConnection.nextRemoteClient();
+        final BufferAllocator<ByteBuffer> allocator = establishedConnection.getAllocator();
+        final ByteBuffer buffer = allocator.allocate();
+        final AllocatedMessageChannel channel = establishedConnection.getChannel();
+        final RemoteClient remoteClient = new RemoteClient(result, new MultiplexRequestHandler(id, establishedConnection));
+        establishedConnection.addOutstandingClient(id, remoteClient);
+        buffer.put((byte) MessageType.CLIENT_OPEN);
+        buffer.putInt(id);
+        buffer.putShort((short) serviceNameBytes.length);
+        buffer.put(serviceNameBytes);
+        buffer.putShort((short) groupNameBytes.length);
+        buffer.put(groupNameBytes);
+        buffer.flip();
+        try {
+            Channels.sendBlocking(channel, buffer);
+        } catch (IOException e) {
+            result.setException(e);
+            establishedConnection.removeRemoteClient(id);
+        }
+        return new Cancellable() {
+            private final AtomicBoolean cancelled = new AtomicBoolean();
+            public void cancel() {
+                if (cancelled.getAndSet(true) || remoteClient.getRequestHandler() != null) { // cancel already sent, or already done; don't waste the bandwidth
+                    return;
+                }
+                final ByteBuffer buffer = allocator.allocate();
+                buffer.put((byte) MessageType.CLIENT_OPEN_CANCEL);
+                buffer.putInt(id);
+                buffer.flip();
+                try {
+                    Channels.sendBlocking(channel, buffer);
+                } catch (IOException e) {
+                    log.trace("Sending a request to cancel a client open failed: %s", e);
+                }
+            }
+        };
+    }
+
+    public RequestHandlerConnector createConnector(final RequestHandler localHandler) {
+        establishedConnection.addLocalClient(localHandler);
+        return null; // NOTE(review): unfinished -- the handler is registered but no connector is built yet
+    }
+
+    public void close() throws IOException {
+        establishedConnection.close();
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.ConnectionProvider;
+import org.jboss.remoting3.spi.Cancellable;
+import org.jboss.remoting3.spi.ConnectionHandlerFactory;
+import org.jboss.remoting3.spi.Result;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.remoting3.OptionMap;
+import org.jboss.xnio.TcpConnector;
+import org.jboss.xnio.FutureConnection;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.channels.TcpChannel;
+import java.net.URI;
+import java.net.InetSocketAddress;
+import java.io.IOException;
+
+public final class MultiplexConnectionProvider implements ConnectionProvider {
+    private final TcpConnector tcpConnector;
+
+    public MultiplexConnectionProvider(final TcpConnector tcpConnector) {
+        this.tcpConnector = tcpConnector;
+    }
+
+    /** Starts a TCP connect to the URI's host and port and relays the outcome to {@code result}. */
+    public Cancellable connect(final URI uri, final OptionMap connectOptions, final Result<ConnectionHandlerFactory> result) throws IllegalArgumentException {
+        if (uri.getPort() == -1) {
+            throw new IllegalArgumentException("A valid port number must be explicitly specified");
+        }
+        // TODO - change this to use async DNS via XNIO DNS
+        final InetSocketAddress destination = new InetSocketAddress(uri.getHost(), uri.getPort());
+        final FutureConnection<InetSocketAddress,TcpChannel> pending = tcpConnector.connectTo(destination, null);
+        final IoFuture.HandlingNotifier<TcpChannel, Result<ConnectionHandlerFactory>> relay = new IoFuture.HandlingNotifier<TcpChannel, Result<ConnectionHandlerFactory>>() {
+            public void handleFailed(final IOException exception, final Result<ConnectionHandlerFactory> attachment) {
+                attachment.setException(exception);
+            }
+
+            public void handleDone(final TcpChannel result, final Result<ConnectionHandlerFactory> attachment) {
+                attachment.setResult(null); // no ConnectionHandlerFactory is produced yet
+            }
+
+            public void handleCancelled(final Result<ConnectionHandlerFactory> attachment) {
+                attachment.setCancelled();
+            }
+        };
+        pending.addNotifier(relay, result);
+        return SpiUtils.cancellable(pending);
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,108 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.Option;
+
+/**
+ * Options which may be used to configure a multiplex connection.
+ */
+public final class MultiplexOptions {
+
+    private MultiplexOptions() { // constant holder; never instantiated
+    }
+
+    /**
+     * The maximum message receive size to specify to the remote side.  This is the size of the largest protocol message
+     * that can be sent from the remote side to the local side.  The remote side may elect to send messages of a smaller
+     * size, but it may not send larger messages (such messages will cause the connection to close with a protocol error).
+     * This setting does <b>not</b> affect the maximum request or reply size.  This value should be set to at least {@code 256}
+     * if specified.
+     */
+    public static final Option<Integer> MAX_RECEIVE_SIZE = Option.simple(MultiplexOptions.class, "MAX_RECEIVE_SIZE", Integer.class);
+
+    /**
+     * The maximum message size that this side will send to a peer.  If the peer specifies a smaller maximum receive size,
+     * that size will be used; otherwise this value will be used.  This value should be set to at least {@code 256} if specified.
+     *
+     * @see #MAX_RECEIVE_SIZE
+     */
+    public static final Option<Integer> MAX_TRANSMIT_SIZE = Option.simple(MultiplexOptions.class, "MAX_TRANSMIT_SIZE", Integer.class);
+
+    /**
+     * The maximum send window size that will be used by this end of the connection.  The peer may specify a smaller size,
+     * in which case that value will be used.  Increasing this size may decrease latency at the cost of increased memory demands.
+     * The minimum allowed value for this option is {@code 2}.
+     */
+    public static final Option<Integer> MAX_SEND_WINDOW_SIZE = Option.simple(MultiplexOptions.class, "MAX_SEND_WINDOW_SIZE", Integer.class);
+
+    /**
+     * The maximum receive window size that this end of the connection can support.  The peer will not send unacknowledged messages
+     * beyond this window size.  The minimum allowed value for this option is {@code 2}.
+     */
+    public static final Option<Integer> MAX_RECEIVE_WINDOW_SIZE = Option.simple(MultiplexOptions.class, "MAX_RECEIVE_WINDOW_SIZE", Integer.class);
+
+    /**
+     * The maximum number of concurrent inbound requests that this end of the connection will be configured to support.  Using a larger number
+     * increases concurrency at the cost of additional memory demands.
+     */
+    public static final Option<Integer> MAX_INBOUND_REQUESTS = Option.simple(MultiplexOptions.class, "MAX_INBOUND_REQUESTS", Integer.class);
+
+    /**
+     * The maximum number of concurrent outbound requests that this end of the connection will be configured to support.  Using a larger number
+     * increases concurrency at the cost of additional memory demands.
+     */
+    public static final Option<Integer> MAX_OUTBOUND_REQUESTS = Option.simple(MultiplexOptions.class, "MAX_OUTBOUND_REQUESTS", Integer.class);
+
+    /**
+     * The maximum number of concurrently active inbound clients that this end of the connection will be configured to accept.  Using a larger number
+     * increases concurrency at the cost of additional memory demands.
+     */
+    public static final Option<Integer> MAX_INBOUND_CLIENTS = Option.simple(MultiplexOptions.class, "MAX_INBOUND_CLIENTS", Integer.class);
+
+    /**
+     * The maximum number of concurrently active outbound clients that this end of the connection will be configured to support.  Using a larger number
+     * increases concurrency at the cost of additional memory demands.
+     */
+    public static final Option<Integer> MAX_OUTBOUND_CLIENTS = Option.simple(MultiplexOptions.class, "MAX_OUTBOUND_CLIENTS", Integer.class);
+
+    /**
+     * The maximum number of inbound streams.  This is the number of streams that the remote side is allowed to initiate, regardless of
+     * the actual direction of flow of the stream itself.
+     */
+    public static final Option<Integer> MAX_INBOUND_STREAMS = Option.simple(MultiplexOptions.class, "MAX_INBOUND_STREAMS", Integer.class);
+
+    /**
+     * The maximum number of outbound streams.  This is the number of streams that the local side is allowed to initiate, regardless of
+     * the actual direction of flow of the stream itself.
+     */
+    public static final Option<Integer> MAX_OUTBOUND_STREAMS = Option.simple(MultiplexOptions.class, "MAX_OUTBOUND_STREAMS", Integer.class);
+
+    /**
+     * The number of milliseconds to wait before sending accumulated ACK messages.  Higher values increase bandwidth at the cost of latency.
+     * Setting this value to 0 will immediately send ACK messages.
+     */
+    public static final Option<Integer> ACK_CORK_MILLISECONDS = Option.simple(MultiplexOptions.class, "ACK_CORK_MILLISECONDS", Integer.class);
+
+    // todo - authentication/security mechanisms
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.remoting3.spi.RemoteRequestContext;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.log.Logger;
+
+final class MultiplexRemoteRequestContext implements RemoteRequestContext {
+    private static final Logger log = Loggers.MAIN_LOGGER;
+
+    private final AtomicBoolean cancelled = new AtomicBoolean();
+    private final EstablishedConnection connection;
+    private final int id;
+
+    public MultiplexRemoteRequestContext(final int id, final EstablishedConnection connection) {
+        this.connection = connection;
+        this.id = id;
+    }
+
+    /** Sends a REQUEST_CANCEL message for this request's id; only the first call sends anything. */
+    public void cancel() {
+        if (! cancelled.compareAndSet(false, true)) {
+            return;
+        }
+        // Message layout: one message type byte followed by the four-byte request id.
+        final ByteBuffer message = ByteBuffer.allocate(5);
+        message.put((byte) MessageType.REQUEST_CANCEL).putInt(id);
+        message.flip();
+        try {
+            Channels.sendBlocking(connection.getChannel(), message);
+            log.trace("Sent cancel request for ID %d", Integer.valueOf(id));
+        } catch (IOException e) {
+            log.trace("Failed to send a request cancel message: %s", e);
+        }
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,110 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.RemoteRequestContext;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.Pool;
+
+/**
+ * Handler for outbound requests.
+ */
+final class MultiplexRequestHandler extends AbstractHandleableCloseable<RequestHandler> implements RequestHandler {
+    private static final Logger log = Loggers.REQUEST_HANDLER_LOGGER;
+    private final int identifier;
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final Pool<Marshaller> marshallerPool; // NOTE(review): never assigned in this class, so this blank final cannot compile as-is
+    private final EstablishedConnection connection;
+
+    MultiplexRequestHandler(final int identifier, final EstablishedConnection connection) {
+        super(connection.getExecutor());
+        this.connection = connection;
+        this.identifier = identifier;
+        allocator = connection.getAllocator();
+    }
+
+    @Override
+    protected void closeAction() throws IOException {
+        connection.removeRemoteClient(identifier); // forget the client locally, then notify the peer
+        ByteBuffer buffer = allocator.allocate();
+        buffer.put((byte) MessageType.CLIENT_CLOSE).putInt(identifier);
+        buffer.flip();
+        Channels.sendBlocking(connection.getChannel(), buffer);
+    }
+
+    /**
+     * Marshals a REQUEST (client id, new request id, request object); an IOException goes to {@code handler}, not the caller.
+     */
+    public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
+        connection.setCurrent();
+        try {
+            final Marshaller marshaller = marshallerPool.allocate();
+            final ByteOutput output = new SendingByteOutput(null, allocator, connection.getSendWindowSize());
+            boolean ok = false;
+            try {
+                marshaller.start(output);
+                marshaller.write(MessageType.REQUEST);
+                marshaller.writeInt(identifier);
+                final int id = connection.nextRequest();
+                log.trace("Sending outbound request (request id = %d) (type is %s)", Integer.valueOf(id), request == null ? "null" : request.getClass());
+                connection.addRemoteRequest(id, handler);
+                marshaller.writeInt(id);
+                marshaller.writeObject(request);
+                ok = true; // from here on this path finishes and frees the marshaller itself
+                marshaller.finish();
+                marshallerPool.free(marshaller);
+                output.close();
+                return new MultiplexRemoteRequestContext(id, connection);
+            } finally {
+                IoUtils.safeClose(output);
+                if (! ok) try {
+                    marshaller.finish();
+                    marshallerPool.free(marshaller);
+                } catch (final Exception e) {
+                    log.trace("Failed to free a marshaller back into the pool: %s", e);
+                }
+            }
+        } catch (final IOException t) {
+            log.trace(t, "receiveRequest failed with an exception");
+            SpiUtils.safeHandleException(handler, t);
+            return SpiUtils.getBlankRemoteRequestContext(); // the failure was already handed to the handler
+        } finally {
+            connection.clearCurrent();
+        }
+    }
+
+    public String toString() {
+        return "forwarding request handler <" + Integer.toHexString(hashCode()) + "> (id = " + identifier + ") for " + connection;
+    }
+}
\ No newline at end of file

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.remoting3.spi.Cancellable;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.Result;
+
+final class MultiplexRequestHandlerConnector implements RequestHandlerConnector {
+    private final EstablishedConnection connection; // stored by the constructor but not read anywhere in this class
+
+    MultiplexRequestHandlerConnector(final EstablishedConnection connection) {
+        this.connection = connection;
+    }
+
+    public Cancellable createRequestHandler(final Result<RequestHandler> result) throws SecurityException {
+        throw new SecurityException("Not forwarded"); // unconditional: result is never completed and nothing is returned
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,123 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * A blocking pool of {@code maxPermits} integer IDs (0 to {@code maxPermits - 1}).
+ * Each acquired {@link Permit} owns one ID until it is released, at which point the
+ * ID goes back to the pool and a waiting acquirer (if any) is woken up.
+ * <p>
+ * The free IDs are kept in {@code permitIds[topOfStack .. maxPermits - 1]}; all access
+ * to that array and to {@code topOfStack} happens while holding {@code lock}.
+ */
+public final class PermitManager {
+
+    private final Object lock = new Object();
+    private final int maxPermits;
+    private final int[] permitIds;
+    /** Index of the next free ID in {@code permitIds}; equals {@code maxPermits} when the pool is exhausted. */
+    private int topOfStack;
+
+    /**
+     * Construct a new instance with every ID initially available.
+     *
+     * @param maxPermits the number of permits (and distinct IDs) managed by this instance
+     */
+    public PermitManager(final int maxPermits) {
+        synchronized (lock) {
+            this.maxPermits = maxPermits;
+            permitIds = new int[maxPermits];
+            for (int i = 0; i < maxPermits; i ++) {
+                permitIds[i] = i;
+            }
+        }
+    }
+
+    /**
+     * Acquire a permit, blocking until one is available.
+     *
+     * @return the acquired permit
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public Permit acquireInterruptibly() throws InterruptedException {
+        synchronized (lock) {
+            while (topOfStack == maxPermits) {
+                lock.wait();
+            }
+            return takePermit();
+        }
+    }
+
+    /**
+     * Acquire a permit, blocking uninterruptibly until one is available.  If the thread
+     * was interrupted before or during the wait, its interrupt status is set again
+     * before this method returns.
+     *
+     * @return the acquired permit
+     */
+    public Permit acquire() {
+        synchronized (lock) {
+            // clear (and remember) any pending interrupt so that wait() can actually block
+            boolean intr = Thread.interrupted();
+            try {
+                while (topOfStack == maxPermits) {
+                    try {
+                        lock.wait();
+                    } catch (InterruptedException e) {
+                        intr = true;
+                    }
+                }
+                return takePermit();
+            } finally {
+                if (intr) Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Pop the next free ID off the stack and wrap it in a permit.  Call with
+     * {@code lock} held and only when {@code topOfStack < maxPermits}.
+     */
+    private Permit takePermit() {
+        final int id = permitIds[topOfStack];
+        topOfStack ++;
+        return new Permit(id);
+    }
+
+    private static final AtomicIntegerFieldUpdater<Permit> permitUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(Permit.class, "id");
+
+    /**
+     * A single acquired permit.  Releasing is idempotent: only the first release
+     * returns the ID to the pool.
+     */
+    public final class Permit {
+        // Deliberately not private: the field updater is created by the enclosing class,
+        // and the updater's reflective access check rejects a private field of another
+        // class on JVMs that do not treat nested classes as sharing private access.
+        /** The held ID, or -1 once this permit has been released. */
+        volatile int id;
+
+        private Permit(final int id) {
+            this.id = id;
+        }
+
+        /**
+         * Get the ID held by this permit.
+         *
+         * @return the ID
+         * @throws IllegalStateException if this permit was already released
+         */
+        public int getId() {
+            final int id = this.id;
+            if (id == -1) {
+                throw new IllegalStateException("Permit already released");
+            }
+            return id;
+        }
+
+        /**
+         * Determine whether this permit has been released.
+         *
+         * @return {@code true} if the permit was released
+         */
+        public boolean released() {
+            return id == -1;
+        }
+
+        /**
+         * Release this permit, returning its ID to the pool.  Calling this more than
+         * once has no further effect.
+         */
+        public void release() {
+            doRelease();
+        }
+
+        /**
+         * Atomically mark this permit as released and, if this call was the one that
+         * released it, push the ID back onto the free stack.
+         *
+         * @return {@code true} if this call performed the release, {@code false} if the
+         *      permit had already been released
+         */
+        private boolean doRelease() {
+            final int id;
+            if ((id = permitUpdater.getAndSet(this, -1)) != -1) {
+                synchronized (lock) {
+                    permitIds[--topOfStack] = id;
+                    // wake up threads blocked in acquire()/acquireInterruptibly();
+                    // without this they would wait forever on an exhausted pool
+                    lock.notifyAll();
+                }
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        /**
+         * Safety net: return the ID to the pool if the permit is garbage collected
+         * without having been released.
+         */
+        protected void finalize() {
+            if (doRelease()) {
+                // todo log leak
+            }
+        }
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.xnio.IoReadHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.Pool;
+import org.jboss.xnio.channels.ReadableAllocatedMessageChannel;
+import org.jboss.xnio.log.Logger;
+
+// Read handler for the multiplex protocol channel: receives one raw message per
+// readable notification and decodes the flag bits of its first byte.
+// NOTE(review): this class looks unfinished - the final fields below are never
+// assigned (no constructor is declared) and handleReadable() stops after decoding
+// the flags without acting on them.
+final class ReadHandlerImpl implements IoReadHandler<ReadableAllocatedMessageChannel> {
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex");
+
+    // None of the following fields is read by the code in this class except
+    // connectionHandler, which is closed when a receive fails.
+    private final int maxReceiveSize;
+    private final int maxSendSize;
+    private final int receiveWindowSize;
+    private final int sendWindowSize;
+    private final Pool<Marshaller> marshallerPool;
+    private final Pool<Unmarshaller> unmarshallerPool;
+    private final ConnectionHandler connectionHandler;
+
+    // Receive the next message from the channel.  On an I/O error the error is logged
+    // and both the connection handler and the channel are closed.
+    public void handleReadable(final ReadableAllocatedMessageChannel channel) {
+        final ByteBuffer buffer;
+        try {
+            buffer = channel.receive();
+        } catch (IOException e) {
+            log.error("I/O error in protocol channel; closing channel (%s)", e);
+            IoUtils.safeClose(connectionHandler);
+            IoUtils.safeClose(channel);
+            return;
+        }
+        if (log.isTrace()) {
+            log.trace("Received raw message:\n%s", Buffers.createDumper(buffer, 8, 1));
+        }
+        // NOTE(review): buffer is used without a null or empty check - confirm what
+        // receive() returns when no complete message is available.
+        // First byte carries flag bits: 0x80 = multi-part message; 0x40 and 0x20 are
+        // only considered for multi-part messages and mark the first and last part.
+        final int idByte = buffer.get() & 0xff;
+        final boolean isMulti = (idByte & 0x80) != 0;
+        final boolean isFirst = isMulti && (idByte & 0x40) != 0;
+        final boolean isLast = isMulti && (idByte & 0x20) != 0;
+        
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,193 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.ArrayDeque;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.xnio.Buffers;
+import org.jboss.remoting3.ProtocolException;
+
+final class ReceivingByteInput extends InputStream implements ByteInput {
+    private final Queue<ByteBuffer> window;
+    private final Runnable ackTask;
+    private final Runnable closeTask;
+
+    private IOException failure;
+    private boolean done;
+
+    ReceivingByteInput(final Runnable ackTask, final int windowSize, final Runnable closeTask) {
+        this.ackTask = ackTask;
+        this.closeTask = closeTask;
+        window = new ArrayDeque<ByteBuffer>(windowSize);
+    }
+
+    public int read() throws IOException {
+        final Queue<ByteBuffer> window = this.window;
+        synchronized (this) {
+            while (window.isEmpty()) {
+                if (done) {
+                    return -1;
+                }
+                checkFailure();
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new InterruptedIOException("Interrupted on read()");
+                }
+            }
+            final ByteBuffer buf = window.peek();
+            try {
+                return buf.get() & 0xff;
+            } finally {
+                if (buf.remaining() == 0) {
+                    window.poll();
+                    ackTask.run();
+                }
+            }
+        }
+    }
+
+    public int read(final byte[] bytes, int offs, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        synchronized (this) {
+            while (window.isEmpty()) {
+                if (done) {
+                    return -1;
+                }
+                checkFailure();
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new InterruptedIOException("Interrupted on read()");
+                }
+            }
+            final Queue<ByteBuffer> window = this.window;
+            int total = 0;
+            while (len > 0) {
+                final ByteBuffer buffer = window.peek();
+                if (buffer == null) {
+                    break;
+                }
+                final int bytecnt = Math.min(buffer.remaining(), len);
+                buffer.get(bytes, offs, bytecnt);
+                total += bytecnt;
+                len -= bytecnt;
+                if (buffer.remaining() == 0) {
+                    window.poll();
+                    ackTask.run();
+                }
+            }
+            return total;
+        }
+    }
+
+    public int available() throws IOException {
+        synchronized (this) {
+            int total = 0;
+            for (ByteBuffer buffer : window) {
+                total += buffer.remaining();
+                if (total < 0) {
+                    return Integer.MAX_VALUE;
+                }
+            }
+            return total;
+        }
+    }
+
+    public long skip(long qty) throws IOException {
+        final Queue<ByteBuffer> window = this.window;
+        synchronized (this) {
+            while (window.isEmpty()) {
+                if (done) {
+                    return 0L;
+                }
+                checkFailure();
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new InterruptedIOException("Interrupted on read()");
+                }
+            }
+            long skipped = 0L;
+            while (qty > 0L) {
+                final ByteBuffer buffer = window.peek();
+                if (buffer == null) {
+                    break;
+                }
+                final int bytecnt = Math.min(buffer.remaining(), (int) Math.max((long)Integer.MAX_VALUE, qty));
+                Buffers.skip(buffer, bytecnt);
+                skipped += bytecnt;
+                qty -= bytecnt;
+                if (buffer.remaining() == 0) {
+                    window.poll();
+                    ackTask.run();
+                }
+            }
+            return skipped;
+        }
+    }
+
+    private void checkFailure() throws IOException {
+        final IOException failure = this.failure;
+        if (failure != null) {
+            failure.setStackTrace(new Throwable().getStackTrace());
+            try {
+                throw failure;
+            } finally {
+                done = true;
+                this.failure = null;
+            }
+        }
+    }
+
+    void push(ByteBuffer buffer) throws IOException {
+        synchronized (this) {
+            checkFailure();
+            try {
+                window.add(buffer);
+            } catch (IllegalStateException e) {
+                throw new ProtocolException("Stream window overrun");
+            }
+        }
+    }
+
+    public void close() throws IOException {
+        synchronized (this) {
+            if (! done) {
+                window.clear();
+                done = true;
+                closeTask.run();
+            }
+        }
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.Result;
+import org.jboss.remoting3.spi.RequestHandler;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Pairs a pending {@code Result} with the request handler it will eventually yield.
+ * The result can be taken exactly once; the request handler is only handed out after
+ * the result has been taken.
+ */
+final class RemoteClient {
+
+    /** Atomic accessor for the {@code result} field. */
+    private static final AtomicReferenceFieldUpdater<RemoteClient, Result> resultUpdater =
+            AtomicReferenceFieldUpdater.newUpdater(RemoteClient.class, Result.class, "result");
+
+    private final RequestHandler requestHandler;
+
+    /** The pending result; becomes {@code null} once it has been taken. */
+    @SuppressWarnings({ "UnusedDeclaration" })
+    private volatile Result<RequestHandler> result;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param result the pending result
+     * @param requestHandler the request handler associated with the result
+     */
+    RemoteClient(final Result<RequestHandler> result, final RequestHandler requestHandler) {
+        this.requestHandler = requestHandler;
+        this.result = result;
+    }
+
+    /**
+     * Atomically take the pending result, leaving {@code null} in its place.
+     *
+     * @return the pending result, or {@code null} if it was already taken
+     */
+    @SuppressWarnings({ "unchecked" })
+    Result<RequestHandler> takeResult() {
+        final Result<RequestHandler> taken = resultUpdater.getAndSet(this, null);
+        return taken;
+    }
+
+    /**
+     * Get the request handler, which is available only once the result has been taken.
+     *
+     * @return the request handler, or {@code null} while the result is still pending
+     */
+    public RequestHandler getRequestHandler() {
+        if (result != null) {
+            return null;
+        }
+        return requestHandler;
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,165 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * An output stream that collects written bytes into allocator-provided buffers and
+ * sends each full (or flushed) buffer as one message on the channel.  At most
+ * {@code windowSize} messages may be outstanding: each send consumes one window slot,
+ * and each call to {@link #acknowledge()} gives one back.  A writer blocks while no
+ * slot is available.
+ * <p>
+ * All state is guarded by the monitor of this object.
+ */
+final class SendingByteOutput extends OutputStream implements ByteOutput {
+    /** The buffer currently being filled, or {@code null} if none is allocated. */
+    private ByteBuffer buffer;
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final WritableMessageChannel channel;
+    /** The number of messages that may still be sent before an acknowledgement is needed. */
+    private int windowSize;
+    private boolean closed;
+    private IOException remoteException;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param channel the channel to send messages on
+     * @param allocator the allocator that provides and reclaims message buffers
+     * @param windowSize the initial number of messages that may be sent unacknowledged
+     */
+    public SendingByteOutput(final WritableMessageChannel channel, final BufferAllocator<ByteBuffer> allocator, final int windowSize) {
+        this.channel = channel;
+        this.allocator = allocator;
+        this.windowSize = windowSize;
+    }
+
+    private static IOException closed() {
+        return new IOException("Stream is closed");
+    }
+
+    /**
+     * Fail if the stream is closed or a remote exception was recorded.  A recorded
+     * remote exception also marks the stream closed.  Call with the lock held.
+     */
+    private void checkClosed() throws IOException {
+        if (closed) {
+            throw closed();
+        }
+        if (remoteException != null) {
+            closed = true;
+            throw remoteException;
+        }
+    }
+
+    /**
+     * Get a buffer with room for at least one more byte, sending the current buffer
+     * first if it is full.  Call with the lock held.
+     */
+    private ByteBuffer writableBuffer() throws IOException {
+        ByteBuffer buffer = this.buffer;
+        if (buffer == null) {
+            buffer = this.buffer = allocator.allocate();
+        } else if (! buffer.hasRemaining()) {
+            send();
+            buffer = this.buffer = allocator.allocate();
+        }
+        return buffer;
+    }
+
+    /**
+     * Write a single byte.
+     *
+     * @param b the byte to write (only the low 8 bits are used)
+     * @throws IOException if the stream is closed or sending a full buffer fails
+     */
+    public void write(final int b) throws IOException {
+        synchronized (this) {
+            checkClosed();
+            // always write into the buffer that is current *after* allocation/send;
+            // the buffer seen before that may be null or already returned to the allocator
+            writableBuffer().put((byte) b);
+        }
+    }
+
+    /**
+     * Write a range of bytes, sending buffers as they fill up.
+     *
+     * @param b the source array
+     * @param off the offset of the first byte to write
+     * @param len the number of bytes to write
+     * @throws IOException if the stream is closed or sending a full buffer fails
+     */
+    public void write(final byte[] b, int off, int len) throws IOException {
+        synchronized (this) {
+            checkClosed();
+            while (len > 0) {
+                final ByteBuffer buffer = writableBuffer();
+                final int cnt = Math.min(len, buffer.remaining());
+                buffer.put(b, off, cnt);
+                len -= cnt;
+                off += cnt;
+            }
+        }
+    }
+
+    /**
+     * Send the current buffer as one message, first waiting for a free window slot.
+     * The buffer is returned to the allocator whether or not the send succeeds.
+     * Call with the lock held and a non-null current buffer.
+     */
+    private void send() throws IOException {
+        while (windowSize == 0) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                // the pending data is dropped; give the buffer back instead of leaking it
+                final ByteBuffer pending = buffer;
+                buffer = null;
+                closed = true;
+                if (pending != null) {
+                    allocator.free(pending);
+                }
+                Thread.currentThread().interrupt();
+                throw new ClosedByInterruptException();
+            }
+            checkClosed();
+        }
+        final ByteBuffer buffer = this.buffer;
+        this.buffer = null;
+        buffer.flip();
+        final WritableMessageChannel channel = this.channel;
+        try {
+            while (! (channel.send(buffer))) {
+                channel.awaitWritable();
+                checkClosed();
+                if (Thread.currentThread().isInterrupted()) {
+                    closed = true;
+                    throw new ClosedByInterruptException();
+                }
+            }
+        } finally {
+            allocator.free(buffer);
+        }
+        windowSize--;
+    }
+
+    /**
+     * Record that one sent message was acknowledged, freeing one window slot and
+     * waking any writer that is waiting for it.
+     */
+    void acknowledge() {
+        synchronized (this) {
+            windowSize++;
+            notifyAll();
+        }
+    }
+
+    /**
+     * Send any buffered bytes now.
+     *
+     * @throws IOException if sending fails
+     */
+    public void flush() throws IOException {
+        synchronized (this) {
+            final ByteBuffer buffer = this.buffer;
+            if (buffer != null && buffer.position() > 0) {
+                send();
+            }
+        }
+    }
+
+    /**
+     * Send any buffered bytes and close the stream.  The stream is marked closed even
+     * if the final send fails.  Closing an already closed stream has no effect.
+     *
+     * @throws IOException if sending the remaining bytes fails
+     */
+    public void close() throws IOException {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            try {
+                final ByteBuffer buffer = this.buffer;
+                if (buffer != null && buffer.position() > 0) {
+                    send();
+                }
+            } finally {
+                closed = true;
+            }
+        }
+    }
+
+    /**
+     * Close the stream without sending buffered bytes, and wake any writer blocked
+     * waiting for a window slot so that it fails with a closed-stream error.
+     */
+    void abort() {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            notifyAll();
+        }
+    }
+}

Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/package-info.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/package-info.java	                        (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/package-info.java	2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,26 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+/**
+ * The Remoting 3 multiplex protocol implementation.
+ */
+package org.jboss.remoting3.multiplex;
\ No newline at end of file



More information about the jboss-remoting-commits mailing list