Author: david.lloyd(a)jboss.com
Date: 2009-09-16 13:44:19 -0400 (Wed, 16 Sep 2009)
New Revision: 5501
Added:
remoting3-multiplex/trunk/src/
remoting3-multiplex/trunk/src/main/
remoting3-multiplex/trunk/src/main/java/
remoting3-multiplex/trunk/src/main/java/org/
remoting3-multiplex/trunk/src/main/java/org/jboss/
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/AtomicArrayReferenceArray.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/package-info.java
Log:
Initial import of new impl
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/AtomicArrayReferenceArray.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/AtomicArrayReferenceArray.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/AtomicArrayReferenceArray.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,170 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.lang.reflect.Array;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+final class AtomicArrayReferenceArray<V> extends AtomicReferenceArray<V[]> {
+
+ private final V[] emptyArray;
+ private final Class<V> componentType;
+
+ private static final long serialVersionUID = -4748109403436000080L;
+
+ private AtomicArrayReferenceArray(Class<V> componentType, int length) {
+ super(length);
+ this.componentType = componentType;
+ emptyArray = newInstance(componentType, 0);
+ }
+
+ public static <V> AtomicArrayReferenceArray<V>
create(AtomicReferenceArray<V[]> array, Class<V> componentType) {
+ return new AtomicArrayReferenceArray<V>(array, componentType);
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private static <V> V[] copyOf(final Class<V> componentType, V[] old, int
newLen) {
+ final V[] target = newInstance(componentType, newLen);
+ System.arraycopy(old, 0, target, 0, Math.min(old.length, newLen));
+ return target;
+ }
+
+ public void add(int idx, V value) {
+ for (;;) {
+ final V[] oldVal = get(idx);
+ final int oldLen = oldVal.length;
+ final V[] newVal = copyOf(componentType, oldVal, oldLen + 1);
+ newVal[oldLen] = value;
+ if (compareAndSet(idx, oldVal, newVal)) {
+ return;
+ }
+ }
+ }
+
+ public boolean addIfAbsent(int idx, V value, boolean identity) {
+ for (;;) {
+ final V[] oldVal = get(idx);
+ final int oldLen = oldVal.length;
+ if (identity || value == null) {
+ for (int i = 0; i < oldLen; i++) {
+ if (oldVal[i] == value) {
+ return false;
+ }
+ }
+ } else {
+ for (int i = 0; i < oldLen; i++) {
+ if (value.equals(oldVal[i])) {
+ return false;
+ }
+ }
+ }
+ final V[] newVal = copyOf(componentType, oldVal, oldLen + 1);
+ newVal[oldLen] = value;
+ if (compareAndSet(idx, oldVal, newVal)) {
+ return true;
+ }
+ }
+ }
+
+ public boolean remove(int idx, V value, boolean identity) {
+ for (;;) {
+ final V[] oldVal = get(idx);
+ final int oldLen = oldVal.length;
+ if (oldLen == 0) {
+ return false;
+ } else {
+ int index = -1;
+ if (identity || value == null) {
+ for (int i = 0; i < oldLen; i++) {
+ if (oldVal[i] == value) {
+ index = i;
+ break;
+ }
+ }
+ } else {
+ for (int i = 0; i < oldLen; i++) {
+ if (value.equals(oldVal[i])) {
+ index = i;
+ break;
+ }
+ }
+ }
+ if (index == -1) {
+ return false;
+ }
+ final V[] newVal = newInstance(componentType, oldLen - 1);
+ System.arraycopy(oldVal, 0, newVal, 0, index);
+ System.arraycopy(oldVal, index + 1, newVal, index, oldLen - index - 1);
+ if (compareAndSet(idx, oldVal, newVal)) {
+ return true;
+ }
+ }
+ }
+ }
+
+ public int removeAll(int idx, V value, boolean identity) {
+ for (;;) {
+ final V[] oldVal = get(idx);
+ final int oldLen = oldVal.length;
+ if (oldLen == 0) {
+ return 0;
+ } else {
+ final boolean[] removeSlots = new boolean[oldLen];
+ int removeCount = 0;
+ if (identity || value == null) {
+ for (int i = 0; i < oldLen; i++) {
+ if (oldVal[i] == value) {
+ removeSlots[i] = true;
+ removeCount++;
+ }
+ }
+ } else {
+ for (int i = 0; i < oldLen; i++) {
+ if (value.equals(oldVal[i])) {
+ removeSlots[i] = true;
+ removeCount++;
+ }
+ }
+ }
+ if (removeCount == 0) {
+ return 0;
+ }
+ final int newLen = oldLen - removeCount;
+ final V[] newVal;
+ if (newLen == 0) {
+ newVal = emptyArray;
+ } else {
+ newVal = newInstance(componentType, newLen);
+ for (int i = 0, j = 0; i < oldLen; i ++) {
+ if (! removeSlots[i]) {
+ newVal[j++] = oldVal[i];
+ }
+ }
+ }
+ if (compareAndSet(idx, oldVal, newVal)) {
+ return removeCount;
+ }
+ }
+ }
+ }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,17 @@
+package org.jboss.remoting3.multiplex;
+
+/**
+ * Protocol message configuration parameters used in protocol negotiation phase. Do not delete, insert, or change the order of these
+ * enum constants. New constants must be added to the end.
+ * <p>
+ * The order matters because {@code InitialReadHandlerImpl} maps the ID byte of each received parameter
+ * to a constant by indexing {@code ConfigParam.values()}.
+ */
+enum ConfigParam {
+ // InitialReadHandlerImpl recognizes this parameter but reads no value for it and takes no action.
+ MAX_SUPPORTED_VERSION,
+ // Peer's receive size limit; InitialReadHandlerImpl uses it to cap the local maximum transmit size.
+ MAX_RECEIVE_SIZE,
+ // Peer's transmit size limit; InitialReadHandlerImpl uses it to cap the local maximum receive size.
+ MAX_TRANSMIT_SIZE,
+ // Peer's receive window limit; InitialReadHandlerImpl uses it to cap the local transmit window size.
+ MAX_RECEIVE_WINDOW_SIZE,
+ // Peer's transmit window limit; InitialReadHandlerImpl uses it to cap the local receive window size.
+ MAX_TRANSMIT_WINDOW_SIZE,
+ // Peer's outbound request limit; InitialReadHandlerImpl uses it to cap the local inbound request limit.
+ MAX_OUTBOUND_REQUESTS,
+ // Peer's inbound request limit; InitialReadHandlerImpl uses it to cap the local outbound request limit.
+ MAX_INBOUND_REQUESTS,
+ // Peer's outbound client limit; InitialReadHandlerImpl uses it to cap the local inbound client limit.
+ MAX_OUTBOUND_CLIENTS,
+ // Peer's inbound client limit; InitialReadHandlerImpl uses it to cap the local outbound client limit.
+ MAX_INBOUND_CLIENTS,
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,160 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+/**
+ * A configuration object for the multiplex protocol.  It is a plain mutable holder: the setters
+ * perform no validation.  During negotiation {@code InitialReadHandlerImpl} lowers the size, window,
+ * request and client limits to the values announced by the peer when those are smaller.
+ * <p>
+ * NOTE(review): {@code EstablishedConnection} calls {@code getExecutor()}, {@code getMarshallerPool()}
+ * and {@code getUnmarshallerPool()} on this class, but no such accessors are declared here.
+ */
+public final class ConnectionConfiguration implements Cloneable {
+
+    private int linkMetric = 100;
+    private int maximumReceiveSize = 0x200;
+    private int maximumTransmitSize = 0x200;
+    private int receiveWindowSize = 5;
+    private int transmitWindowSize = 5;
+    private int maximumOutboundClients = 0x10;
+    private int maximumInboundClients = 0x10;
+    private int maximumOutboundRequests = 0x10;
+    private int maximumInboundRequests = 0x10;
+    private int maximumOutboundStreams = 0x20;
+    private int maximumInboundStreams = 0x20;
+
+    /**
+     * Construct a new instance with every property at its default value.
+     */
+    public ConnectionConfiguration() {
+    }
+
+    /**
+     * Get the link metric to assign to this multiplex connection (default 100).
+     *
+     * @return the link metric
+     */
+    public int getLinkMetric() {
+        return linkMetric;
+    }
+
+    /**
+     * Set the link metric to assign to this multiplex connection.
+     *
+     * @param linkMetric the link metric
+     */
+    public void setLinkMetric(final int linkMetric) {
+        this.linkMetric = linkMetric;
+    }
+
+    /** @return the maximum receive size (default 0x200) */
+    public int getMaximumReceiveSize() {
+        return maximumReceiveSize;
+    }
+
+    /** @param maximumReceiveSize the maximum receive size */
+    public void setMaximumReceiveSize(final int maximumReceiveSize) {
+        this.maximumReceiveSize = maximumReceiveSize;
+    }
+
+    /** @return the maximum transmit size (default 0x200) */
+    public int getMaximumTransmitSize() {
+        return maximumTransmitSize;
+    }
+
+    /** @param maximumTransmitSize the maximum transmit size */
+    public void setMaximumTransmitSize(final int maximumTransmitSize) {
+        this.maximumTransmitSize = maximumTransmitSize;
+    }
+
+    /** @return the receive window size (default 5) */
+    public int getReceiveWindowSize() {
+        return receiveWindowSize;
+    }
+
+    /** @param receiveWindowSize the receive window size */
+    public void setReceiveWindowSize(final int receiveWindowSize) {
+        this.receiveWindowSize = receiveWindowSize;
+    }
+
+    /** @return the transmit window size (default 5) */
+    public int getTransmitWindowSize() {
+        return transmitWindowSize;
+    }
+
+    /** @param transmitWindowSize the transmit window size */
+    public void setTransmitWindowSize(final int transmitWindowSize) {
+        this.transmitWindowSize = transmitWindowSize;
+    }
+
+    /** @return the maximum number of outbound clients (default 0x10) */
+    public int getMaximumOutboundClients() {
+        return maximumOutboundClients;
+    }
+
+    /** @param maximumOutboundClients the maximum number of outbound clients */
+    public void setMaximumOutboundClients(final int maximumOutboundClients) {
+        this.maximumOutboundClients = maximumOutboundClients;
+    }
+
+    /** @return the maximum number of inbound clients (default 0x10) */
+    public int getMaximumInboundClients() {
+        return maximumInboundClients;
+    }
+
+    /** @param maximumInboundClients the maximum number of inbound clients */
+    public void setMaximumInboundClients(final int maximumInboundClients) {
+        this.maximumInboundClients = maximumInboundClients;
+    }
+
+    /** @return the maximum number of outbound requests (default 0x10) */
+    public int getMaximumOutboundRequests() {
+        return maximumOutboundRequests;
+    }
+
+    /** @param maximumOutboundRequests the maximum number of outbound requests */
+    public void setMaximumOutboundRequests(final int maximumOutboundRequests) {
+        this.maximumOutboundRequests = maximumOutboundRequests;
+    }
+
+    /** @return the maximum number of inbound requests (default 0x10) */
+    public int getMaximumInboundRequests() {
+        return maximumInboundRequests;
+    }
+
+    /** @param maximumInboundRequests the maximum number of inbound requests */
+    public void setMaximumInboundRequests(final int maximumInboundRequests) {
+        this.maximumInboundRequests = maximumInboundRequests;
+    }
+
+    /** @return the maximum number of outbound streams (default 0x20) */
+    public int getMaximumOutboundStreams() {
+        return maximumOutboundStreams;
+    }
+
+    /** @param maximumOutboundStreams the maximum number of outbound streams */
+    public void setMaximumOutboundStreams(final int maximumOutboundStreams) {
+        this.maximumOutboundStreams = maximumOutboundStreams;
+    }
+
+    /** @return the maximum number of inbound streams (default 0x20) */
+    public int getMaximumInboundStreams() {
+        return maximumInboundStreams;
+    }
+
+    /** @param maximumInboundStreams the maximum number of inbound streams */
+    public void setMaximumInboundStreams(final int maximumInboundStreams) {
+        this.maximumInboundStreams = maximumInboundStreams;
+    }
+
+    /**
+     * Clone this configuration.  All properties are primitives, so the copy shares no state with
+     * this instance.
+     *
+     * @return the cloned configuration
+     */
+    @Override
+    public ConnectionConfiguration clone() {
+        try {
+            return (ConnectionConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,237 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.Iterator;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.IndeterminateOutcomeException;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.Cancellable;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.RemoteRequestContext;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.remoting3.spi.Result;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.Pool;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * The state of one established multiplex connection: the configured limits, the message channel, and
+ * the tables that track requests and clients in each direction.
+ * <p>
+ * {@link #open} and {@link #createConnector} are not implemented yet; both currently return
+ * {@code null}.
+ */
+final class EstablishedConnection extends AbstractHandleableCloseable implements ConnectionHandler {
+    private static final Logger log = Loggers.MAIN_LOGGER;
+
+    //--== Connection configuration items ==--
+    private final Pool<Marshaller> marshallerPool;
+    private final Pool<Unmarshaller> unmarshallerPool;
+    private final int linkMetric;
+    private final Executor executor;
+    // buffer allocator for outbound message assembly
+    private final BufferAllocator<ByteBuffer> allocator;
+
+    // running on remote node
+    private final AtomicReferenceArray<ReplyHandler> remoteRequests;
+    // sequence for outbound remote requests
+    private final PermitManager requestPermits;
+
+    // running on local node (key comes from remote side)
+    private final AtomicReferenceArray<RemoteRequestContext> localRequests;
+
+    // clients whose requests get forwarded to the remote side
+    private final AtomicReferenceArray<RemoteClient> remoteClients;
+    // sequence for clients whose requests get forwarded to the remote side
+    // LOW BIT == 0
+    private final PermitManager remoteClientPermits;
+
+    // clients whose requests are handled on this side (key comes from remote side)
+    private final AtomicReferenceArray<RequestHandler> requestedClients;
+    // LOW BIT == 1;
+    private final PermitManager forwardedClientPermits;
+
+    // the data channel
+    private final AllocatedMessageChannel channel;
+    // the local connection handler
+    private final ConnectionHandler localConnectionHandler;
+
+    private final int sendWindowSize;
+
+    private static final ThreadLocal<EstablishedConnection> currentConnection = new ThreadLocal<EstablishedConnection>();
+
+    /**
+     * Construct a new instance.
+     *
+     * @param channel the data channel of the connection
+     * @param configuration the configuration to take the executor, pools and limits from
+     * @param localConnectionHandler the local connection handler
+     * @throws NullPointerException if the configuration has no executor
+     */
+    public EstablishedConnection(final AllocatedMessageChannel channel, final ConnectionConfiguration configuration, final ConnectionHandler localConnectionHandler) {
+        super(configuration.getExecutor());
+        this.channel = channel;
+        linkMetric = configuration.getLinkMetric();
+        executor = configuration.getExecutor();
+        if (executor == null) {
+            throw new NullPointerException("executor is null");
+        }
+        allocator = Buffers.createHeapByteBufferAllocator(configuration.getMaximumTransmitSize());
+        marshallerPool = configuration.getMarshallerPool();
+        unmarshallerPool = configuration.getUnmarshallerPool();
+        // Requests running on the remote node are the ones this side sends, so their table and
+        // permits are bounded by the outbound limit, mirroring how the client tables are sized.
+        final int maximumOutboundRequests = configuration.getMaximumOutboundRequests();
+        remoteRequests = new AtomicReferenceArray<ReplyHandler>(maximumOutboundRequests);
+        requestPermits = new PermitManager(maximumOutboundRequests);
+        // Requests running locally are keyed by the remote side, i.e. they are inbound.
+        final int maximumInboundRequests = configuration.getMaximumInboundRequests();
+        localRequests = new AtomicReferenceArray<RemoteRequestContext>(maximumInboundRequests);
+        final int maximumOutboundClients = configuration.getMaximumOutboundClients();
+        remoteClients = new AtomicReferenceArray<RemoteClient>(maximumOutboundClients);
+        remoteClientPermits = new PermitManager(maximumOutboundClients);
+        final int maximumInboundClients = configuration.getMaximumInboundClients();
+        requestedClients = new AtomicReferenceArray<RequestHandler>(maximumInboundClients);
+        forwardedClientPermits = new PermitManager(maximumInboundClients);
+        sendWindowSize = configuration.getTransmitWindowSize();
+        this.localConnectionHandler = localConnectionHandler;
+    }
+
+    /**
+     * Get the connection registered for the calling thread by {@link #setCurrent()}.
+     *
+     * @return the current connection, or {@code null} if none is registered
+     */
+    static EstablishedConnection getCurrent() {
+        return currentConnection.get();
+    }
+
+    /**
+     * Register this connection as the current one for the calling thread.
+     *
+     * @throws IllegalStateException if the thread already has a current connection
+     */
+    void setCurrent() {
+        if (currentConnection.get() != null) {
+            throw new IllegalStateException("Reentrant setCurrent()");
+        }
+        currentConnection.set(this);
+    }
+
+    /**
+     * Unregister this connection as the current one for the calling thread.
+     *
+     * @throws IllegalStateException if this connection is not the thread's current connection
+     */
+    void clearCurrent() {
+        if (currentConnection.get() != this) {
+            throw new IllegalStateException("clearCurrent() from wrong context");
+        }
+        // remove() rather than set(null) so that no empty entry lingers in the thread's map
+        currentConnection.remove();
+    }
+
+    // accessors
+
+    protected Executor getExecutor() {
+        return executor;
+    }
+
+    int getLinkMetric() {
+        return linkMetric;
+    }
+
+    BufferAllocator<ByteBuffer> getAllocator() {
+        return allocator;
+    }
+
+    AllocatedMessageChannel getChannel() {
+        return channel;
+    }
+
+    Pool<Marshaller> getMarshallerPool() {
+        return marshallerPool;
+    }
+
+    Pool<Unmarshaller> getUnmarshallerPool() {
+        return unmarshallerPool;
+    }
+
+    ConnectionHandler getLocalConnectionHandler() {
+        return localConnectionHandler;
+    }
+
+    int getSendWindowSize() {
+        return sendWindowSize;
+    }
+
+    /** Notifier that relays the outcome of a request handler future to the attached result. */
+    private static final IoFuture.Notifier<RequestHandler, Result<RequestHandler>> RHS_RESULT_NOTIFIER =
+        new IoFuture.HandlingNotifier<RequestHandler, Result<RequestHandler>>() {
+            public void handleCancelled(final Result<RequestHandler> attachment) {
+                log.trace("Failed to open remote service (cancelled)");
+                attachment.setCancelled();
+            }
+
+            public void handleFailed(final IOException exception, final Result<RequestHandler> attachment) {
+                log.trace("Failed to open remote service: %s", exception);
+                attachment.setException(exception);
+            }
+
+            public void handleDone(final RequestHandler result, final Result<RequestHandler> attachment) {
+                log.trace("Opened %s", result);
+                attachment.setResult(result);
+            }
+        };
+
+    /**
+     * Close the channel, fail every request still outstanding on the remote side, close the remote
+     * clients, and cancel every request still running locally.  Empty table slots are skipped.
+     */
+    protected void closeAction() {
+        // just to make sure...
+        IoUtils.safeClose(channel);
+        final IndeterminateOutcomeException ioe = new IndeterminateOutcomeException("The connection was closed");
+        // Things running remotely
+        for (ReplyHandler replyHandler : clearing(remoteRequests)) {
+            SpiUtils.safeHandleException(replyHandler, ioe);
+        }
+        for (RemoteClient remoteClient : clearing(remoteClients)) {
+            IoUtils.safeClose(remoteClient.getRequestHandler());
+        }
+        // Things running locally
+        for (RemoteRequestContext localRequest : clearing(localRequests)) {
+            localRequest.cancel();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "multiplex connection <" + Integer.toHexString(hashCode()) + "> via " + channel;
+    }
+
+    /** Not implemented yet; currently returns {@code null}. */
+    public Cancellable open(final String serviceName, final String groupName, final Result<RequestHandler> result) {
+        return null;
+    }
+
+    /** Not implemented yet; currently returns {@code null}. */
+    public RequestHandlerConnector createConnector(final RequestHandler localHandler) {
+        return null;
+    }
+
+    /**
+     * Create a one-pass view of {@code array} that sets each slot to {@code null} as it passes over
+     * it and yields only the values that were non-null.  Note that {@code hasNext()} already clears
+     * slots while it looks for the next non-null value.
+     *
+     * @param array the array to drain
+     * @param <T> the element type
+     * @return the draining iterable
+     */
+    private static <T> Iterable<T> clearing(final AtomicReferenceArray<T> array) {
+        return new Iterable<T>() {
+
+            public Iterator<T> iterator() {
+                return new Iterator<T>() {
+                    private int idx;
+                    // value taken out of the array but not yet handed to the caller
+                    private T pending;
+
+                    public boolean hasNext() {
+                        while (pending == null && idx < array.length()) {
+                            pending = array.getAndSet(idx++, null);
+                        }
+                        return pending != null;
+                    }
+
+                    public T next() {
+                        if (! hasNext()) {
+                            throw new java.util.NoSuchElementException();
+                        }
+                        final T result = pending;
+                        pending = null;
+                        return result;
+                    }
+
+                    public void remove() {
+                        // every visited slot is cleared already; explicit removal has no meaning
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+}
\ No newline at end of file
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,125 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.xnio.IoReadHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting3.spi.Result;
+import org.jboss.remoting3.spi.ConnectionHandlerFactory;
+import org.jboss.remoting3.ProtocolException;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+
+/**
+ * Read handler for initial (negotiation) phase of connection.  It reads the peer's greeting message
+ * and lowers the limits in the given {@link ConnectionConfiguration} to the values announced by the
+ * peer wherever those are smaller.
+ * <p>
+ * Message layout as read by {@link #handleReadable}: a 32-bit greeting constant, an unsigned 16-bit
+ * parameter count, then for each parameter one ID byte (an index into {@link ConfigParam}) followed
+ * by a 32-bit value for every parameter except {@code MAX_SUPPORTED_VERSION}.
+ */
+final class InitialReadHandlerImpl implements IoReadHandler<AllocatedMessageChannel> {
+
+    /** The four bytes 0x48 0x4a 0x52 0x4d, i.e. ASCII "HJRM". */
+    private static final int GREETING = 0x484a524d;
+    private static final int MAX_VERSION = 0;
+
+    private final Result<ConnectionHandlerFactory> result;
+    private final ConnectionConfiguration connectionConfiguration;
+
+    InitialReadHandlerImpl(final Result<ConnectionHandlerFactory> result, final ConnectionConfiguration connectionConfiguration) {
+        this.result = result;
+        this.connectionConfiguration = connectionConfiguration;
+    }
+
+    /**
+     * Receive and process the greeting message.  On an I/O or protocol error, or when the message is
+     * shorter than its contents require, the channel is closed and the failure is reported through
+     * the result.
+     *
+     * @param channel the channel that became readable
+     */
+    public void handleReadable(final AllocatedMessageChannel channel) {
+        try {
+            final ByteBuffer buffer = channel.receive();
+            if (buffer.getInt() != GREETING) {
+                throw new IOException("Incorrect greeting");
+            }
+            final ConfigParam[] paramIds = ConfigParam.values();
+            final ConnectionConfiguration conf = connectionConfiguration;
+            final int count = buffer.getShort() & 0xffff;
+            for (int i = 0; i < count; i ++) {
+                final int id = buffer.get() & 0xff;
+                // Valid IDs are 0 .. length - 1; an ID equal to the length is already unknown.
+                if (id >= paramIds.length) {
+                    // NOTE(review): the value belonging to an unknown parameter is not skipped here
+                    // (its size is not known), so the following bytes are read as the next ID.
+                    continue;
+                }
+                // Each value announced by the peer limits the opposite direction on this side.
+                switch (paramIds[id]) {
+                    case MAX_SUPPORTED_VERSION: {
+                        // since we're version 0, the remote side must support us, and we support nothing else, so...
+                        break;
+                    }
+                    case MAX_RECEIVE_SIZE: {
+                        conf.setMaximumTransmitSize(negotiate(ConfigParam.MAX_RECEIVE_SIZE, buffer, 0x100, 0x100000, conf.getMaximumTransmitSize()));
+                        break;
+                    }
+                    case MAX_TRANSMIT_SIZE: {
+                        conf.setMaximumReceiveSize(negotiate(ConfigParam.MAX_TRANSMIT_SIZE, buffer, 0x100, 0x100000, conf.getMaximumReceiveSize()));
+                        break;
+                    }
+                    case MAX_RECEIVE_WINDOW_SIZE: {
+                        conf.setTransmitWindowSize(negotiate(ConfigParam.MAX_RECEIVE_WINDOW_SIZE, buffer, 0x2, 0x400, conf.getTransmitWindowSize()));
+                        break;
+                    }
+                    case MAX_TRANSMIT_WINDOW_SIZE: {
+                        conf.setReceiveWindowSize(negotiate(ConfigParam.MAX_TRANSMIT_WINDOW_SIZE, buffer, 0x2, 0x400, conf.getReceiveWindowSize()));
+                        break;
+                    }
+                    case MAX_OUTBOUND_REQUESTS: {
+                        conf.setMaximumInboundRequests(negotiate(ConfigParam.MAX_OUTBOUND_REQUESTS, buffer, 0, 0x100000, conf.getMaximumInboundRequests()));
+                        break;
+                    }
+                    case MAX_INBOUND_REQUESTS: {
+                        conf.setMaximumOutboundRequests(negotiate(ConfigParam.MAX_INBOUND_REQUESTS, buffer, 0, 0x100000, conf.getMaximumOutboundRequests()));
+                        break;
+                    }
+                    case MAX_OUTBOUND_CLIENTS: {
+                        conf.setMaximumInboundClients(negotiate(ConfigParam.MAX_OUTBOUND_CLIENTS, buffer, 0, 0x100000, conf.getMaximumInboundClients()));
+                        break;
+                    }
+                    case MAX_INBOUND_CLIENTS: {
+                        conf.setMaximumOutboundClients(negotiate(ConfigParam.MAX_INBOUND_CLIENTS, buffer, 0, 0x100000, conf.getMaximumOutboundClients()));
+                        break;
+                    }
+                    default: {
+                        // should not be possible
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+        } catch (IOException e) {
+            IoUtils.safeClose(channel);
+            result.setException(e);
+        } catch (BufferUnderflowException e) {
+            IoUtils.safeClose(channel);
+            result.setException(new ProtocolException("Truncated greeting packet"));
+        }
+    }
+
+    /**
+     * Read the next 32-bit value from {@code buffer}, validate it against the allowed range, and
+     * return the smaller of it and the locally configured value.
+     *
+     * @param param the parameter being read (used in the error message)
+     * @param buffer the buffer positioned at the parameter value
+     * @param min the smallest acceptable value
+     * @param max the largest acceptable value
+     * @param local the locally configured value
+     * @return the smaller of the peer's value and {@code local}
+     * @throws ProtocolException if the peer's value is outside {@code min..max}
+     */
+    private static int negotiate(final ConfigParam param, final ByteBuffer buffer, final int min, final int max, final int local) throws ProtocolException {
+        return Math.min(validate(param, buffer.getInt(), min, max), local);
+    }
+
+    /**
+     * Check that {@code val} lies within {@code min..max} inclusive.
+     *
+     * @param field the field being validated (used in the error message)
+     * @param val the value to check
+     * @param min the smallest acceptable value
+     * @param max the largest acceptable value
+     * @return {@code val}
+     * @throws ProtocolException if {@code val} is out of range
+     */
+    private static int validate(Object field, int val, int min, int max) throws ProtocolException {
+        if (val >= min && val <= max) {
+            return val;
+        }
+        throw new ProtocolException("Value of " + field + " is out of range (expected " + min + " <= n <= " + max + ", got " + val + ")");
+    }
+}
Added: remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Holder for the loggers shared by the classes of this package.  Not instantiable.
+ */
+final class Loggers {
+
+    private static final String MAIN_LOGGER_NAME = "org.jboss.remoting.multiplex";
+    private static final String REQUEST_HANDLER_LOGGER_NAME = MAIN_LOGGER_NAME + ".request-handler";
+
+    /** Logger named {@code org.jboss.remoting.multiplex.request-handler}. */
+    static final Logger REQUEST_HANDLER_LOGGER = Logger.getLogger(REQUEST_HANDLER_LOGGER_NAME);
+    /** Logger named {@code org.jboss.remoting.multiplex}. */
+    static final Logger MAIN_LOGGER = Logger.getLogger(MAIN_LOGGER_NAME);
+
+    private Loggers() {
+    }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+// Holder for the integer constants that identify multiplex protocol messages; not instantiable.
+final class MessageType {
+ private MessageType() {}
+
+ // NOTE(review): FIRST and LAST look like flag bits combined with a message type (0x80 and 0x40
+ // share no bits with any type value below) - confirm against the code that builds messages.
+ public static final int FIRST = 0x80;
+ public static final int LAST = 0x40;
+
+ // Stream message data types
+ // These values (0x00-0x02) coincide with the first three request message types below, so the two
+ // groups are presumably never used in the same position - TODO confirm.
+
+ public static final int DATA = 0x00;
+ public static final int CANCEL = 0x01;
+ public static final int FINAL = 0x02;
+
+ /**
+ * Request - multi-packet
+ */
+ public static final int REQUEST = 0x00;
+ public static final int REPLY = 0x01;
+ public static final int REQUEST_RECEIVE_FAILED = 0x02;
+ public static final int REQUEST_FAILED = 0x03;
+
+ /**
+ * Request receiver aborts transmission.
+ */
+ // Each constant in this group is the matching 0x00-0x03 message type plus 0x10.
+ public static final int REMOTE_REQUEST_ABORT = 0x10;
+ public static final int REMOTE_REPLY_ABORT = 0x11;
+ public static final int REMOTE_REQUEST_RECEIVE_FAILED_ABORT = 0x12;
+ public static final int REMOTE_REQUEST_FAILED_ABORT = 0x13;
+
+ /**
+ * Request transmitter aborts transmission.
+ */
+ // Each constant in this group is the matching 0x00-0x03 message type plus 0x20.
+ public static final int REQUEST_ABORT = 0x20;
+ public static final int REPLY_ABORT = 0x21;
+ public static final int REQUEST_RECEIVE_FAILED_ABORT = 0x22;
+ public static final int REQUEST_FAILED_ABORT = 0x23;
+
+ // Request cancellation and client lifecycle messages.
+ public static final int REQUEST_CANCEL = 0x30;
+ public static final int REQUEST_CANCEL_ACK = 0x31;
+ public static final int CLIENT_OPEN = 0x32;
+ public static final int CLIENT_OPEN_CANCEL = 0x33;
+ public static final int CLIENT_OPEN_ACK = 0x34;
+ public static final int CLIENT_OPEN_CANCEL_ACK = 0x35;
+ public static final int CLIENT_NOT_FOUND = 0x36;
+ public static final int CLIENT_REMOTE_CLOSE = 0x37;
+ public static final int CLIENT_CLOSE = 0x38;
+
+
+
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,105 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.remoting3.spi.Cancellable;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.remoting3.spi.Result;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Connection handler for an established multiplex connection.  Opens remote clients by
+ * sending {@code CLIENT_OPEN} messages over the connection's message channel, and closes
+ * the underlying {@link EstablishedConnection} on {@link #close()}.
+ */
+final class MultiplexConnectionHandler implements ConnectionHandler {
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex");
+
+    private final EstablishedConnection establishedConnection;
+    private final Charset charset = Charset.forName("utf-8");
+
+    MultiplexConnectionHandler(final EstablishedConnection establishedConnection) {
+        this.establishedConnection = establishedConnection;
+    }
+
+    /**
+     * Ask the peer to open a client for the given service.
+     * <p>
+     * The message written is: one byte {@code CLIENT_OPEN}, the new client ID as an int, then
+     * the service name and the group name, each as a short byte count followed by the UTF-8
+     * encoded bytes.  The send blocks until the message has been handed to the channel.
+     *
+     * @param serviceName the service name to send
+     * @param groupName the group name to send
+     * @param result receives the I/O exception if the open message cannot be sent
+     * @return a handle which, on its first {@code cancel()} call, sends a
+     *         {@code CLIENT_OPEN_CANCEL} message for the same client ID
+     */
+    public Cancellable open(final String serviceName, final String groupName, final Result<RequestHandler> result) {
+        final EstablishedConnection establishedConnection = this.establishedConnection;
+        final int id = establishedConnection.nextRemoteClient();
+        final BufferAllocator<ByteBuffer> allocator = establishedConnection.getAllocator();
+        final ByteBuffer buffer = allocator.allocate();
+        final AllocatedMessageChannel channel = establishedConnection.getChannel();
+        final RemoteClient remoteClient = new RemoteClient(result, new MultiplexRequestHandler(id, establishedConnection));
+        establishedConnection.addOutstandingClient(id, remoteClient);
+        buffer.put((byte) MessageType.CLIENT_OPEN);
+        buffer.putInt(id);
+        // NOTE(review): the length prefix is a 16-bit value, so an encoded name longer than
+        // 65535 bytes would be written with a truncated length -- confirm callers bound this.
+        final byte[] serviceNameBytes = serviceName.getBytes(charset);
+        buffer.putShort((short) serviceNameBytes.length);
+        buffer.put(serviceNameBytes);
+        final byte[] groupNameBytes = groupName.getBytes(charset);
+        buffer.putShort((short) groupNameBytes.length);
+        buffer.put(groupNameBytes);
+        buffer.flip();
+        try {
+            Channels.sendBlocking(channel, buffer);
+        } catch (IOException e) {
+            // Report the failure to the caller, then drop the client entry for this ID.
+            result.setException(e);
+            establishedConnection.removeRemoteClient(id);
+        }
+        return new Cancellable() {
+            private final AtomicBoolean cancelled = new AtomicBoolean();
+
+            public void cancel() {
+                if (cancelled.getAndSet(true)) {
+                    // cancel already sent
+                    return;
+                }
+                if (remoteClient.getRequestHandler() != null) {
+                    // already done; don't waste the bandwidth
+                    return;
+                }
+                final ByteBuffer buffer = allocator.allocate();
+                buffer.put((byte) MessageType.CLIENT_OPEN_CANCEL);
+                buffer.putInt(id);
+                buffer.flip();
+                try {
+                    Channels.sendBlocking(channel, buffer);
+                } catch (IOException e) {
+                    // Best effort only, but keep the cause in the log so the failure can be diagnosed.
+                    log.trace("Sending a request to cancel a client open failed: %s", e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Hand the local handler to the connection via {@code addLocalClient}.
+     *
+     * @param localHandler the local request handler
+     * @return currently always {@code null}; no connector is produced yet
+     */
+    public RequestHandlerConnector createConnector(final RequestHandler localHandler) {
+        // The ID returned by addLocalClient is not needed yet; the call itself is kept.
+        establishedConnection.addLocalClient(localHandler);
+        // TODO: build and return a connector for the new local client ID.
+        return null;
+    }
+
+    /**
+     * Close the underlying established connection.
+     *
+     * @throws IOException if closing the connection fails
+     */
+    public void close() throws IOException {
+        establishedConnection.close();
+    }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.ConnectionProvider;
+import org.jboss.remoting3.spi.Cancellable;
+import org.jboss.remoting3.spi.ConnectionHandlerFactory;
+import org.jboss.remoting3.spi.Result;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.remoting3.OptionMap;
+import org.jboss.xnio.TcpConnector;
+import org.jboss.xnio.FutureConnection;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.channels.TcpChannel;
+import java.net.URI;
+import java.net.InetSocketAddress;
+import java.io.IOException;
+
+/**
+ * Connection provider which opens an outbound TCP connection to the host and port named
+ * by a URI and reports the outcome through a {@link Result}.
+ */
+public final class MultiplexConnectionProvider implements ConnectionProvider {
+
+    private final TcpConnector tcpConnector;
+
+    public MultiplexConnectionProvider(final TcpConnector tcpConnector) {
+        this.tcpConnector = tcpConnector;
+    }
+
+    /**
+     * Start connecting to the destination given by {@code uri}.
+     *
+     * @param uri the destination; its port must be given explicitly
+     * @param connectOptions connection options (not read by this method)
+     * @param result notified with the exception on failure, with cancellation on cancel, and
+     *        currently with a {@code null} result once the TCP connection is established
+     * @return a handle which cancels the pending connection attempt
+     * @throws IllegalArgumentException if the URI carries no port
+     */
+    public Cancellable connect(final URI uri, final OptionMap connectOptions, final Result<ConnectionHandlerFactory> result) throws IllegalArgumentException {
+        final String hostName = uri.getHost();
+        final int portNumber = uri.getPort();
+        if (portNumber == -1) {
+            throw new IllegalArgumentException("A valid port number must be explicitly specified");
+        }
+        final IoFuture.HandlingNotifier<TcpChannel, Result<ConnectionHandlerFactory>> outcomeNotifier =
+                new IoFuture.HandlingNotifier<TcpChannel, Result<ConnectionHandlerFactory>>() {
+            public void handleCancelled(final Result<ConnectionHandlerFactory> attachment) {
+                attachment.setCancelled();
+            }
+
+            public void handleFailed(final IOException exception, final Result<ConnectionHandlerFactory> attachment) {
+                attachment.setException(exception);
+            }
+
+            public void handleDone(final TcpChannel result, final Result<ConnectionHandlerFactory> attachment) {
+                // NOTE(review): no handler factory is built from the channel yet; null is reported.
+                attachment.setResult(null);
+            }
+        };
+        // TODO - change this to use async DNS via XNIO DNS
+        final InetSocketAddress destination = new InetSocketAddress(hostName, portNumber);
+        final FutureConnection<InetSocketAddress, TcpChannel> pending = tcpConnector.connectTo(destination, null);
+        pending.addNotifier(outcomeNotifier, result);
+        return SpiUtils.cancellable(pending);
+    }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,108 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.Option;
+
+/**
+ * Options which may be used to configure a multiplex connection.  Every option is an
+ * {@code Integer}-valued {@code Option} registered under its own field name.
+ */
+public final class MultiplexOptions {
+ // Constants holder; not instantiable.
+ private MultiplexOptions() {
+ }
+
+ /**
+ * The maximum message receive size to specify to the remote side.  This is the size of
+ * the largest protocol message that can be sent from the remote side to the local side.
+ * The remote side may elect to send messages of a smaller size, but it may not send
+ * larger messages (such messages will cause the connection to close with a protocol
+ * error).  This setting does <b>not</b> affect the maximum request or reply size.
+ * This value should be set to at least {@code 256} if specified.
+ */
+ public static final Option<Integer> MAX_RECEIVE_SIZE =
Option.simple(MultiplexOptions.class, "MAX_RECEIVE_SIZE", Integer.class);
+
+ /**
+ * The maximum message size that this side will send to a peer.  If the peer specifies a
+ * smaller maximum receive size, that size will be used; otherwise this value will be
+ * used.  This value should be set to at least {@code 256} if specified.
+ *
+ * @see #MAX_RECEIVE_SIZE
+ */
+ public static final Option<Integer> MAX_TRANSMIT_SIZE =
Option.simple(MultiplexOptions.class, "MAX_TRANSMIT_SIZE", Integer.class);
+
+ /**
+ * The maximum send window size that will be used by this end of the connection.  The
+ * peer may specify a smaller size, in which case that value will be used.  Increasing
+ * this size may decrease latency at the cost of increased memory demands.  The minimum
+ * allowed value for this option is {@code 2}.
+ */
+ public static final Option<Integer> MAX_SEND_WINDOW_SIZE =
Option.simple(MultiplexOptions.class, "MAX_SEND_WINDOW_SIZE", Integer.class);
+
+ /**
+ * The maximum receive window size that this end of the connection can support.  The
+ * peer will not send unacknowledged messages beyond this window size.  The minimum
+ * allowed value for this option is {@code 2}.
+ */
+ public static final Option<Integer> MAX_RECEIVE_WINDOW_SIZE =
Option.simple(MultiplexOptions.class, "MAX_RECEIVE_WINDOW_SIZE",
Integer.class);
+
+ /**
+ * The maximum number of concurrent inbound requests that this end of the connection
+ * will be configured to support.  Using a larger number increases concurrency at the
+ * cost of additional memory demands.
+ */
+ public static final Option<Integer> MAX_INBOUND_REQUESTS =
Option.simple(MultiplexOptions.class, "MAX_INBOUND_REQUESTS", Integer.class);
+
+ /**
+ * The maximum number of concurrent outbound requests that this end of the connection
+ * will be configured to support.  Using a larger number increases concurrency at the
+ * cost of additional memory demands.
+ */
+ public static final Option<Integer> MAX_OUTBOUND_REQUESTS =
Option.simple(MultiplexOptions.class, "MAX_OUTBOUND_REQUESTS", Integer.class);
+
+ /**
+ * The maximum number of concurrently active inbound clients that this end of the
+ * connection will be configured to accept.  Using a larger number increases concurrency
+ * at the cost of additional memory demands.
+ */
+ public static final Option<Integer> MAX_INBOUND_CLIENTS =
Option.simple(MultiplexOptions.class, "MAX_INBOUND_CLIENTS", Integer.class);
+
+ /**
+ * The maximum number of concurrently active outbound clients that this end of the
+ * connection will be configured to support.  Using a larger number increases
+ * concurrency at the cost of additional memory demands.
+ */
+ public static final Option<Integer> MAX_OUTBOUND_CLIENTS =
Option.simple(MultiplexOptions.class, "MAX_OUTBOUND_CLIENTS", Integer.class);
+
+ /**
+ * The maximum number of inbound streams.  This is the number of streams that the remote
+ * side is allowed to initiate, regardless of the actual direction of flow of the stream
+ * itself.
+ */
+ public static final Option<Integer> MAX_INBOUND_STREAMS =
Option.simple(MultiplexOptions.class, "MAX_INBOUND_STREAMS", Integer.class);
+
+ /**
+ * The maximum number of outbound streams.  This is the number of streams that the local
+ * side is allowed to initiate, regardless of the actual direction of flow of the stream
+ * itself.
+ */
+ public static final Option<Integer> MAX_OUTBOUND_STREAMS =
Option.simple(MultiplexOptions.class, "MAX_OUTBOUND_STREAMS", Integer.class);
+
+ /**
+ * The number of milliseconds to wait before sending accumulated ACK messages.  Higher
+ * values let more ACKs accumulate per send, trading acknowledgement latency for
+ * bandwidth.  Setting this value to 0 will immediately send ACK messages.
+ */
+ public static final Option<Integer> ACK_CORK_MILLISECONDS =
Option.simple(MultiplexOptions.class, "ACK_CORK_MILLISECONDS", Integer.class);
+
+ // todo - authentication/security mechanisms
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.remoting3.spi.RemoteRequestContext;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Handle for one outstanding outbound request, identified by its request ID.  The only
+ * operation is {@link #cancel()}, which asks the peer to cancel that request.
+ */
+final class MultiplexRemoteRequestContext implements RemoteRequestContext {
+    private static final Logger log = Loggers.MAIN_LOGGER;
+
+    private final int id;
+    private final EstablishedConnection connection;
+    /** Flipped to {@code true} by the first {@code cancel()} call so the message is sent once. */
+    private final AtomicBoolean cancelled = new AtomicBoolean();
+
+    public MultiplexRemoteRequestContext(final int id, final EstablishedConnection connection) {
+        this.id = id;
+        this.connection = connection;
+    }
+
+    /**
+     * Send a {@code REQUEST_CANCEL} message (one type byte followed by the int request ID)
+     * over the connection's channel.  Repeated calls are ignored; a send failure is only
+     * logged at trace level.
+     */
+    public void cancel() {
+        final boolean firstCall = ! cancelled.getAndSet(true);
+        if (firstCall) {
+            final AllocatedMessageChannel channel = connection.getChannel();
+            // 1 byte of message type + 4 bytes of request ID
+            final ByteBuffer message = ByteBuffer.allocate(5);
+            message.put((byte) MessageType.REQUEST_CANCEL);
+            message.putInt(id);
+            message.flip();
+            try {
+                Channels.sendBlocking(channel, message);
+                log.trace("Sent cancel request for ID %d", Integer.valueOf(id));
+            } catch (IOException e) {
+                log.trace("Failed to send a request cancel message: %s", e);
+            }
+        }
+    }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,110 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.RemoteRequestContext;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.Pool;
+
+/**
+ * Handler for outbound requests.
+ */
+final class MultiplexRequestHandler extends AbstractHandleableCloseable<RequestHandler> implements RequestHandler {
+    private static final Logger log = Loggers.REQUEST_HANDLER_LOGGER;
+
+    /** The remote client ID that every request sent by this handler is addressed to. */
+    private final int identifier;
+    private final BufferAllocator<ByteBuffer> allocator;
+    // NOTE(review): this final field is never assigned by the constructor below; it has to be
+    // initialized from somewhere (presumably the connection) before this class can compile.
+    private final Pool<Marshaller> marshallerPool;
+    private final EstablishedConnection connection;
+
+    MultiplexRequestHandler(final int identifier, final EstablishedConnection connection) {
+        super(connection.getExecutor());
+        this.connection = connection;
+        this.identifier = identifier;
+        allocator = connection.getAllocator();
+    }
+
+    /**
+     * Remove this client from the connection, then send a {@code CLIENT_CLOSE} message
+     * (one type byte followed by the int client ID).
+     *
+     * @throws IOException if the close message cannot be sent
+     */
+    @Override
+    protected void closeAction() throws IOException {
+        connection.removeRemoteClient(identifier);
+        ByteBuffer buffer = allocator.allocate();
+        buffer.put((byte) MessageType.CLIENT_CLOSE);
+        buffer.putInt(identifier);
+        buffer.flip();
+        Channels.sendBlocking(connection.getChannel(), buffer);
+    }
+
+    /**
+     * Marshal and send an outbound request.  The marshalled stream consists of the
+     * {@code REQUEST} type code, this handler's client ID, a newly allocated request ID and
+     * the request object.
+     *
+     * @param request the request object to send (may be {@code null})
+     * @param handler the reply handler, registered with the connection under the new
+     *        request ID and notified of any {@code IOException} raised while sending
+     * @return a context for the sent request, or a blank context if sending failed
+     */
+    public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
+        final EstablishedConnection connection = this.connection;
+        connection.setCurrent();
+        try {
+            final Marshaller marshaller = marshallerPool.allocate();
+            final ByteOutput output = new SendingByteOutput(null, allocator, connection.getSendWindowSize());
+            boolean ok = false;
+            try {
+                marshaller.start(output);
+                marshaller.write(MessageType.REQUEST);
+                marshaller.writeInt(identifier);
+                final int id = connection.nextRequest();
+                // Traced here rather than on entry: the format has two placeholders and the
+                // request ID only exists from this point on.
+                log.trace("Sending outbound request (request id = %d) (type is %s)", Integer.valueOf(id), request == null ? "null" : request.getClass());
+                // NOTE(review): this registration is not undone if marshalling fails below.
+                connection.addRemoteRequest(id, handler);
+                marshaller.writeInt(id);
+                marshaller.writeObject(request);
+                ok = true;
+                marshaller.finish();
+                marshallerPool.free(marshaller);
+                output.close();
+                return new MultiplexRemoteRequestContext(id, connection);
+            } finally {
+                IoUtils.safeClose(output);
+                // On failure, finish the marshaller and hand it back to the pool (best effort).
+                if (! ok) try {
+                    marshaller.finish();
+                    marshallerPool.free(marshaller);
+                } catch (final Exception e) {
+                    log.trace("Failed to free a marshaller back into the pool: %s", e);
+                }
+            }
+        } catch (final IOException t) {
+            log.trace(t, "receiveRequest failed with an exception");
+            SpiUtils.safeHandleException(handler, t);
+            return SpiUtils.getBlankRemoteRequestContext();
+        } finally {
+            connection.clearCurrent();
+        }
+    }
+
+    public String toString() {
+        return "forwarding request handler <" + Integer.toHexString(hashCode()) + "> (id = " + identifier + ") for " + connection;
+    }
+}
\ No newline at end of file
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.remoting3.spi.Cancellable;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.Result;
+/*
+ * RequestHandlerConnector for the multiplex transport, constructed around an
+ * EstablishedConnection.  In this revision the stored connection is never read and
+ * createRequestHandler always fails.
+ */
+final class MultiplexRequestHandlerConnector implements RequestHandlerConnector {
+ private final EstablishedConnection connection;
+
+ MultiplexRequestHandlerConnector(final EstablishedConnection connection) {
+ this.connection = connection;
+ }
+ /*
+ * Unconditionally throws SecurityException("Not forwarded"): the supplied result is
+ * never completed and no Cancellable is returned.
+ */
+ public Cancellable createRequestHandler(final Result<RequestHandler> result)
throws SecurityException {
+ throw new SecurityException("Not forwarded");
+ }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,123 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * A blocking pool of {@code maxPermits} distinct integer IDs ({@code 0} through
+ * {@code maxPermits - 1}).  Acquiring a permit takes one ID out of the pool, blocking while
+ * none is free; releasing the permit puts its ID back and wakes any blocked acquirers.
+ * All pool state is guarded by a private lock object.
+ */
+public final class PermitManager {
+
+    private final Object lock = new Object();
+    private final int maxPermits;
+    /** Stack of IDs; the entries at indexes {@code topOfStack} and above are free. */
+    private final int[] permitIds;
+    /** Number of IDs currently handed out; equals {@code maxPermits} when the pool is empty. */
+    private int topOfStack;
+
+    /**
+     * Create a pool holding the IDs {@code 0} through {@code maxPermits - 1}.
+     *
+     * @param maxPermits the number of permits in the pool
+     */
+    public PermitManager(final int maxPermits) {
+        synchronized (lock) {
+            this.maxPermits = maxPermits;
+            permitIds = new int[maxPermits];
+            for (int i = 0; i < maxPermits; i ++) {
+                permitIds[i] = i;
+            }
+        }
+    }
+
+    /**
+     * Acquire a permit, blocking until one is free.
+     *
+     * @return the acquired permit
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public Permit acquireInterruptibly() throws InterruptedException {
+        final int maxPermits = this.maxPermits;
+        synchronized (lock) {
+            int topOfStack;
+            while ((topOfStack = this.topOfStack) == maxPermits) {
+                lock.wait();
+            }
+            final int[] permitIds = this.permitIds;
+            int id = permitIds[topOfStack];
+            this.topOfStack = topOfStack + 1;
+            return new Permit(id);
+        }
+    }
+
+    /**
+     * Acquire a permit, blocking uninterruptibly until one is free.  If the thread was
+     * interrupted before or during the wait, its interrupt status is set again before this
+     * method returns.
+     *
+     * @return the acquired permit
+     */
+    public Permit acquire() {
+        final int maxPermits = this.maxPermits;
+        synchronized (lock) {
+            // Clear any pending interrupt so that wait() does not throw immediately.
+            boolean intr = Thread.interrupted();
+            try {
+                int topOfStack;
+                while ((topOfStack = this.topOfStack) == maxPermits) try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    intr = true;
+                }
+                final int[] permitIds = this.permitIds;
+                int id = permitIds[topOfStack];
+                this.topOfStack = topOfStack + 1;
+                return new Permit(id);
+            } finally {
+                if (intr) Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private static final AtomicIntegerFieldUpdater<Permit> permitUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(Permit.class, "id");
+
+    /**
+     * A permit holding one ID from the enclosing pool.  Releasing is idempotent: only the
+     * first release returns the ID to the pool.
+     */
+    public final class Permit {
+        /** The held ID, or {@code -1} once the permit has been released. */
+        private volatile int id;
+
+        private Permit(final int id) {
+            this.id = id;
+        }
+
+        /**
+         * Get the ID held by this permit.
+         *
+         * @return the ID
+         * @throws IllegalStateException if the permit has already been released
+         */
+        public int getId() {
+            final int id = this.id;
+            if (id == -1) {
+                throw new IllegalStateException("Permit already released");
+            }
+            return id;
+        }
+
+        /**
+         * Determine whether this permit has been released.
+         *
+         * @return {@code true} if the permit has been released
+         */
+        public boolean released() {
+            return id == -1;
+        }
+
+        /**
+         * Return this permit's ID to the pool.  Calls after the first have no effect.
+         */
+        public void release() {
+            doRelease();
+        }
+
+        /**
+         * Atomically mark this permit released and, if this call did so, push its ID back
+         * onto the pool and wake the threads waiting for a permit.
+         *
+         * @return {@code true} if this call released the permit, {@code false} if it was
+         *         already released
+         */
+        private boolean doRelease() {
+            final int id;
+            if ((id = permitUpdater.getAndSet(this, -1)) != -1) {
+                synchronized (lock) {
+                    permitIds[--topOfStack] = id;
+                    // Without this, threads blocked in acquire()/acquireInterruptibly()
+                    // would never observe the freed ID and would wait forever.
+                    lock.notifyAll();
+                }
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        /**
+         * Safety net: a permit that is garbage collected without having been released gives
+         * its ID back to the pool.
+         */
+        protected void finalize() {
+            if (doRelease()) {
+                // todo log leak
+            }
+        }
+    }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.xnio.IoReadHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.Pool;
+import org.jboss.xnio.channels.ReadableAllocatedMessageChannel;
+import org.jboss.xnio.log.Logger;
+
+final class ReadHandlerImpl implements
IoReadHandler<ReadableAllocatedMessageChannel> {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.multiplex");
+
+ private final int maxReceiveSize;
+ private final int maxSendSize;
+ private final int receiveWindowSize;
+ private final int sendWindowSize;
+ private final Pool<Marshaller> marshallerPool;
+ private final Pool<Unmarshaller> unmarshallerPool;
+ private final ConnectionHandler connectionHandler;
+
+ public void handleReadable(final ReadableAllocatedMessageChannel channel) {
+ final ByteBuffer buffer;
+ try {
+ buffer = channel.receive();
+ } catch (IOException e) {
+ log.error("I/O error in protocol channel; closing channel (%s)",
e);
+ IoUtils.safeClose(connectionHandler);
+ IoUtils.safeClose(channel);
+ return;
+ }
+ if (log.isTrace()) {
+ log.trace("Received raw message:\n%s", Buffers.createDumper(buffer,
8, 1));
+ }
+ final int idByte = buffer.get() & 0xff;
+ final boolean isMulti = (idByte & 0x80) != 0;
+ final boolean isFirst = isMulti && (idByte & 0x40) != 0;
+ final boolean isLast = isMulti && (idByte & 0x20) != 0;
+
+ }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java 2009-09-16
17:44:19 UTC (rev 5501)
@@ -0,0 +1,193 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.ArrayDeque;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.xnio.Buffers;
+import org.jboss.remoting3.ProtocolException;
+
+final class ReceivingByteInput extends InputStream implements ByteInput {
+ private final Queue<ByteBuffer> window;
+ private final Runnable ackTask;
+ private final Runnable closeTask;
+
+ private IOException failure;
+ private boolean done;
+
+ ReceivingByteInput(final Runnable ackTask, final int windowSize, final Runnable
closeTask) {
+ this.ackTask = ackTask;
+ this.closeTask = closeTask;
+ window = new ArrayDeque<ByteBuffer>(windowSize);
+ }
+
+ public int read() throws IOException {
+ final Queue<ByteBuffer> window = this.window;
+ synchronized (this) {
+ while (window.isEmpty()) {
+ if (done) {
+ return -1;
+ }
+ checkFailure();
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted on read()");
+ }
+ }
+ final ByteBuffer buf = window.peek();
+ try {
+ return buf.get() & 0xff;
+ } finally {
+ if (buf.remaining() == 0) {
+ window.poll();
+ ackTask.run();
+ }
+ }
+ }
+ }
+
+ public int read(final byte[] bytes, int offs, int len) throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ synchronized (this) {
+ while (window.isEmpty()) {
+ if (done) {
+ return -1;
+ }
+ checkFailure();
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted on read()");
+ }
+ }
+ final Queue<ByteBuffer> window = this.window;
+ int total = 0;
+ while (len > 0) {
+ final ByteBuffer buffer = window.peek();
+ if (buffer == null) {
+ break;
+ }
+ final int bytecnt = Math.min(buffer.remaining(), len);
+ buffer.get(bytes, offs, bytecnt);
+ total += bytecnt;
+ len -= bytecnt;
+ if (buffer.remaining() == 0) {
+ window.poll();
+ ackTask.run();
+ }
+ }
+ return total;
+ }
+ }
+
+ public int available() throws IOException {
+ synchronized (this) {
+ int total = 0;
+ for (ByteBuffer buffer : window) {
+ total += buffer.remaining();
+ if (total < 0) {
+ return Integer.MAX_VALUE;
+ }
+ }
+ return total;
+ }
+ }
+
+ public long skip(long qty) throws IOException {
+ final Queue<ByteBuffer> window = this.window;
+ synchronized (this) {
+ while (window.isEmpty()) {
+ if (done) {
+ return 0L;
+ }
+ checkFailure();
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted on read()");
+ }
+ }
+ long skipped = 0L;
+ while (qty > 0L) {
+ final ByteBuffer buffer = window.peek();
+ if (buffer == null) {
+ break;
+ }
+ final int bytecnt = Math.min(buffer.remaining(), (int)
Math.max((long)Integer.MAX_VALUE, qty));
+ Buffers.skip(buffer, bytecnt);
+ skipped += bytecnt;
+ qty -= bytecnt;
+ if (buffer.remaining() == 0) {
+ window.poll();
+ ackTask.run();
+ }
+ }
+ return skipped;
+ }
+ }
+
+ private void checkFailure() throws IOException {
+ final IOException failure = this.failure;
+ if (failure != null) {
+ failure.setStackTrace(new Throwable().getStackTrace());
+ try {
+ throw failure;
+ } finally {
+ done = true;
+ this.failure = null;
+ }
+ }
+ }
+
+ void push(ByteBuffer buffer) throws IOException {
+ synchronized (this) {
+ checkFailure();
+ try {
+ window.add(buffer);
+ } catch (IllegalStateException e) {
+ throw new ProtocolException("Stream window overrun");
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ synchronized (this) {
+ if (! done) {
+ window.clear();
+ done = true;
+ closeTask.run();
+ }
+ }
+ }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java 2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.Result;
+import org.jboss.remoting3.spi.RequestHandler;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Holder pairing a request handler with a pending result which can be taken exactly once.
+ * The request handler is only handed out after the pending result has been taken (or was never set).
+ */
+final class RemoteClient {
+
+    /** Atomic accessor for the {@code result} field, looked up by name. */
+    private static final AtomicReferenceFieldUpdater<RemoteClient, Result> resultUpdater = AtomicReferenceFieldUpdater.newUpdater(RemoteClient.class, Result.class, "result");
+
+    private final RequestHandler requestHandler;
+    @SuppressWarnings({ "UnusedDeclaration" })
+    private volatile Result<RequestHandler> result;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param result the pending result, or {@code null} if there is none
+     * @param requestHandler the request handler
+     */
+    RemoteClient(final Result<RequestHandler> result, final RequestHandler requestHandler) {
+        this.requestHandler = requestHandler;
+        this.result = result;
+    }
+
+    /**
+     * Atomically take the pending result, leaving {@code null} behind.
+     *
+     * @return the pending result, or {@code null} if it was already taken or never set
+     */
+    @SuppressWarnings({ "unchecked" })
+    Result<RequestHandler> takeResult() {
+        return resultUpdater.getAndSet(this, null);
+    }
+
+    /**
+     * Get the request handler.
+     *
+     * @return the request handler, or {@code null} while a pending result is still held
+     */
+    public RequestHandler getRequestHandler() {
+        if (result != null) {
+            return null;
+        }
+        return requestHandler;
+    }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java 2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,165 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * An output stream which collects written bytes into buffers obtained from an allocator and sends each
+ * filled (or flushed) buffer as one message on a channel.
+ * <p>
+ * Sending is limited by a window: each sent buffer consumes one unit of {@code windowSize}, and
+ * {@link #acknowledge()} gives one unit back.  When the window is exhausted the writer blocks until an
+ * acknowledgement arrives or the stream is aborted.  All state is guarded by the monitor of this object.
+ */
+final class SendingByteOutput extends OutputStream implements ByteOutput {
+    /** The buffer currently being filled, or {@code null} if none is allocated. */
+    private ByteBuffer buffer;
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final WritableMessageChannel channel;
+    /** The number of buffers which may still be sent before an acknowledgement is required. */
+    private int windowSize;
+    private boolean closed;
+    // NOTE(review): nothing in this class assigns 'remoteException' yet; checkClosed() only reports it.
+    private IOException remoteException;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param channel the channel to send filled buffers on
+     * @param allocator the allocator to take buffers from and return them to
+     * @param windowSize the initial number of buffers which may be sent without acknowledgement
+     */
+    public SendingByteOutput(final WritableMessageChannel channel, final BufferAllocator<ByteBuffer> allocator, final int windowSize) {
+        this.channel = channel;
+        this.allocator = allocator;
+        this.windowSize = windowSize;
+    }
+
+    private static IOException closed() {
+        return new IOException("Stream is closed");
+    }
+
+    /**
+     * Fail if the stream is closed or a remote exception is pending.  A pending remote exception also
+     * closes the stream.  Call with lock held.
+     */
+    private void checkClosed() throws IOException {
+        if (closed) {
+            throw closed();
+        }
+        if (remoteException != null) {
+            closed = true;
+            throw remoteException;
+        }
+    }
+
+    /**
+     * Get a buffer with room for at least one more byte, sending the current one first if it is full.
+     * Call with lock held.
+     *
+     * @return the buffer to write into (also stored in the {@code buffer} field)
+     */
+    private ByteBuffer writableBuffer() throws IOException {
+        ByteBuffer buffer = this.buffer;
+        if (buffer != null && ! buffer.hasRemaining()) {
+            // send() detaches and frees the full buffer
+            send();
+            buffer = null;
+        }
+        if (buffer == null) {
+            buffer = this.buffer = allocator.allocate();
+        }
+        return buffer;
+    }
+
+    /**
+     * Write a single byte.
+     *
+     * @param b the byte to write (the low eight bits are used)
+     * @throws IOException if the stream is closed or sending a full buffer fails
+     */
+    public void write(final int b) throws IOException {
+        synchronized (this) {
+            checkClosed();
+            // always write into the freshly obtained buffer, never into a stale local reference
+            writableBuffer().put((byte) b);
+        }
+    }
+
+    /**
+     * Write {@code len} bytes from the given array, sending buffers as they fill up.
+     *
+     * @param b the source array
+     * @param off the offset of the first byte to write
+     * @param len the number of bytes to write
+     * @throws IOException if the stream is closed or sending a full buffer fails
+     */
+    public void write(final byte[] b, int off, int len) throws IOException {
+        synchronized (this) {
+            checkClosed();
+            while (len > 0) {
+                final ByteBuffer buffer = writableBuffer();
+                final int cnt = Math.min(len, buffer.remaining());
+                buffer.put(b, off, cnt);
+                len -= cnt;
+                off += cnt;
+            }
+        }
+    }
+
+    /**
+     * Send the current buffer, waiting for window space first if necessary.  The buffer is detached from
+     * this stream up front and is always returned to the allocator, whether or not the send succeeds.
+     * Call with lock held, and only when {@code buffer} is not {@code null}.
+     *
+     * @throws IOException if the stream is closed while waiting, the thread is interrupted, or the
+     *      channel fails
+     */
+    private void send() throws IOException {
+        final ByteBuffer buffer = this.buffer;
+        this.buffer = null;
+        try {
+            while (windowSize == 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    closed = true;
+                    Thread.currentThread().interrupt();
+                    throw new ClosedByInterruptException();
+                }
+                checkClosed();
+            }
+            buffer.flip();
+            final WritableMessageChannel channel = this.channel;
+            while (! (channel.send(buffer))) {
+                channel.awaitWritable();
+                checkClosed();
+                if (Thread.currentThread().isInterrupted()) {
+                    closed = true;
+                    throw new ClosedByInterruptException();
+                }
+            }
+        } finally {
+            // covers the wait/abort failure paths as well, so the pooled buffer is never leaked
+            allocator.free(buffer);
+        }
+        windowSize--;
+    }
+
+    /**
+     * Return one unit to the send window and wake up a writer waiting for window space.
+     */
+    void acknowledge() {
+        synchronized (this) {
+            windowSize++;
+            notify();
+        }
+    }
+
+    /**
+     * Send the current buffer if it contains any data.
+     *
+     * @throws IOException if sending fails
+     */
+    public void flush() throws IOException {
+        synchronized (this) {
+            final ByteBuffer buffer = this.buffer;
+            if (buffer != null && buffer.position() > 0) {
+                send();
+            }
+        }
+    }
+
+    /**
+     * Close this stream, sending any buffered data first.  The stream ends up closed even if that final
+     * send fails.  Closing an already closed stream has no effect.
+     *
+     * @throws IOException if sending the remaining data fails
+     */
+    public void close() throws IOException {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            try {
+                final ByteBuffer buffer = this.buffer;
+                if (buffer != null) {
+                    if (buffer.position() > 0) {
+                        send();
+                    } else {
+                        // allocated but never written to; just hand it back
+                        this.buffer = null;
+                        allocator.free(buffer);
+                    }
+                }
+            } finally {
+                closed = true;
+            }
+        }
+    }
+
+    /**
+     * Close this stream without sending buffered data, and wake up any writer blocked on the window.
+     * Any buffer still held by the stream is returned to the allocator.
+     */
+    void abort() {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            // a writer blocked in send() owns its buffer itself, so whatever is still here is unused
+            final ByteBuffer buffer = this.buffer;
+            if (buffer != null) {
+                this.buffer = null;
+                allocator.free(buffer);
+            }
+            notifyAll();
+        }
+    }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/package-info.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/package-info.java (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/package-info.java 2009-09-16 17:44:19 UTC (rev 5501)
@@ -0,0 +1,26 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+/**
+ * The Remoting 3 multiplex protocol implementation.
+ */
+package org.jboss.remoting3.multiplex;
\ No newline at end of file