Author: david.lloyd(a)jboss.com
Date: 2009-11-19 17:14:53 -0500 (Thu, 19 Nov 2009)
New Revision: 5600
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Protocol.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionHandler.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionHandlerFactory.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionProvider.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionProviderFactory.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteDescriptor.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteProviderDescriptor.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteRequestHandler.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteServerFactory.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Simplaphore.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SubchannelHandler.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/UnforwardedRequestHandlerConnector.java
Removed:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MarshallingProtocol.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandlerFactory.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProviderFactory.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexServerFactory.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java
Modified:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java
Log:
It lives
Modified:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConfigParam.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -10,8 +10,27 @@
MAX_TRANSMIT_SIZE,
MAX_RECEIVE_WINDOW_SIZE,
MAX_TRANSMIT_WINDOW_SIZE,
- MAX_OUTBOUND_REQUESTS,
- MAX_INBOUND_REQUESTS,
- MAX_OUTBOUND_CLIENTS,
- MAX_INBOUND_CLIENTS,
+ MAX_OUTBOUND_CHANNELS,
+ MAX_INBOUND_CHANNELS,
+
+ /**
+ * Argument: names
+ */
+ CLASS_RESOLVER,
+ /**
+ * Argument: names
+ */
+ OBJECT_RESOLVERS,
+ /**
+ * Argument: names
+ */
+ CLASS_TABLES,
+ /**
+ * Argument: names
+ */
+ OBJECT_TABLES,
+ /**
+ * Argument: int versionCnt + int[] versions + name
+ */
+ MARSHALLERS,
}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,160 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-/**
- * A configuration object for the multiplex protocol.
- */
-final class ConnectionConfiguration implements Cloneable {
-
- private int linkMetric = 100;
- private int maximumReceiveSize = 0x200;
- private int maximumTransmitSize = 0x200;
- private int receiveWindowSize = 5;
- private int transmitWindowSize = 5;
- private int maximumOutboundClients = 0x10;
- private int maximumInboundClients = 0x10;
- private int maximumOutboundRequests = 0x10;
- private int maximumInboundRequests = 0x10;
- private int maximumOutboundStreams = 0x20;
- private int maximumInboundStreams = 0x20;
-
- /**
- * Construct a new instance.
- */
- public ConnectionConfiguration() {
- }
-
- /**
- * Get the link metric to assign to this multiplex connection.
- *
- * @return the link metric
- */
- public int getLinkMetric() {
- return linkMetric;
- }
-
- /**
- * Set the link metric to assign to this multiplex connection.
- *
- * @param linkMetric the link metric
- */
- public void setLinkMetric(final int linkMetric) {
- this.linkMetric = linkMetric;
- }
-
-
- public int getMaximumReceiveSize() {
- return maximumReceiveSize;
- }
-
- public void setMaximumReceiveSize(final int maximumReceiveSize) {
- this.maximumReceiveSize = maximumReceiveSize;
- }
-
- public int getMaximumTransmitSize() {
- return maximumTransmitSize;
- }
-
- public void setMaximumTransmitSize(final int maximumTransmitSize) {
- this.maximumTransmitSize = maximumTransmitSize;
- }
-
- public int getReceiveWindowSize() {
- return receiveWindowSize;
- }
-
- public void setReceiveWindowSize(final int receiveWindowSize) {
- this.receiveWindowSize = receiveWindowSize;
- }
-
- public int getTransmitWindowSize() {
- return transmitWindowSize;
- }
-
- public void setTransmitWindowSize(final int transmitWindowSize) {
- this.transmitWindowSize = transmitWindowSize;
- }
-
- public int getMaximumOutboundClients() {
- return maximumOutboundClients;
- }
-
- public void setMaximumOutboundClients(final int maximumOutboundClients) {
- this.maximumOutboundClients = maximumOutboundClients;
- }
-
- public int getMaximumInboundClients() {
- return maximumInboundClients;
- }
-
- public void setMaximumInboundClients(final int maximumInboundClients) {
- this.maximumInboundClients = maximumInboundClients;
- }
-
- public int getMaximumOutboundRequests() {
- return maximumOutboundRequests;
- }
-
- public void setMaximumOutboundRequests(final int maximumOutboundRequests) {
- this.maximumOutboundRequests = maximumOutboundRequests;
- }
-
- public int getMaximumInboundRequests() {
- return maximumInboundRequests;
- }
-
- public void setMaximumInboundRequests(final int maximumInboundRequests) {
- this.maximumInboundRequests = maximumInboundRequests;
- }
-
- public int getMaximumOutboundStreams() {
- return maximumOutboundStreams;
- }
-
- public void setMaximumOutboundStreams(final int maximumOutboundStreams) {
- this.maximumOutboundStreams = maximumOutboundStreams;
- }
-
- public int getMaximumInboundStreams() {
- return maximumInboundStreams;
- }
-
- public void setMaximumInboundStreams(final int maximumInboundStreams) {
- this.maximumInboundStreams = maximumInboundStreams;
- }
-
- /**
- * Clone this configuration.
- *
- * @return the cloned configuration
- */
- public ConnectionConfiguration clone() {
- try {
- final ConnectionConfiguration clone = (ConnectionConfiguration) super.clone();
- return clone;
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
- }
-}
\ No newline at end of file
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java (rev 0)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ConnectionConfiguration.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -0,0 +1,275 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.marshalling.ClassResolver;
+import org.jboss.marshalling.ClassTable;
+import org.jboss.marshalling.ObjectTable;
+import org.jboss.marshalling.ObjectResolver;
+import org.jboss.marshalling.ProviderDescriptor;
+import org.jboss.marshalling.util.IntMap;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.channels.TcpChannel;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+final class ConnectionConfiguration {
+
+ private OptionMap optionMap;
+
+ private Map<String, ClassResolver> localClassResolverMap;
+ private Map<String, ClassTable> localClassTableMap;
+ private Map<String, ObjectTable> localObjectTableMap;
+ private Map<String, ProviderDescriptor> localMarshallerProviderDescriptorMap;
+ private Map<String, ObjectResolver> localObjectResolverMap;
+
+ private List<ClassResolver> localClassResolvers;
+ private List<ClassTable> localClassTables;
+ private List<ObjectTable> localObjectTables;
+ private List<ProviderDescriptor> localMarshallerProviderDescriptors;
+ private List<ObjectResolver> localObjectResolvers;
+
+ private IntMap<String> remoteClassResolvers;
+ private IntMap<String> remoteClassTables;
+ private IntMap<String> remoteObjectTables;
+ private Map<String, RemoteProviderDescriptor> remoteMarshallerProviderDescriptors;
+ private IntMap<String> remoteObjectResolvers;
+
+ private int configuredReceiveSize;
+ private int configuredTransmitSize;
+ private int configuredReceiveWindowSize;
+ private int configuredTransmitWindowSize;
+ private int configuredInboundChannels;
+ private int configuredOutboundChannels;
+
+ private TcpChannel channel;
+ private WritableMessageChannel messageChannel;
+ private Executor executor;
+
+ public OptionMap getOptionMap() {
+ return optionMap;
+ }
+
+ public void setOptionMap(final OptionMap optionMap) {
+ this.optionMap = optionMap;
+ }
+
+ public Map<String, ClassResolver> getLocalClassResolverMap() {
+ return localClassResolverMap;
+ }
+
+ public void setLocalClassResolverMap(final Map<String, ClassResolver> localClassResolverMap) {
+ this.localClassResolverMap = localClassResolverMap;
+ }
+
+ public Map<String, ClassTable> getLocalClassTableMap() {
+ return localClassTableMap;
+ }
+
+ public void setLocalClassTableMap(final Map<String, ClassTable> localClassTableMap) {
+ this.localClassTableMap = localClassTableMap;
+ }
+
+ public Map<String, ObjectTable> getLocalObjectTableMap() {
+ return localObjectTableMap;
+ }
+
+ public void setLocalObjectTableMap(final Map<String, ObjectTable> localObjectTableMap) {
+ this.localObjectTableMap = localObjectTableMap;
+ }
+
+ public Map<String, ProviderDescriptor> getLocalMarshallerProviderDescriptorMap() {
+ return localMarshallerProviderDescriptorMap;
+ }
+
+ public void setLocalMarshallerProviderDescriptorMap(final Map<String, ProviderDescriptor> localMarshallerProviderDescriptorMap) {
+ this.localMarshallerProviderDescriptorMap = localMarshallerProviderDescriptorMap;
+ }
+
+ public Map<String, ObjectResolver> getLocalObjectResolverMap() {
+ return localObjectResolverMap;
+ }
+
+ public void setLocalObjectResolverMap(final Map<String, ObjectResolver> localObjectResolverMap) {
+ this.localObjectResolverMap = localObjectResolverMap;
+ }
+
+ public List<ClassResolver> getLocalClassResolvers() {
+ return localClassResolvers;
+ }
+
+ public void setLocalClassResolvers(final List<ClassResolver> localClassResolvers) {
+ this.localClassResolvers = localClassResolvers;
+ }
+
+ public List<ClassTable> getLocalClassTables() {
+ return localClassTables;
+ }
+
+ public void setLocalClassTables(final List<ClassTable> localClassTables) {
+ this.localClassTables = localClassTables;
+ }
+
+ public List<ObjectTable> getLocalObjectTables() {
+ return localObjectTables;
+ }
+
+ public void setLocalObjectTables(final List<ObjectTable> localObjectTables) {
+ this.localObjectTables = localObjectTables;
+ }
+
+ public List<ProviderDescriptor> getLocalMarshallerProviderDescriptors() {
+ return localMarshallerProviderDescriptors;
+ }
+
+ public void setLocalMarshallerProviderDescriptors(final List<ProviderDescriptor> localMarshallerProviderDescriptors) {
+ this.localMarshallerProviderDescriptors = localMarshallerProviderDescriptors;
+ }
+
+ public List<ObjectResolver> getLocalObjectResolvers() {
+ return localObjectResolvers;
+ }
+
+ public void setLocalObjectResolvers(final List<ObjectResolver> localObjectResolvers) {
+ this.localObjectResolvers = localObjectResolvers;
+ }
+
+ public IntMap<String> getRemoteClassResolvers() {
+ return remoteClassResolvers;
+ }
+
+ public void setRemoteClassResolvers(final IntMap<String> remoteClassResolvers) {
+ this.remoteClassResolvers = remoteClassResolvers;
+ }
+
+ public IntMap<String> getRemoteClassTables() {
+ return remoteClassTables;
+ }
+
+ public void setRemoteClassTables(final IntMap<String> remoteClassTables) {
+ this.remoteClassTables = remoteClassTables;
+ }
+
+ public IntMap<String> getRemoteObjectTables() {
+ return remoteObjectTables;
+ }
+
+ public void setRemoteObjectTables(final IntMap<String> remoteObjectTables) {
+ this.remoteObjectTables = remoteObjectTables;
+ }
+
+ public Map<String, RemoteProviderDescriptor> getRemoteMarshallerProviderDescriptors() {
+ return remoteMarshallerProviderDescriptors;
+ }
+
+ public void setRemoteMarshallerProviderDescriptors(final Map<String, RemoteProviderDescriptor> remoteMarshallerProviderDescriptors) {
+ this.remoteMarshallerProviderDescriptors = remoteMarshallerProviderDescriptors;
+ }
+
+ public IntMap<String> getRemoteObjectResolvers() {
+ return remoteObjectResolvers;
+ }
+
+ public void setRemoteObjectResolvers(final IntMap<String> remoteObjectResolvers) {
+ this.remoteObjectResolvers = remoteObjectResolvers;
+ }
+
+ public int getConfiguredReceiveSize() {
+ return configuredReceiveSize;
+ }
+
+ public void setConfiguredReceiveSize(final int configuredReceiveSize) {
+ this.configuredReceiveSize = configuredReceiveSize;
+ }
+
+ public int getConfiguredTransmitSize() {
+ return configuredTransmitSize;
+ }
+
+ public void setConfiguredTransmitSize(final int configuredTransmitSize) {
+ this.configuredTransmitSize = configuredTransmitSize;
+ }
+
+ public int getConfiguredReceiveWindowSize() {
+ return configuredReceiveWindowSize;
+ }
+
+ public void setConfiguredReceiveWindowSize(final int configuredReceiveWindowSize) {
+ this.configuredReceiveWindowSize = configuredReceiveWindowSize;
+ }
+
+ public int getConfiguredTransmitWindowSize() {
+ return configuredTransmitWindowSize;
+ }
+
+ public void setConfiguredTransmitWindowSize(final int configuredTransmitWindowSize) {
+ this.configuredTransmitWindowSize = configuredTransmitWindowSize;
+ }
+
+ public int getConfiguredInboundChannels() {
+ return configuredInboundChannels;
+ }
+
+ public void setConfiguredInboundChannels(final int configuredInboundChannels) {
+ this.configuredInboundChannels = configuredInboundChannels;
+ }
+
+ public int getConfiguredOutboundChannels() {
+ return configuredOutboundChannels;
+ }
+
+ public void setConfiguredOutboundChannels(final int configuredOutboundChannels) {
+ this.configuredOutboundChannels = configuredOutboundChannels;
+ }
+
+ public TcpChannel getChannel() {
+ return channel;
+ }
+
+ public void setChannel(final TcpChannel channel) {
+ this.channel = channel;
+ }
+
+ public WritableMessageChannel getMessageChannel() {
+ return messageChannel;
+ }
+
+ public void setMessageChannel(final WritableMessageChannel messageChannel) {
+ this.messageChannel = messageChannel;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
+ EstablishedConnection create() {
+ return null;
+ }
+}
Modified:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/EstablishedConnection.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -22,93 +22,35 @@
package org.jboss.remoting3.multiplex;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import java.util.Iterator;
-import java.util.List;
-import org.jboss.marshalling.ClassTable;
-import org.jboss.marshalling.ObjectTable;
-import org.jboss.marshalling.ClassResolver;
-import org.jboss.marshalling.ObjectResolver;
-import org.jboss.marshalling.ClassExternalizerFactory;
-import org.jboss.marshalling.util.IntMap;
-import org.jboss.remoting3.IndeterminateOutcomeException;
-import org.jboss.remoting3.spi.AbstractHandleableCloseable;
-import org.jboss.remoting3.spi.Cancellable;
-import org.jboss.remoting3.spi.ConnectionHandler;
-import org.jboss.remoting3.spi.RemoteRequestContext;
-import org.jboss.remoting3.spi.ReplyHandler;
-import org.jboss.remoting3.spi.RequestHandler;
-import org.jboss.remoting3.spi.RequestHandlerConnector;
-import org.jboss.remoting3.spi.Result;
-import org.jboss.remoting3.spi.SpiUtils;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.Buffers;
-import org.jboss.xnio.IoFuture;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting3.RemotingOptions;
+import org.jboss.remoting3.spi.ConnectionHandlerContext;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.channels.WritableMessageChannel;
+import org.jboss.xnio.channels.Channels;
import org.jboss.xnio.log.Logger;
/**
*
*/
-final class EstablishedConnection extends AbstractHandleableCloseable implements ConnectionHandler {
+final class EstablishedConnection {
private static final Logger log = Loggers.MAIN_LOGGER;
//--== Connection configuration items ==--
private final Executor executor;
- // buffer allocator for outbound message assembly
- private final BufferAllocator<ByteBuffer> allocator;
- // remotely available marshalling protocol index
- private final IntMap<String> remoteMarshallingProtocols;
- // locally available marshalling protocol index
- private final List<MarshallingProtocol> localMarshallingProtocols;
- // remotely available class table index
- private final IntMap<String> remoteClassTables;
- // locally available class table index
- private final List<ClassTable> localClassTables;
- // remotely available object table index
- private final IntMap<String> remoteObjectTables;
- // locally available object table index
- private final List<ObjectTable> localObjectTables;
- // Remotely available class resolvers
- private final IntMap<String> remoteClassResolvers;
- // locally available class resolvers
- private final List<ClassResolver> localClassResolvers;
- // remotely available object resolvers
- private final IntMap<String> remoteObjectResolvers;
- // locally available object resolvers
- private final List<ObjectResolver> localObjectResolvers;
- // remotely available externalizer factories
- private final IntMap<String> remoteExternalizerFactories;
- // locally available externalizer factories
- private final List<ClassExternalizerFactory> localExternalizerFactories;
- // running on remote node
- private final AtomicReferenceArray<ReplyHandler> remoteRequests;
- // sequence for outbound remote requests
- private final PermitManager requestPermits;
+ private final ConnectionConfiguration configuration;
- // running on local node (key comes from remote side)
- private final AtomicReferenceArray<RemoteRequestContext> localRequests;
+ private final WritableMessageChannel channel;
- // clients whose requests get forwarded to the remote side
- private final AtomicReferenceArray<RemoteClient> remoteClients;
- // sequence for clients whose requests get forwarded to the remote side
- // LOW BIT == 0
- private final PermitManager remoteClientPermits;
+ // permit manager for channels opened from the local side
+ private final PermitManager outboundPermits;
- // clients whose requests are handled on this side (key comes from remote side)
- private final AtomicReferenceArray<RequestHandler> requestedClients;
- // LOW BIT == 1;
- private final PermitManager forwardedClientPermits;
+ // permit manager for local outbound clients
+ private final PermitManager outboundClientPermits;
- // the data channel
- private final AllocatedMessageChannel channel;
- // the local connection handler
- private final ConnectionHandler localConnectionHandler;
+ // the local connection handler context
+ private final ConnectionHandlerContext connectionContext;
private final int transmitWindowSize;
private final int receiveWindowSize;
@@ -117,26 +59,20 @@
private static final ThreadLocal<EstablishedConnection> currentConnection = new ThreadLocal<EstablishedConnection>();
- public EstablishedConnection(final AllocatedMessageChannel channel, final Executor executor, final ConnectionConfiguration configuration, final ConnectionHandler localConnectionHandler) {
- super(executor);
- this.channel = channel;
- linkMetric = configuration.getLinkMetric();
- this.executor = executor;
- allocator = Buffers.createHeapByteBufferAllocator(configuration.getMaximumTransmitSize());
- final int maximumInboundRequests = configuration.getMaximumInboundRequests();
- remoteRequests = new AtomicReferenceArray<ReplyHandler>(maximumInboundRequests);
- requestPermits = new PermitManager(maximumInboundRequests);
- final int maximumOutboundRequests = configuration.getMaximumOutboundRequests();
- localRequests = new AtomicReferenceArray<RemoteRequestContext>(maximumOutboundRequests);
- final int maximumOutboundClients = configuration.getMaximumOutboundClients();
- remoteClients = new AtomicReferenceArray<RemoteClient>(maximumOutboundClients);
- remoteClientPermits = new PermitManager(maximumOutboundClients);
- final int maximumInboundClients = configuration.getMaximumInboundClients();
- requestedClients = new AtomicReferenceArray<RequestHandler>(maximumInboundClients);
- forwardedClientPermits = new PermitManager(maximumInboundClients);
- forwardedClientPermits = new PermitManager(maximumInboundClients);
- transmitWindowSize = configuration.getTransmitWindowSize();
- receiveWindowSize = configuration.getReceiveWindowSize();
- this.localConnectionHandler = localConnectionHandler;
+ EstablishedConnection(final ConnectionConfiguration configuration, final ConnectionHandlerContext connectionContext) {
+ this.configuration = configuration;
+ this.connectionContext = connectionContext;
+
+ final OptionMap optionMap = configuration.getOptionMap();
+
+ channel = Channels.createMessageWriter(configuration.getChannel(), optionMap);
+ linkMetric = optionMap.get(RemotingOptions.METRIC, 1000);
+ executor = configuration.getExecutor();
+ transmitWindowSize = optionMap.get(RemotingOptions.TRANSMIT_WINDOW_SIZE, 4);
+ receiveWindowSize = optionMap.get(RemotingOptions.RECEIVE_WINDOW_SIZE, 4);
+ outboundPermits = new PermitManager(configuration.getConfiguredOutboundChannels());
+ // todo
+ outboundClientPermits = new PermitManager(100);
}
static EstablishedConnection getCurrent() {
@@ -151,15 +87,12 @@
}
void clearCurrent() {
- if (currentConnection.get() != this) {
- throw new IllegalStateException("clearCurrent() from wrong context");
+ if (currentConnection.get() == this) {
+ currentConnection.set(null);
}
- currentConnection.set(null);
}
- // sequence methods
-
- protected Executor getExecutor() {
+ Executor getExecutor() {
return executor;
}
@@ -167,91 +100,23 @@
return linkMetric;
}
- BufferAllocator<ByteBuffer> getAllocator() {
- return allocator;
+ ConnectionHandlerContext getConnectionContext() {
+ return connectionContext;
}
- AllocatedMessageChannel getChannel() {
- return channel;
- }
-
- ConnectionHandler getLocalConnectionHandler() {
- return localConnectionHandler;
- }
-
- private static final IoFuture.Notifier<RequestHandler, Result<RequestHandler>> RHS_RESULT_NOTIFIER =
- new IoFuture.HandlingNotifier<RequestHandler, Result<RequestHandler>>() {
- public void handleCancelled(final Result<RequestHandler> attachment) {
- log.trace("Failed to open remote service (cancelled)");
- attachment.setCancelled();
- }
-
- public void handleFailed(final IOException exception, final Result<RequestHandler> attachment) {
- log.trace("Failed to open remote service: %s", exception);
- attachment.setException(exception);
- }
-
- public void handleDone(final RequestHandler result, final Result<RequestHandler> attachment) {
- log.trace("Opened %s", result);
- attachment.setResult(result);
- }
- };
-
- protected void closeAction() {
- // just to make sure...
- IoUtils.safeClose(channel);
- final IndeterminateOutcomeException ioe = new IndeterminateOutcomeException("The connection was closed");
- // Things running remotely
- for (int i = 0; i < remoteRequests.length(); i ++) {
- SpiUtils.safeHandleException(remoteRequests.getAndSet(i, null), ioe);
- }
- for (RemoteClient x : clearing(remoteClients)) {
- IoUtils.safeClose(x.getRequestHandler());
- }
- // Things running locally
- for (RemoteRequestContext localRequest : clearing(localRequests)) {
- localRequest.cancel();
- }
- }
-
public String toString() {
return "multiplex connection <" + Integer.toHexString(hashCode()) + "> via " + channel;
}
- public Cancellable open(final String serviceName, final String groupName, final Result<RequestHandler> result) {
- return null;
+ ConnectionConfiguration getConfiguration() {
+ return configuration;
}
- public RequestHandlerConnector createConnector(final RequestHandler localHandler) {
- return null;
+ WritableMessageChannel getChannel() {
+ return channel;
}
- int getTransmitWindowSize() {
- return transmitWindowSize;
+ PermitManager.Permit nextRemoteClient() {
+ return outboundClientPermits.acquire();
}
-
- int getReceiveWindowSize() {
- return receiveWindowSize;
- }
-
- private static <T> Iterable<T> clearing(final AtomicReferenceArray<T> array) {
- return new Iterable<T>() {
-
- public Iterator<T> iterator() {
- return new Iterator<T>() {
- private int idx;
- public boolean hasNext() {
- return idx < array.length();
- return idx < array.length();
- }
-
- public T next() {
- return array.getAndSet(idx++, null);
- }
-
- public void remove() {
- }
- };
- }
- };
- }
-}
\ No newline at end of file
+}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/InitialReadHandlerImpl.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,125 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import org.jboss.xnio.IoReadHandler;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.remoting3.spi.Result;
-import org.jboss.remoting3.spi.ConnectionHandlerFactory;
-import org.jboss.remoting3.ProtocolException;
-import java.nio.ByteBuffer;
-import java.nio.BufferUnderflowException;
-import java.io.IOException;
-
-/**
- * Read handler for initial (negotiation) phase of connection.
- */
-final class InitialReadHandlerImpl implements
IoReadHandler<AllocatedMessageChannel> {
-
- private final Result<ConnectionHandlerFactory> result;
-
- private static final int GREETING = 0x484a524d;
- private static final int MAX_VERSION = 0;
-
- private final ConnectionConfiguration connectionConfiguration;
-
- InitialReadHandlerImpl(final Result<ConnectionHandlerFactory> result, final
ConnectionConfiguration connectionConfiguration) {
- this.result = result;
- this.connectionConfiguration = connectionConfiguration;
- }
-
- public void handleReadable(final AllocatedMessageChannel channel) {
- try {
- final ByteBuffer buffer = channel.receive();
- if (buffer.getInt() != GREETING) {
- throw new IOException("Incorrect greeting");
- }
- final ConfigParam[] paramIds = ConfigParam.values();
- final ConnectionConfiguration conf = connectionConfiguration;
- final int count = buffer.getShort() & 0xffff;
- for (int i = 0; i < count; i ++) {
- final int id = buffer.get() & 0xff;
- if (id > paramIds.length) {
- continue;
- }
- final ConfigParam configParam = paramIds[id];
- switch (configParam) {
- case MAX_SUPPORTED_VERSION: {
- // since we're version 0, the remote side must support us, and we support nothing else, so...
- break;
- }
- case MAX_RECEIVE_SIZE: {
-
conf.setMaximumTransmitSize(Math.min(validate(ConfigParam.MAX_RECEIVE_SIZE,
buffer.getInt(), 0x100, 0x100000), conf.getMaximumTransmitSize()));
- break;
- }
- case MAX_TRANSMIT_SIZE: {
-
conf.setMaximumReceiveSize(Math.min(validate(ConfigParam.MAX_TRANSMIT_SIZE,
buffer.getInt(), 0x100, 0x100000), conf.getMaximumReceiveSize()));
- break;
- }
- case MAX_RECEIVE_WINDOW_SIZE: {
-
conf.setTransmitWindowSize(Math.min(validate(ConfigParam.MAX_RECEIVE_WINDOW_SIZE,
buffer.getInt(), 0x2, 0x400), conf.getTransmitWindowSize()));
- break;
- }
- case MAX_TRANSMIT_WINDOW_SIZE: {
-
conf.setReceiveWindowSize(Math.min(validate(ConfigParam.MAX_TRANSMIT_WINDOW_SIZE,
buffer.getInt(), 0x2, 0x400), conf.getReceiveWindowSize()));
- break;
- }
- case MAX_OUTBOUND_REQUESTS: {
-
conf.setMaximumInboundRequests(Math.min(validate(ConfigParam.MAX_OUTBOUND_REQUESTS,
buffer.getInt(), 0, 0x100000), conf.getMaximumInboundRequests()));
- break;
- }
- case MAX_INBOUND_REQUESTS: {
-
conf.setMaximumOutboundRequests(Math.min(validate(ConfigParam.MAX_INBOUND_REQUESTS,
buffer.getInt(), 0, 0x100000), conf.getMaximumOutboundRequests()));
- break;
- }
- case MAX_OUTBOUND_CLIENTS: {
-
conf.setMaximumInboundClients(Math.min(validate(ConfigParam.MAX_OUTBOUND_CLIENTS,
buffer.getInt(), 0, 0x100000), conf.getMaximumInboundClients()));
- break;
- }
- case MAX_INBOUND_CLIENTS: {
-
conf.setMaximumOutboundClients(Math.min(validate(ConfigParam.MAX_INBOUND_CLIENTS,
buffer.getInt(), 0, 0x100000), conf.getMaximumOutboundClients()));
- break;
- }
- default: {
- // should not be possible
- throw new IllegalStateException();
- }
- }
- }
- } catch (IOException e) {
- IoUtils.safeClose(channel);
- result.setException(e);
- } catch (BufferUnderflowException e) {
- IoUtils.safeClose(channel);
- result.setException(new ProtocolException("Truncated greeting packet"));
- }
- }
-
- private static int validate(Object field, int val, int min, int max) throws
ProtocolException {
- if (val >= min && val <= max) {
- return val;
- }
- throw new ProtocolException("Value of " + field + " is out of range (expected " + min + " <= n <= " + max + ", got " + val + ")");
- }
-}
Modified:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Loggers.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -32,6 +32,6 @@
private static final String MAIN_LOGGER_NAME =
"org.jboss.remoting.multiplex";
private static final String REQUEST_HANDLER_LOGGER_NAME = MAIN_LOGGER_NAME +
".request-handler";
+ static final Logger MAIN_LOGGER = Logger.getLogger(MAIN_LOGGER_NAME);
static final Logger REQUEST_HANDLER_LOGGER =
Logger.getLogger(REQUEST_HANDLER_LOGGER_NAME);
- static final Logger MAIN_LOGGER = Logger.getLogger(MAIN_LOGGER_NAME);
}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MarshallingProtocol.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MarshallingProtocol.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MarshallingProtocol.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,99 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import org.jboss.marshalling.Marshaller;
-import org.jboss.marshalling.Unmarshaller;
-import org.jboss.marshalling.ClassTable;
-import org.jboss.marshalling.ObjectTable;
-import org.jboss.marshalling.ClassExternalizerFactory;
-import org.jboss.marshalling.ClassResolver;
-import org.jboss.marshalling.ObjectResolver;
-import org.jboss.xnio.Pool;
-
-/**
- * A registered marshalling protocol.
- *
- * @remoting.implement
- */
-public interface MarshallingProtocol {
-
- /**
- * Get a configured unmarshaller pool.
- *
- * @param configuration the configuration to use
- * @return the pool
- */
- Pool<Unmarshaller> getUnmarshallerPool(Configuration configuration);
-
- /**
- * Get a configured marshaller pool.
- *
- * @param configuration the configuration to use
- * @return the pool
- */
- Pool<Marshaller> getMarshallerPool(Configuration configuration);
-
- /**
- * The configuration for a marshalling protocol.
- *
- * @remoting.consume
- */
- interface Configuration {
-
- /**
- * Get a user class table, if any.
- *
- * @return the user class table or {@code null} if none is configured
- */
- ClassTable getUserClassTable();
-
- /**
- * Get a user object table, if any.
- *
- * @return the user object table or {@code null} if none is configured
- */
- ObjectTable getUserObjectTable();
-
- /**
- * Get a user externalizer factory, if any.
- *
- * @return the user externalizer factory
- */
- ClassExternalizerFactory getUserExternalizerFactory();
-
- /**
- * Get a user class resolver, if any.
- *
- * @return the user class resolver
- */
- ClassResolver getUserClassResolver();
-
- /**
- * Get a user object resolver, if any.
- *
- * @return the user object resolver
- */
- ObjectResolver getUserObjectResolver();
- }
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,73 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-final class MessageType {
- private MessageType() {}
-
- public static final int FIRST = 0x80;
- public static final int LAST = 0x40;
-
- // Stream message data types
-
- public static final int DATA = 0x00;
- public static final int CANCEL = 0x01;
- public static final int FINAL = 0x02;
-
- /**
- * Request - multi-packet
- */
- public static final int REQUEST = 0x00;
- public static final int REPLY = 0x01;
- public static final int REQUEST_RECEIVE_FAILED = 0x02;
- public static final int REQUEST_FAILED = 0x03;
-
- /**
- * Request receiver aborts transmission.
- */
- public static final int REMOTE_REQUEST_ABORT = 0x10;
- public static final int REMOTE_REPLY_ABORT = 0x11;
- public static final int REMOTE_REQUEST_RECEIVE_FAILED_ABORT = 0x12;
- public static final int REMOTE_REQUEST_FAILED_ABORT = 0x13;
-
- /**
- * Request transmitter aborts transmission.
- */
- public static final int REQUEST_ABORT = 0x20;
- public static final int REPLY_ABORT = 0x21;
- public static final int REQUEST_RECEIVE_FAILED_ABORT = 0x22;
- public static final int REQUEST_FAILED_ABORT = 0x23;
-
- public static final int REQUEST_CANCEL = 0x30;
- public static final int REQUEST_CANCEL_ACK = 0x31;
- public static final int CLIENT_OPEN = 0x32;
- public static final int CLIENT_OPEN_CANCEL = 0x33;
- public static final int CLIENT_OPEN_ACK = 0x34;
- public static final int CLIENT_OPEN_CANCEL_ACK = 0x35;
- public static final int CLIENT_NOT_FOUND = 0x36;
- public static final int CLIENT_REMOTE_CLOSE = 0x37;
- public static final int CLIENT_CLOSE = 0x38;
-
-
-
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,119 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.jboss.remoting3.spi.Cancellable;
-import org.jboss.remoting3.spi.ConnectionHandler;
-import org.jboss.remoting3.spi.RequestHandler;
-import org.jboss.remoting3.spi.RequestHandlerConnector;
-import org.jboss.remoting3.spi.Result;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.xnio.channels.Channels;
-import org.jboss.xnio.log.Logger;
-
-final class MultiplexConnectionHandler implements ConnectionHandler {
- private static final Logger log =
Logger.getLogger("org.jboss.remoting.multiplex");
-
- private final EstablishedConnection establishedConnection;
- private final Charset charset = Charset.forName("utf-8");
-
- MultiplexConnectionHandler(final EstablishedConnection establishedConnection) {
- this.establishedConnection = establishedConnection;
- }
-
- public Cancellable open(final String serviceName, final String groupName, final
Result<RequestHandler> result) {
- final EstablishedConnection establishedConnection = this.establishedConnection;
- final PermitManager.Permit permit = establishedConnection.nextRemoteClient();
- boolean ok = false;
- try {
- final int id = permit.getId();
- final RemoteClient remoteClient = new RemoteClient(result, new
MultiplexRequestHandler(id, establishedConnection));
- establishedConnection.addRemoteClient(id, remoteClient);
- try {
- final BufferAllocator<ByteBuffer> allocator =
establishedConnection.getAllocator();
- final ByteBuffer buffer = allocator.allocate();
- final AllocatedMessageChannel channel =
establishedConnection.getChannel();
- buffer.put((byte) MessageType.CLIENT_OPEN);
- buffer.putInt(id);
- final byte[] serviceNameBytes = serviceName.getBytes(charset);
- buffer.putShort((short) serviceNameBytes.length);
- buffer.put(serviceNameBytes);
- final byte[] groupNameBytes = groupName.getBytes(charset);
- buffer.putShort((short) groupNameBytes.length);
- buffer.put(groupNameBytes);
- buffer.flip();
- try {
- Channels.sendBlocking(channel, buffer);
- } catch (IOException e) {
- result.setException(e);
- }
- ok = true;
- return new Cancellable() {
- private final AtomicBoolean cancelled = new AtomicBoolean();
-
- public void cancel() {
- if (cancelled.getAndSet(true)) {
- // cancel already sent
- return;
- }
- if (remoteClient.getRequestHandler() != null) {
- // already done; don't waste the bandwidth
- return;
- }
- final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CLIENT_OPEN_CANCEL);
- buffer.putInt(id);
- buffer.flip();
- try {
- Channels.sendBlocking(channel, buffer);
- } catch (IOException e) {
- log.trace("Sending a request to cancel a client open failed");
- }
- }
- };
- } finally {
- if (! ok) {
- establishedConnection.removeRemoteClient(id);
- }
- }
- } finally {
- if (! ok) {
- permit.release();
- }
- }
- }
-
- public RequestHandlerConnector createConnector(final RequestHandler localHandler) {
- final int id = establishedConnection.addLocalClient(localHandler);
- return null;
- }
-
- public void close() throws IOException {
- establishedConnection.close();
- }
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandlerFactory.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandlerFactory.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandlerFactory.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,33 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import org.jboss.remoting3.spi.ConnectionHandlerFactory;
-import org.jboss.remoting3.spi.ConnectionHandler;
-
-final class MultiplexConnectionHandlerFactory implements ConnectionHandlerFactory {
-
- public ConnectionHandler createInstance(final ConnectionHandler
localConnectionHandler) {
- return new MultiplexConnectionHandler(localConnectionHandler);
- }
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,100 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import org.jboss.remoting3.spi.ConnectionProvider;
-import org.jboss.remoting3.spi.Cancellable;
-import org.jboss.remoting3.spi.ConnectionHandlerFactory;
-import org.jboss.remoting3.spi.Result;
-import org.jboss.remoting3.spi.SpiUtils;
-import org.jboss.remoting3.spi.ConnectionProviderContext;
-import org.jboss.xnio.OptionMap;
-import org.jboss.xnio.FutureConnection;
-import org.jboss.xnio.IoFuture;
-import org.jboss.xnio.IoHandlerFactory;
-import org.jboss.xnio.Connector;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import java.net.URI;
-import java.net.InetSocketAddress;
-import java.io.IOException;
-
-final class MultiplexConnectionProvider implements
ConnectionProvider<MultiplexServerFactory> {
-
- private final Connector<InetSocketAddress, AllocatedMessageChannel> connector;
- private final ConnectionProviderContext context;
- private final IoHandlerFactory<AllocatedMessageChannel> handlerFactory;
- private final MultiplexServerFactory providerInterface = new MultiplexServerFactory()
{
- public IoHandlerFactory<AllocatedMessageChannel> getHandlerFactory() {
- return handlerFactory;
- }
- };
-
- public MultiplexConnectionProvider(final Connector<InetSocketAddress,
AllocatedMessageChannel> connector, final ConnectionProviderContext context) {
- this.connector = connector;
- this.context = context;
- handlerFactory = null;
- }
-
- public Cancellable connect(final URI uri, final OptionMap connectOptions, final
Result<ConnectionHandlerFactory> result) throws IllegalArgumentException {
- final String host = uri.getHost();
- final int port = uri.getPort();
- if (port == -1) {
- throw new IllegalArgumentException("A valid port number must be explicitly specified");
- }
- // TODO - change this to use async DNS via XNIO DNS
- final FutureConnection<InetSocketAddress, AllocatedMessageChannel>
connection = connector.connectTo(new InetSocketAddress(host, port), null);
- connection.addNotifier(new IoFuture.HandlingNotifier<AllocatedMessageChannel,
Result<ConnectionHandlerFactory>>() {
- public void handleFailed(final IOException exception, final
Result<ConnectionHandlerFactory> attachment) {
- attachment.setException(exception);
- }
-
- public void handleDone(final AllocatedMessageChannel result, final
Result<ConnectionHandlerFactory> attachment) {
- attachment.setResult(new MultiplexConnectionHandlerFactory());
- }
-
- public void handleCancelled(final Result<ConnectionHandlerFactory>
attachment) {
- attachment.setCancelled();
- }
- }, result);
- return SpiUtils.cancellable(connection);
- }
-
- /**
- * Get the server factory.
- *
- * @return the server factory
- */
- public MultiplexServerFactory getProviderInterface() {
- return providerInterface;
- }
-
- /**
- * Get the server handler for this connection provider.
- *
- * @return the server handler factory
- */
- public IoHandlerFactory<AllocatedMessageChannel> getHandlerFactory() {
- // todo - should this be secured somehow?
- return handlerFactory;
- }
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProviderFactory.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProviderFactory.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProviderFactory.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,42 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import org.jboss.remoting3.spi.ConnectionProviderFactory;
-import org.jboss.remoting3.spi.ConnectionProvider;
-import org.jboss.remoting3.spi.ConnectionProviderContext;
-import org.jboss.xnio.Connector;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import java.net.InetSocketAddress;
-
-public final class MultiplexConnectionProviderFactory implements
ConnectionProviderFactory<MultiplexServerFactory> {
- private final Connector<InetSocketAddress, AllocatedMessageChannel> connector;
-
- public MultiplexConnectionProviderFactory(final Connector<InetSocketAddress,
AllocatedMessageChannel> connector) {
- this.connector = connector;
- }
-
- public ConnectionProvider<MultiplexServerFactory> createInstance(final
ConnectionProviderContext context) {
- return new MultiplexConnectionProvider(connector, context);
- }
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexOptions.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,108 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import org.jboss.xnio.Option;
-
-/**
- * Options which may be used to configure a multiplex connection.
- */
-public final class MultiplexOptions {
-
- private MultiplexOptions() {
- }
-
- /**
- * The maximum message receive size to specify to the remote side. This is the size of the largest protocol message
- * that can be sent from the remote side to the local side. The remote side may elect to send messages of a smaller
- * size, but it may not send larger messages (such messages will cause the connection to close with a protocol error).
- * This setting does <b>not</b> affect the maximum request or reply size. This value should be set to at least {@code 256}
- * if specified.
- */
- public static final Option<Integer> MAX_RECEIVE_SIZE =
Option.simple(MultiplexOptions.class, "MAX_RECEIVE_SIZE", Integer.class);
-
- /**
- * The maximum message size that this side will send to a peer. If the peer specifies a smaller maximum receive size,
- * that size will be used; otherwise this value will be. This value should be set to at least {@code 256} if specified.
- *
- * @see #MAX_RECEIVE_SIZE
- */
- public static final Option<Integer> MAX_TRANSMIT_SIZE =
Option.simple(MultiplexOptions.class, "MAX_TRANSMIT_SIZE", Integer.class);
-
- /**
- * The maximum send window size that will be used by this end of the connection. The peer may specify a smaller size,
- * in which case that value will be used. Increasing this size may decrease latency at the cost of increased memory demands.
- * The minimum allowed value for this option is {@code 2}.
- */
- public static final Option<Integer> MAX_SEND_WINDOW_SIZE =
Option.simple(MultiplexOptions.class, "MAX_SEND_WINDOW_SIZE", Integer.class);
-
- /**
- * The maximum receive window size that this end of the connection can support. The peer will not send unacknowledged messages
- * beyond this window size. The minimum allowed value for this option is {@code 2}.
- */
- public static final Option<Integer> MAX_RECEIVE_WINDOW_SIZE =
Option.simple(MultiplexOptions.class, "MAX_RECEIVE_WINDOW_SIZE",
Integer.class);
-
- /**
- * The maximum number of concurrent inbound requests that this end of the connection will be configured to support. Using a larger number
- * increases concurrency at the cost of additional memory demands.
- */
- public static final Option<Integer> MAX_INBOUND_REQUESTS =
Option.simple(MultiplexOptions.class, "MAX_INBOUND_REQUESTS", Integer.class);
-
- /**
- * The maximum number of concurrent outbound requests that this end of the connection will be configured to support. Using a larger number
- * increases concurrency at the cost of additional memory demands.
- */
- public static final Option<Integer> MAX_OUTBOUND_REQUESTS =
Option.simple(MultiplexOptions.class, "MAX_OUTBOUND_REQUESTS", Integer.class);
-
- /**
- * The maximum number of concurrently active inbound clients that this end of the connection will be configured to accept. Using a larger number
- * increases concurrency at the cost of additional memory demands.
- */
- public static final Option<Integer> MAX_INBOUND_CLIENTS =
Option.simple(MultiplexOptions.class, "MAX_INBOUND_CLIENTS", Integer.class);
-
- /**
- * The maximum number of concurrently active outbound clients that this end of the connection will be configured to support. Using a larger number
- * increases concurrency at the cost of additional memory demands.
- */
- public static final Option<Integer> MAX_OUTBOUND_CLIENTS =
Option.simple(MultiplexOptions.class, "MAX_OUTBOUND_CLIENTS", Integer.class);
-
- /**
- * The maximum number of inbound streams. This is the number of streams that the remote side is allowed to initiate, regardless of
- * the actual direction of flow of the stream itself.
- */
- public static final Option<Integer> MAX_INBOUND_STREAMS =
Option.simple(MultiplexOptions.class, "MAX_INBOUND_STREAMS", Integer.class);
-
- /**
- * The maximum number of outbound streams. This is the number of streams that the local side is allowed to initiate, regardless of
- * the actual direction of flow of the stream itself.
- */
- public static final Option<Integer> MAX_OUTBOUND_STREAMS =
Option.simple(MultiplexOptions.class, "MAX_OUTBOUND_STREAMS", Integer.class);
-
- /**
- * The number of milliseconds to wait before sending accumulated ACK messages. Higher values increase bandwidth at the cost of latency.
- * Setting this value to 0 will immediately send ACK messages.
- */
- public static final Option<Integer> ACK_CORK_MILLISECONDS =
Option.simple(MultiplexOptions.class, "ACK_CORK_MILLISECONDS", Integer.class);
-
- // todo - authentication/security mechanisms
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java 2009-11-18 00:28:44 UTC (rev 5599)
+++ remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRemoteRequestContext.java 2009-11-19 22:14:53 UTC (rev 5600)
@@ -1,61 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.jboss.remoting3.spi.RemoteRequestContext;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.xnio.channels.Channels;
-import org.jboss.xnio.log.Logger;
-
-final class MultiplexRemoteRequestContext implements RemoteRequestContext {
- private static final Logger log = Loggers.MAIN_LOGGER;
-
- private final int id;
- private final EstablishedConnection connection;
- private final AtomicBoolean cancelled = new AtomicBoolean();
-
- public MultiplexRemoteRequestContext(final int id, final EstablishedConnection
connection) {
- this.id = id;
- this.connection = connection;
- }
-
- public void cancel() {
- if (cancelled.getAndSet(true)) {
- return;
- }
- final AllocatedMessageChannel messageChannel = connection.getChannel();
- ByteBuffer buf = ByteBuffer.allocate(5);
- buf.put((byte) MessageType.REQUEST_CANCEL);
- buf.putInt(id);
- buf.flip();
- try {
- Channels.sendBlocking(messageChannel, buf);
- log.trace("Sent cancel request for ID %d", Integer.valueOf(id));
- } catch (IOException e) {
- log.trace("Failed to send a request cancel message: %s", e);
- }
- }
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java 2009-11-18
00:28:44 UTC (rev 5599)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -1,110 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.jboss.marshalling.ByteOutput;
-import org.jboss.marshalling.Marshaller;
-import org.jboss.remoting3.spi.AbstractHandleableCloseable;
-import org.jboss.remoting3.spi.RemoteRequestContext;
-import org.jboss.remoting3.spi.ReplyHandler;
-import org.jboss.remoting3.spi.RequestHandler;
-import org.jboss.remoting3.spi.SpiUtils;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.channels.Channels;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.Pool;
-
-/**
- * Handler for outbound requests.
- */
-final class MultiplexRequestHandler extends
AbstractHandleableCloseable<RequestHandler> implements RequestHandler {
- private static final Logger log = Loggers.REQUEST_HANDLER_LOGGER;
-
- private final int identifier;
- private final BufferAllocator<ByteBuffer> allocator;
- private final Pool<Marshaller> marshallerPool;
- private final EstablishedConnection connection;
-
- MultiplexRequestHandler(final int identifier, final EstablishedConnection connection)
{
- super(connection.getExecutor());
- this.connection = connection;
- this.identifier = identifier;
- allocator = connection.getAllocator();
- }
-
- @Override
- protected void closeAction() throws IOException {
- connection.removeRemoteClient(identifier);
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CLIENT_CLOSE);
- buffer.putInt(identifier);
- buffer.flip();
- Channels.sendBlocking(connection.getChannel(), buffer);
- }
-
- public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler
handler) {
- log.trace("Sending outbound request (request id = %d) (type is %s)",
request == null ? "null" : request.getClass());
- final EstablishedConnection connection = this.connection;
- connection.setCurrent();
- try {
- final Marshaller marshaller = marshallerPool.allocate();
- final ByteOutput output = new SendingByteOutput(null, allocator,
connection.getTransmitWindowSize());
- boolean ok = false;
- try {
- marshaller.start(output);
- marshaller.write(MessageType.REQUEST);
- marshaller.writeInt(identifier);
- final int id = connection.nextRequest();
- connection.addRemoteRequest(id, handler);
- marshaller.writeInt(id);
- marshaller.writeObject(request);
- ok = true;
- marshaller.finish();
- marshallerPool.free(marshaller);
- output.close();
- return new MultiplexRemoteRequestContext(id, connection);
- } finally {
- IoUtils.safeClose(output);
- if (! ok) try {
- marshaller.finish();
- marshallerPool.free(marshaller);
- } catch (final Exception e) {
- log.trace("Failed to free a marshaller back into the pool:
%s", e);
- }
- }
- } catch (final IOException t) {
- log.trace(t, "receiveRequest failed with an exception");
- SpiUtils.safeHandleException(handler, t);
- return SpiUtils.getBlankRemoteRequestContext();
- } finally {
- connection.clearCurrent();
- }
- }
-
- public String toString() {
- return "forwarding request handler <" +
Integer.toHexString(hashCode()) + "> (id = " + identifier + ") for
" + connection;
- }
-}
\ No newline at end of file
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java 2009-11-18
00:28:44 UTC (rev 5599)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import org.jboss.remoting3.spi.RequestHandlerConnector;
-import org.jboss.remoting3.spi.Cancellable;
-import org.jboss.remoting3.spi.RequestHandler;
-import org.jboss.remoting3.spi.Result;
-
-final class MultiplexRequestHandlerConnector implements RequestHandlerConnector {
- private final EstablishedConnection connection;
-
- MultiplexRequestHandlerConnector(final EstablishedConnection connection) {
- this.connection = connection;
- }
-
- public Cancellable createRequestHandler(final Result<RequestHandler> result)
throws SecurityException {
- throw new SecurityException("Not forwarded");
- }
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexServerFactory.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexServerFactory.java 2009-11-18
00:28:44 UTC (rev 5599)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexServerFactory.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.xnio.IoHandlerFactory;
-
-/**
- * The server factory for a multiplex connection provider. Use this to create servers
which connect to the
- * corresponding endpoint.
- */
-public interface MultiplexServerFactory {
-
- /**
- * Get the I/O handler factory for this server factory.
- *
- * @return the I/O handler factory
- */
- IoHandlerFactory<AllocatedMessageChannel> getHandlerFactory();
-}
Modified:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java 2009-11-18
00:28:44 UTC (rev 5599)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/PermitManager.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -27,13 +27,11 @@
final class PermitManager {
private final Object lock = new Object();
- private final int maxPermits;
private final int[] permitIds;
private int topOfStack;
public PermitManager(final int maxPermits) {
synchronized (lock) {
- this.maxPermits = maxPermits;
permitIds = new int[maxPermits];
for (int i = 0; i < maxPermits; i ++) {
permitIds[i] = i;
@@ -42,13 +40,13 @@
}
public Permit acquireInterruptibly() throws InterruptedException {
- final int maxPermits = this.maxPermits;
+ final int[] permitIds = this.permitIds;
+ final int maxPermits = permitIds.length;
synchronized (lock) {
int topOfStack;
while ((topOfStack = this.topOfStack) == maxPermits) {
lock.wait();
}
- final int[] permitIds = this.permitIds;
int id = permitIds[topOfStack];
this.topOfStack = topOfStack + 1;
return new Permit(id);
@@ -56,7 +54,8 @@
}
public Permit acquire() {
- final int maxPermits = this.maxPermits;
+ final int[] permitIds = this.permitIds;
+ final int maxPermits = permitIds.length;
synchronized (lock) {
boolean intr = Thread.interrupted();
try {
@@ -66,7 +65,6 @@
} catch (InterruptedException e) {
intr = true;
}
- final int[] permitIds = this.permitIds;
int id = permitIds[topOfStack];
this.topOfStack = topOfStack + 1;
return new Permit(id);
@@ -76,6 +74,20 @@
}
}
+ public Permit tryAcquire() {
+ final int[] permitIds = this.permitIds;
+ final int maxPermits = permitIds.length;
+ synchronized (lock) {
+ final int topOfStack = this.topOfStack;
+ if (topOfStack == maxPermits) {
+ return null;
+ }
+ int id = permitIds[topOfStack];
+ this.topOfStack = topOfStack + 1;
+ return new Permit(id);
+ }
+ }
+
private static final AtomicIntegerFieldUpdater<Permit> permitUpdater =
AtomicIntegerFieldUpdater.newUpdater(Permit.class, "id");
Copied:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Protocol.java (from
rev 5501,
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MessageType.java)
===================================================================
--- remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Protocol.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Protocol.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.nio.ByteBuffer;
+
+/**
+ * <pre>
+ *      Bit #:
+ *        7   6   5   4   3   2   1   0
+ *Byte# +---+---+---+---+---+---+---+---+
+ *  0   | I | F | L | R | A |   Type    |
+ *      +---+---+---+---+---+---+---+---+
+ *  1   |    Channel ID (High Byte)     |
+ *      +-------------------------------+
+ *  2   |    Channel ID (Low Byte)      |
+ *      +-------------------------------+
+ *  3+  |           Data ...            |
+ *      |                               |
+ *      :                               :
+ *      .                               .
+ * </pre>
+ *
+ * The flag bit definitions are as follows:
+ * <ul>
+ * <li><b><code>I</code></b> - Initiated-By-Me: 1 if
this channel ID was initiated by the local side</li>
+ * <li><b><code>F</code></b> - First: 1 if this is the
initiating outbound message for the channel</li>
+ * <li><b><code>L</code></b> - Last: 1 if this is the
terminating outbound message for the channel</li>
+ * <li><b><code>R</code></b> - Reset: 1 to indicate that
an inbound message was received for an unknown channel, or that the input channel was
overrun, or that the input channel was shut down</li>
+ * <li><b><code>A</code></b> - Acknowledge: 1 to indicate
that an inbound packet has been received</li>
+ * <li><b><code>Type</code></b> - Type; one of the
following: <ul>
+ * <li><b><code>000 (0)</code></b> - Client
control message</li>
+ * <li><b><code>001 (1)</code></b> - Request/reply
message</li>
+ * <li><b><code>010 (2)</code></b> - Stream data
message</li>
+ * <li><b><code>011 (3)</code></b> -
Reserved</li>
+ * <li><b><code>100 (4)</code></b> -
Reserved</li>
+ * <li><b><code>101 (5)</code></b> -
Reserved</li>
+ * <li><b><code>110 (6)</code></b> -
Reserved</li>
+ * <li><b><code>111 (7)</code></b> -
Reserved</li>
+ * </ul></li>
+ * </ul>
+ *
+ * The channel ID is a unique identifier for the channel (note that there are separate
channel ID namespaces, one for
+ * each side of the connection, times each channel type).
+ * <p/>
+ * The Data field for a client control message is as follows:
+ * <pre>
+ * Bit #:
+ * 7 6 5 4 3 2 1 0
+ *Byte# +---+---+---+---+---+---+---+---+
+ * D+0 | Service type (length) |
+ * + +
+ * D+1 | Service type (length) |
+ * +-------------------------------+
+ * D+0 | Service type (length) |
+ * + +
+ * D+0 | Service type (length) |
+ * + +
+ * </pre>
+ *
+ */
+final class Protocol {
+
+ private Protocol() {}
+
+ public static final int FLAG_I = 1 << 7;
+ public static final int FLAG_FIRST = 1 << 6;
+ public static final int FLAG_LAST = 1 << 5;
+ public static final int FLAG_RESET = 1 << 4;
+ public static final int FLAG_ACK = 1 << 3;
+
+ public static final int TYPE_CLIENT = 0;
+ public static final int TYPE_REQUEST = 1;
+ public static final int TYPE_STREAM = 2;
+
+ static void writeFlags(ByteBuffer buffer, boolean i, boolean first, boolean last,
boolean rst, boolean ack, int type) {
+ buffer.put((byte) (
+ (i ? FLAG_I : 0) |
+ (first ? FLAG_FIRST : 0) |
+ (last ? FLAG_LAST : 0) |
+ (rst ? FLAG_RESET : 0) |
+ (ack ? FLAG_ACK : 0) |
+ type & 0x07
+ ));
+ }
+
+ // Request
+ public static final byte REQUEST_DATA = 0x00;
+ public static final byte REQUEST_DATA_LAST = 0x01;
+ public static final byte REQUEST_EXCEPTION = 0x02;
+ public static final byte REQUEST_CANCEL_REQ = 0x03;
+ public static final byte REQUEST_CANCEL_ACK = 0x04;
+
+
+
+ /**
+ * Request receiver aborts transmission.
+ */
+ public static final int REMOTE_REQUEST_ABORT = 0x10;
+ public static final int REMOTE_REPLY_ABORT = 0x11;
+ public static final int REMOTE_REQUEST_RECEIVE_FAILED_ABORT = 0x12;
+ public static final int REMOTE_REQUEST_FAILED_ABORT = 0x13;
+
+ /**
+ * Request transmitter aborts transmission.
+ */
+ public static final int REQUEST_ABORT = 0x20;
+ public static final int REPLY_ABORT = 0x21;
+ public static final int REQUEST_RECEIVE_FAILED_ABORT = 0x22;
+ public static final int REQUEST_FAILED_ABORT = 0x23;
+
+ // CONTROL channel messages
+ public static final int CLIENT_OPEN = 0x32;
+ public static final int CLIENT_OPEN_CANCEL = 0x33;
+ public static final int CLIENT_OPEN_ACK = 0x34;
+ public static final int CLIENT_OPEN_CANCEL_ACK = 0x35;
+ public static final int CLIENT_NOT_FOUND = 0x36;
+ public static final int CLIENT_REMOTE_CLOSE = 0x37;
+ public static final int CLIENT_CLOSE = 0x38;
+
+ // data channel messages
+
+ public static final int START_REQUEST = 0x01;
+ // stream data cannot be sent until this byte is received on the agreed-upon
subchannel
+ public static final int START_STREAM = 0x02;
+}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java 2009-11-18
00:28:44 UTC (rev 5599)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReadHandlerImpl.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -1,69 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.jboss.marshalling.MarshallerFactory;
-import org.jboss.marshalling.MarshallingConfiguration;
-import org.jboss.marshalling.Marshaller;
-import org.jboss.marshalling.Unmarshaller;
-import org.jboss.remoting3.spi.ConnectionHandler;
-import org.jboss.xnio.IoReadHandler;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.Buffers;
-import org.jboss.xnio.Pool;
-import org.jboss.xnio.channels.ReadableAllocatedMessageChannel;
-import org.jboss.xnio.log.Logger;
-
-final class ReadHandlerImpl implements
IoReadHandler<ReadableAllocatedMessageChannel> {
- private static final Logger log =
Logger.getLogger("org.jboss.remoting.multiplex");
-
- private final int maxReceiveSize;
- private final int maxSendSize;
- private final int receiveWindowSize;
- private final int sendWindowSize;
- private final Pool<Marshaller> marshallerPool;
- private final Pool<Unmarshaller> unmarshallerPool;
- private final ConnectionHandler connectionHandler;
-
- public void handleReadable(final ReadableAllocatedMessageChannel channel) {
- final ByteBuffer buffer;
- try {
- buffer = channel.receive();
- } catch (IOException e) {
- log.error("I/O error in protocol channel; closing channel (%s)",
e);
- IoUtils.safeClose(connectionHandler);
- IoUtils.safeClose(channel);
- return;
- }
- if (log.isTrace()) {
- log.trace("Received raw message:\n%s", Buffers.createDumper(buffer,
8, 1));
- }
- final int idByte = buffer.get() & 0xff;
- final boolean isMulti = (idByte & 0x80) != 0;
- final boolean isFirst = isMulti && (idByte & 0x40) != 0;
- final boolean isLast = isMulti && (idByte & 0x20) != 0;
-
- }
-}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java 2009-11-18
00:28:44 UTC (rev 5599)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/ReceivingByteInput.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -1,193 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.ArrayDeque;
-import org.jboss.marshalling.ByteInput;
-import org.jboss.xnio.Buffers;
-import org.jboss.remoting3.ProtocolException;
-
-final class ReceivingByteInput extends InputStream implements ByteInput {
- private final Queue<ByteBuffer> window;
- private final Runnable ackTask;
- private final Runnable closeTask;
-
- private IOException failure;
- private boolean done;
-
- ReceivingByteInput(final Runnable ackTask, final int windowSize, final Runnable
closeTask) {
- this.ackTask = ackTask;
- this.closeTask = closeTask;
- window = new ArrayDeque<ByteBuffer>(windowSize);
- }
-
- public int read() throws IOException {
- final Queue<ByteBuffer> window = this.window;
- synchronized (this) {
- while (window.isEmpty()) {
- if (done) {
- return -1;
- }
- checkFailure();
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException("Interrupted on read()");
- }
- }
- final ByteBuffer buf = window.peek();
- try {
- return buf.get() & 0xff;
- } finally {
- if (buf.remaining() == 0) {
- window.poll();
- ackTask.run();
- }
- }
- }
- }
-
- public int read(final byte[] bytes, int offs, int len) throws IOException {
- if (len == 0) {
- return 0;
- }
- synchronized (this) {
- while (window.isEmpty()) {
- if (done) {
- return -1;
- }
- checkFailure();
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException("Interrupted on read()");
- }
- }
- final Queue<ByteBuffer> window = this.window;
- int total = 0;
- while (len > 0) {
- final ByteBuffer buffer = window.peek();
- if (buffer == null) {
- break;
- }
- final int bytecnt = Math.min(buffer.remaining(), len);
- buffer.get(bytes, offs, bytecnt);
- total += bytecnt;
- len -= bytecnt;
- if (buffer.remaining() == 0) {
- window.poll();
- ackTask.run();
- }
- }
- return total;
- }
- }
-
- public int available() throws IOException {
- synchronized (this) {
- int total = 0;
- for (ByteBuffer buffer : window) {
- total += buffer.remaining();
- if (total < 0) {
- return Integer.MAX_VALUE;
- }
- }
- return total;
- }
- }
-
- public long skip(long qty) throws IOException {
- final Queue<ByteBuffer> window = this.window;
- synchronized (this) {
- while (window.isEmpty()) {
- if (done) {
- return 0L;
- }
- checkFailure();
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException("Interrupted on read()");
- }
- }
- long skipped = 0L;
- while (qty > 0L) {
- final ByteBuffer buffer = window.peek();
- if (buffer == null) {
- break;
- }
- final int bytecnt = Math.min(buffer.remaining(), (int)
Math.max((long)Integer.MAX_VALUE, qty));
- Buffers.skip(buffer, bytecnt);
- skipped += bytecnt;
- qty -= bytecnt;
- if (buffer.remaining() == 0) {
- window.poll();
- ackTask.run();
- }
- }
- return skipped;
- }
- }
-
- private void checkFailure() throws IOException {
- final IOException failure = this.failure;
- if (failure != null) {
- failure.setStackTrace(new Throwable().getStackTrace());
- try {
- throw failure;
- } finally {
- done = true;
- this.failure = null;
- }
- }
- }
-
- void push(ByteBuffer buffer) throws IOException {
- synchronized (this) {
- checkFailure();
- try {
- window.add(buffer);
- } catch (IllegalStateException e) {
- throw new ProtocolException("Stream window overrun");
- }
- }
- }
-
- public void close() throws IOException {
- synchronized (this) {
- if (! done) {
- window.clear();
- done = true;
- closeTask.run();
- }
- }
- }
-}
Modified:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java 2009-11-18
00:28:44 UTC (rev 5599)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteClient.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -22,8 +22,8 @@
package org.jboss.remoting3.multiplex;
-import org.jboss.remoting3.spi.Result;
import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.xnio.Result;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
final class RemoteClient {
Copied:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionHandler.java
(from rev 5514,
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandler.java)
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionHandler.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionHandler.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,113 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.remoting3.spi.ConnectionHandlerContext;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.Result;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+final class RemoteConnectionHandler implements ConnectionHandler {
+
+ private final WritableMessageChannel channel;
+ private final ConnectionHandlerContext connectionContext;
+ private final EstablishedConnection establishedConnection;
+
+ RemoteConnectionHandler(final WritableMessageChannel channel, final
ConnectionHandlerContext connectionContext, final EstablishedConnection
establishedConnection) {
+ this.channel = channel;
+ this.connectionContext = connectionContext;
+ this.establishedConnection = establishedConnection;
+ }
+
+ static RemoteConnectionHandler create(final ConnectionHandlerContext
connectionContext, final EstablishedConnection establishedConnection) {
+ final RemoteConnectionHandler handler = new
RemoteConnectionHandler(establishedConnection.getChannel(), connectionContext,
establishedConnection);
+ handler.start();
+ return handler;
+ }
+
+ private void start() {
+ }
+
+ public Cancellable open(final String serviceType, final String groupName, final
Result<RequestHandler> result) {
+ final EstablishedConnection establishedConnection = this.establishedConnection;
+ final PermitManager.Permit permit = establishedConnection.nextRemoteClient();
+ boolean ok = false;
+ try {
+ final int id = permit.getId();
+ final WritableMessageChannel channel = establishedConnection.getChannel();
+ final ConnectionConfiguration configuration =
establishedConnection.getConfiguration();
+ final ByteBuffer buffer =
ByteBuffer.allocate(configuration.getConfiguredTransmitSize());
+ final AtomicBoolean finishable = new AtomicBoolean(true);
+ Protocol.writeFlags(buffer, true, true, false, false, false,
Protocol.TYPE_CLIENT);
+ buffer.putShort((short) id);
+ // send request
+ Buffers.putModifiedUtf8(buffer, serviceType);
+ buffer.put((byte) 0);
+ Buffers.putModifiedUtf8(buffer, groupName);
+ buffer.put((byte) 0);
+ buffer.flip();
+ try {
+ Channels.sendBlocking(channel, buffer);
+ } catch (IOException e) {
+ result.setException(e);
+ }
+ // todo register response handler...
+ return new Cancellable() {
+ public Cancellable cancel() {
+ if (finishable.getAndSet(false)) {
+ ByteBuffer.allocate(4);
+ Protocol.writeFlags(buffer, true, false, true, false, false,
Protocol.TYPE_CLIENT);
+ buffer.putShort((short) id);
+ buffer.put((byte) Protocol.CLIENT_OPEN_CANCEL);
+ buffer.flip();
+ try {
+ Channels.sendBlocking(channel, buffer);
+ } catch (IOException e) {
+ result.setException(e);
+ }
+ }
+ return this;
+ }
+ };
+ } finally {
+ if (! ok) {
+ permit.release();
+ }
+ }
+ }
+
+ public RequestHandlerConnector createConnector(final RequestHandler localHandler) {
+ return null;
+ }
+
+ public void close() throws IOException {
+ }
+}
Copied:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionHandlerFactory.java
(from rev 5508,
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionHandlerFactory.java)
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionHandlerFactory.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionHandlerFactory.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.ConnectionHandlerFactory;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.ConnectionHandlerContext;
+
+/**
+ * Connection handler factory which creates {@link RemoteConnectionHandler} instances from a single
+ * {@link ConnectionConfiguration}.
+ */
+final class RemoteConnectionHandlerFactory implements ConnectionHandlerFactory {
+
+ // the configuration whose create() result is handed to every handler made by this factory
+ private final ConnectionConfiguration configuration;
+
+ /**
+ * Construct a new instance.
+ *
+ * @param configuration the connection configuration to create handlers from
+ */
+ public RemoteConnectionHandlerFactory(final ConnectionConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ /**
+ * Create a connection handler for the given context.
+ *
+ * @param connectionContext the connection handler context, passed through to {@code RemoteConnectionHandler.create()}
+ * @return the connection handler returned by {@code RemoteConnectionHandler.create()}
+ */
+ public ConnectionHandler createInstance(final ConnectionHandlerContext
connectionContext) {
+ return RemoteConnectionHandler.create(connectionContext,
configuration.create());
+ }
+}
Copied:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionProvider.java
(from rev 5514,
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProvider.java)
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionProvider.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionProvider.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,431 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.ConnectionProvider;
+import org.jboss.remoting3.spi.ConnectionHandlerFactory;
+import org.jboss.remoting3.spi.ConnectionProviderContext;
+import org.jboss.remoting3.spi.ProtocolServiceType;
+import org.jboss.remoting3.RemotingOptions;
+import org.jboss.remoting3.ProtocolException;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.ChannelListener;
+import org.jboss.xnio.Result;
+import org.jboss.xnio.Option;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.channels.TcpChannel;
+import org.jboss.xnio.channels.WritableMessageChannel;
+import org.jboss.xnio.channels.MessageHandler;
+import org.jboss.marshalling.ClassResolver;
+import org.jboss.marshalling.ClassTable;
+import org.jboss.marshalling.ObjectTable;
+import org.jboss.marshalling.ObjectResolver;
+import org.jboss.marshalling.ProviderDescriptor;
+import org.jboss.marshalling.util.IntMap;
+import java.net.URI;
+import java.net.InetSocketAddress;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+
+import javax.net.ssl.SSLContext;
+import javax.security.auth.callback.CallbackHandler;
+
+final class RemoteConnectionProvider implements
ConnectionProvider<RemoteServerFactory> {
+
+ private static final Charset UTF_8 = Charset.forName("utf-8");
+
+ private final Connector<InetSocketAddress, TcpChannel> connector;
+ private final ConnectionProviderContext context;
+ private final SSLContext sslContext;
+ private final Executor executor;
+ private final RemoteServerFactory providerInterface;
+ static final int DEFAULT_WINDOW_SIZE = 4;
+ static final int DEFAULT_MAX_MESSAGE_SIZE = 1400;
+ static final int DEFAULT_MAX_CHANNEL_COUNT = 16;
+ static final int GREETING = 0xff;
+ static final int MAX_VERSION = 0;
+
+ /**
+ * Construct a new instance and create the server factory returned by {@code getProviderInterface()}.
+ *
+ * @param connector the connector used to open outbound TCP connections
+ * @param context the connection provider context
+ * @param sslContext the SSL context passed to the server-side channel listener
+ * @param executor the executor passed to the server-side channel listener
+ */
+ RemoteConnectionProvider(final Connector<InetSocketAddress, TcpChannel>
connector, final ConnectionProviderContext context, final SSLContext sslContext, final
Executor executor) {
+ this.connector = connector;
+ this.context = context;
+ this.sslContext = sslContext;
+ this.executor = executor;
+ providerInterface = new RemoteServerFactory() {
+ public ChannelListener<TcpChannel> getServerListener(final OptionMap
optionMap) {
+ // reject out-of-range option values before any listener is created
+ validateServerOptions(optionMap);
+ // wrap an OpenListener (which begins negotiation) in an SSL TCP channel listener
+ return Channels.createSslTcpChannelListener(
+ RemoteConnectionProvider.this.sslContext,
+ new OpenListener(optionMap),
+ RemoteConnectionProvider.this.executor,
+ optionMap);
+ }
+ };
+ }
+
+ /**
+ * Open a TCP connection to the host and port of the given URI and begin negotiation once it is established.
+ * Connection failure and cancellation are reported through {@code result}, as is any {@code IOException}
+ * thrown while beginning negotiation.
+ *
+ * @param uri the destination; only its host and port are read, and the port must be given explicitly
+ * @param connectOptions the connection options; validated before the connection attempt starts
+ * @param result receives the outcome of the connection attempt
+ * @param callbackHandler not used by this implementation
+ * @return the pending TCP connection future, returned as the cancellation handle
+ * @throws IllegalArgumentException if the URI has no explicit port or an option value is out of range
+ */
+ public Cancellable connect(final URI uri, final OptionMap connectOptions, final
Result<ConnectionHandlerFactory> result, final CallbackHandler callbackHandler)
throws IllegalArgumentException {
+ final String host = uri.getHost();
+ final int port = uri.getPort();
+ // URI.getPort() returns -1 when no port is present
+ if (port == -1) {
+ throw new IllegalArgumentException("A valid port number must be
explicitly specified");
+ }
+ // Validate the option map
+ validateConnectOptions(connectOptions);
+ // TODO - change this to use async DNS via XNIO DNS
+ final IoFuture<TcpChannel> connection = connector.connectTo(new
InetSocketAddress(host, port), null, null);
+ // the notifier forwards each outcome of the connect attempt to the caller's result (the attachment)
+ connection.addNotifier(new IoFuture.HandlingNotifier<TcpChannel,
Result<ConnectionHandlerFactory>>() {
+ public void handleFailed(final IOException exception, final
Result<ConnectionHandlerFactory> attachment) {
+ attachment.setException(exception);
+ }
+
+ public void handleCancelled(final Result<ConnectionHandlerFactory>
attachment) {
+ attachment.setCancelled();
+ }
+
+ public void handleDone(final TcpChannel channel, final
Result<ConnectionHandlerFactory> attachment) {
+ try {
+ beginConnection(channel, connectOptions, attachment);
+ } catch (IOException e) {
+ // NOTE(review): the channel is not closed on this path - confirm whether it should be
+ attachment.setException(e);
+ }
+ }
+ }, result);
+ return connection;
+ }
+
+ /**
+ * Validate the options for an outbound connection.  Only the common checks are applied.
+ *
+ * @param connectOptions the options to check
+ * @throws IllegalArgumentException if an option value is out of range
+ */
+ private void validateConnectOptions(final OptionMap connectOptions) {
+ validateCommonOptions(connectOptions);
+ }
+
+ /**
+ * Validate the options for a server listener.  Only the common checks are applied.
+ *
+ * @param serverOptions the options to check
+ * @throws IllegalArgumentException if an option value is out of range
+ */
+ private void validateServerOptions(final OptionMap serverOptions) {
+ validateCommonOptions(serverOptions);
+ }
+
+ /**
+ * Validate the options shared by client and server connections.  Every option is optional; a value
+ * that is present must lie within the inclusive range given here.
+ * <p>
+ * NOTE(review): the peer-side checks in {@code InitialReadHandler.handleMessage()} accept 0x200..0x100000
+ * for the message sizes and 0x2..0x400 for the window sizes.  A window size of 1 or a message size below
+ * 512 passes here but would apparently be rejected by a peer running the same code - confirm which
+ * ranges are intended.
+ *
+ * @param optionMap the options to check
+ * @throws IllegalArgumentException if an option value is out of range
+ */
+ private void validateCommonOptions(final OptionMap optionMap) {
+ validate(optionMap, RemotingOptions.TRANSMIT_WINDOW_SIZE, 1, 32, false);
+ validate(optionMap, RemotingOptions.RECEIVE_WINDOW_SIZE, 1, 32, false);
+ validate(optionMap, RemotingOptions.MAX_TRANSMIT_SIZE, 256, 65536, false);
+ validate(optionMap, RemotingOptions.MAX_RECEIVE_SIZE, 256, 65536, false);
+ validate(optionMap, RemotingOptions.MAX_INBOUND_CHANNELS, 2, 65536, false);
+ validate(optionMap, RemotingOptions.MAX_OUTBOUND_CHANNELS, 2, 65536, false);
+ }
+
+ /**
+  * Check that an integer option lies within an inclusive range.
+  *
+  * @param optionMap the option map to read the value from
+  * @param option the option to check
+  * @param minValue the smallest acceptable value
+  * @param maxValue the largest acceptable value
+  * @param required {@code true} if the absence of the option is an error
+  * @throws IllegalArgumentException if the option is required but missing, or its value is out of range
+  */
+ private void validate(OptionMap optionMap, Option<Integer> option, int minValue, int maxValue, boolean required) {
+     final Integer configured = optionMap.get(option);
+     if (configured != null) {
+         final int value = configured.intValue();
+         if (value < minValue) {
+             throw new IllegalArgumentException("Value for option " + option + " must be greater than or equal to " + minValue);
+         }
+         if (value > maxValue) {
+             throw new IllegalArgumentException("Value for option " + option + " must be less than or equal to " + maxValue);
+         }
+     } else if (required) {
+         throw new IllegalArgumentException("Missing required option " + option);
+     }
+ }
+
+ /**
+ * Begin the connection by negotiating connection parameters.
+ * <p>
+ * Collects the locally available protocol services from the provider context, sends a greeting packet
+ * which advertises the local limits and service names, then installs an {@code InitialReadHandler} to
+ * process the peer's reply and resumes reads on the channel.
+ * <p>
+ * NOTE(review): nothing in this method stores the channel or the option map on the new
+ * {@code ConnectionConfiguration}, yet {@code InitialReadHandler} reads both from it - confirm they are
+ * populated elsewhere.
+ *
+ * @param channel the channel
+ * @param optionMap the preconfigured (and prevalidated) option map
+ * @param result the result for the created connection
+ * @throws IOException if an I/O call made while sending the greeting fails
+ */
+ void beginConnection(TcpChannel channel, OptionMap optionMap,
Result<ConnectionHandlerFactory> result) throws IOException {
+ final WritableMessageChannel messageChannel =
Channels.createMessageWriter(channel, optionMap);
+ final ConnectionConfiguration connectionBuilder = new ConnectionConfiguration();
+ // Acquire information for this session from the endpoint
+ final Map<String, ClassResolver> classResolvers =
fillMap(context.getProtocolServiceProviders(ProtocolServiceType.CLASS_RESOLVER));
+ final Map<String, ClassTable> classTables =
fillMap(context.getProtocolServiceProviders(ProtocolServiceType.CLASS_TABLE));
+ final Map<String, ObjectTable> objectTables =
fillMap(context.getProtocolServiceProviders(ProtocolServiceType.OBJECT_TABLE));
+ final Map<String, ProviderDescriptor> marshallerProviderDescriptors =
fillMap(context.getProtocolServiceProviders(ProtocolServiceType.MARSHALLER_PROVIDER_DESCRIPTOR));
+ final Map<String, ObjectResolver> objectResolvers =
fillMap(context.getProtocolServiceProviders(ProtocolServiceType.OBJECT_RESOLVER));
+ connectionBuilder.setLocalClassResolverMap(classResolvers);
+ connectionBuilder.setLocalClassTableMap(classTables);
+ connectionBuilder.setLocalObjectTableMap(objectTables);
+
connectionBuilder.setLocalMarshallerProviderDescriptorMap(marshallerProviderDescriptors);
+ connectionBuilder.setLocalObjectResolverMap(objectResolvers);
+ // First, send our config packet
+ // NOTE(review): the greeting is assembled in a fixed 2048-byte buffer; enough registered protocol
+ // services (or long names) would overflow it - confirm this limit is acceptable
+ final ByteBuffer buffer = ByteBuffer.allocate(2048);
+ buffer.put((byte) GREETING);
+ // numeric limits: the option value if set, otherwise the built-in default
+ sendIntParam(ConfigParam.MAX_SUPPORTED_VERSION, MAX_VERSION, buffer);
+ sendIntParam(ConfigParam.MAX_TRANSMIT_WINDOW_SIZE,
optionMap.get(RemotingOptions.TRANSMIT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE), buffer);
+ sendIntParam(ConfigParam.MAX_RECEIVE_WINDOW_SIZE,
optionMap.get(RemotingOptions.RECEIVE_WINDOW_SIZE, DEFAULT_WINDOW_SIZE), buffer);
+ sendIntParam(ConfigParam.MAX_TRANSMIT_SIZE,
optionMap.get(RemotingOptions.MAX_TRANSMIT_SIZE, DEFAULT_MAX_MESSAGE_SIZE), buffer);
+ sendIntParam(ConfigParam.MAX_RECEIVE_SIZE,
optionMap.get(RemotingOptions.MAX_RECEIVE_SIZE, DEFAULT_MAX_MESSAGE_SIZE), buffer);
+ sendIntParam(ConfigParam.MAX_INBOUND_CHANNELS,
optionMap.get(RemotingOptions.MAX_INBOUND_CHANNELS, DEFAULT_MAX_CHANNEL_COUNT), buffer);
+ sendIntParam(ConfigParam.MAX_OUTBOUND_CHANNELS,
optionMap.get(RemotingOptions.MAX_OUTBOUND_CHANNELS, DEFAULT_MAX_CHANNEL_COUNT), buffer);
+ // locally provided protocol services
+ // (each call writes the names into the greeting and returns the values in the order they were written)
+ connectionBuilder.setLocalClassResolvers(sendList(ConfigParam.CLASS_RESOLVER,
classResolvers, buffer));
+ connectionBuilder.setLocalClassTables(sendList(ConfigParam.CLASS_TABLES,
classTables, buffer));
+ connectionBuilder.setLocalObjectTables(sendList(ConfigParam.OBJECT_TABLES,
objectTables, buffer));
+
connectionBuilder.setLocalMarshallerProviderDescriptors(sendListWithVersion(ConfigParam.MARSHALLERS,
marshallerProviderDescriptors, buffer));
+ connectionBuilder.setLocalObjectResolvers(sendList(ConfigParam.OBJECT_RESOLVERS,
objectResolvers, buffer));
+ buffer.flip();
+ Channels.sendBlocking(messageChannel, buffer);
+ // the peer's greeting is handled by InitialReadHandler, which completes the result
+ final MessageHandler.Setter setter = Channels.createMessageReader(channel,
optionMap);
+ setter.set(new InitialReadHandler(result, connectionBuilder));
+ channel.resumeReads();
+ // that's it.
+ }
+
+ /**
+  * Append one integer parameter record to the greeting buffer: the parameter ID byte, a two-byte
+  * length of 4, then the four-byte value.
+  *
+  * @param param the parameter; its ordinal is written as the ID byte
+  * @param val the value to send
+  * @param buffer the buffer to append to
+  */
+ private static void sendIntParam(ConfigParam param, int val, ByteBuffer buffer) {
+     buffer.put((byte) param.ordinal()).putShort((short) 4).putInt(val);
+ }
+
+ /**
+  * Append one record per map entry to the greeting buffer and collect the map values in the same order.
+  * Each record is the parameter ID byte, a two-byte length, then the UTF-8 encoded key.
+  *
+  * @param param the parameter; its ordinal is written as the ID byte of every record
+  * @param src the named services to advertise
+  * @param buffer the buffer to append to
+  * @return the values of {@code src}, in the order their names were written
+  */
+ private static <V> List<V> sendList(ConfigParam param, Map<String, V> src, ByteBuffer buffer) {
+     final List<V> values = new ArrayList<V>(src.size());
+     for (Map.Entry<String, V> service : src.entrySet()) {
+         buffer.put((byte) param.ordinal());
+         final byte[] encodedName = service.getKey().getBytes(UTF_8);
+         buffer.putShort((short) encodedName.length).put(encodedName);
+         values.add(service.getValue());
+     }
+     return values;
+ }
+
+ /**
+  * Append one record per marshaller provider to the greeting buffer and collect the descriptors in the
+  * same order.  Each record is the parameter ID byte, a two-byte payload length, a two-byte version
+  * count, the four-byte supported versions, then the UTF-8 encoded name.
+  *
+  * @param param the parameter; its ordinal is written as the ID byte of every record
+  * @param src the named provider descriptors to advertise
+  * @param buffer the buffer to append to
+  * @return the descriptors of {@code src}, in the order they were written
+  */
+ private static List<ProviderDescriptor> sendListWithVersion(ConfigParam param, Map<String, ProviderDescriptor> src, ByteBuffer buffer) {
+     final List<ProviderDescriptor> descriptors = new ArrayList<ProviderDescriptor>(src.size());
+     for (Map.Entry<String, ProviderDescriptor> provider : src.entrySet()) {
+         final ProviderDescriptor descriptor = provider.getValue();
+         descriptors.add(descriptor);
+         buffer.put((byte) param.ordinal());
+         final byte[] encodedName = provider.getKey().getBytes(UTF_8);
+         final int[] supported = descriptor.getSupportedVersions();
+         final int versionCount = supported.length;
+         // payload = version count (2 bytes) + versions (4 bytes each) + name
+         buffer.putShort((short) (encodedName.length + 2 + 4 * versionCount));
+         buffer.putShort((short) versionCount);
+         for (int i = 0; i < versionCount; i++) {
+             buffer.putInt(supported[i]);
+         }
+         buffer.put(encodedName);
+     }
+     return descriptors;
+ }
+
+ /**
+  * Copy a sequence of entries into a new hash map.  A later entry replaces an earlier one with the same key.
+  *
+  * @param src the entries to copy
+  * @return a new map holding the entries
+  */
+ private static <K, V> Map<K, V> fillMap(Iterable<Map.Entry<K, V>> src) {
+     final Map<K, V> copy = new HashMap<K, V>();
+     for (Map.Entry<K, V> item : src) {
+         final K key = item.getKey();
+         final V value = item.getValue();
+         copy.put(key, value);
+     }
+     return copy;
+ }
+
+ /**
+ * Get the server factory.  This is the instance created in the constructor; each listener it hands
+ * out validates the supplied server options before being created.
+ *
+ * @return the server factory
+ */
+ public RemoteServerFactory getProviderInterface() {
+ return providerInterface;
+ }
+
+ /**
+  * Listener for newly opened server-side channels.  Begins negotiation on each channel and hands the
+  * resulting connection handler factory to the provider context.
+  */
+ private class OpenListener implements ChannelListener<TcpChannel> {
+
+     /** The (prevalidated) server options used for every channel this listener handles. */
+     private final OptionMap optionMap;
+
+     OpenListener(final OptionMap optionMap) {
+         this.optionMap = optionMap;
+     }
+
+     /**
+      * Begin negotiation on a newly opened channel.
+      *
+      * @param channel the channel which was opened
+      */
+     public void handleEvent(final TcpChannel channel) {
+         try {
+             beginConnection(channel, optionMap, new Result<ConnectionHandlerFactory>() {
+                 public boolean setResult(final ConnectionHandlerFactory result) {
+                     context.accept(result);
+                     return true;
+                 }
+
+                 public boolean setException(final IOException exception) {
+                     // there is no caller to report to on the server side, so at least record the failure
+                     Loggers.MAIN_LOGGER.debug(exception, "Negotiation failed on inbound connection %s", channel);
+                     return true;
+                 }
+
+                 public boolean setCancelled() {
+                     // todo not possible, log it anyway
+                     return true;
+                 }
+             });
+         } catch (IOException e) {
+             // Negotiation could not even be started: without closing it here the accepted channel
+             // would stay open with nothing reading from it.
+             Loggers.MAIN_LOGGER.debug(e, "Failed to begin negotiation on inbound connection %s", channel);
+             IoUtils.safeClose(channel);
+         }
+     }
+ }
+
+ /**
+ * Read handler for initial (negotiation) phase of connection.
+ */
+ static final class InitialReadHandler implements MessageHandler {
+
+ private final Result<ConnectionHandlerFactory> result;
+
+ private final ConnectionConfiguration configuration;
+
+ /**
+ * Construct a new instance.
+ *
+ * @param result the result to complete once the peer's greeting has been processed
+ * @param configuration the connection configuration to fill in with the negotiated values
+ */
+ InitialReadHandler(final Result<ConnectionHandlerFactory> result, final
ConnectionConfiguration configuration) {
+ this.result = result;
+ this.configuration = configuration;
+ }
+
+ /**
+  * Parse the peer's greeting packet and complete the negotiation.
+  * <p>
+  * The packet is the greeting byte followed by records of the form: one-byte parameter ID, two-byte
+  * unsigned length, payload.  Each numeric limit sent by the peer is range-checked and combined with
+  * the matching local option by taking the smaller value; named protocol services are recorded in the
+  * order received.  On success the result is completed with a new
+  * {@link RemoteConnectionHandlerFactory}; on a bad or truncated packet the channel is closed and the
+  * result is failed.
+  *
+  * @param buffer the received greeting message
+  */
+ public void handleMessage(final ByteBuffer buffer) {
+     try {
+         if ((buffer.get() & 0xff) != GREETING) {
+             throw new IOException("Incorrect greeting");
+         }
+         final ConfigParam[] paramIds = ConfigParam.values();
+         final ConnectionConfiguration configuration = this.configuration;
+         final OptionMap optionMap = configuration.getOptionMap();
+         final IntMap<String> remoteClassResolvers = new IntMap<String>();
+         int classResolverIndex = 0;
+         final IntMap<String> remoteClassTables = new IntMap<String>();
+         int classTableIndex = 0;
+         final IntMap<String> remoteObjectResolvers = new IntMap<String>();
+         int objectResolverIndex = 0;
+         final IntMap<String> remoteObjectTables = new IntMap<String>();
+         int objectTableIndex = 0;
+         final Map<String, RemoteProviderDescriptor> remoteMarshallers = new HashMap<String, RemoteProviderDescriptor>();
+         int remoteMarshallerIndex = 0;
+         configuration.setRemoteClassResolvers(remoteClassResolvers);
+         configuration.setRemoteClassTables(remoteClassTables);
+         configuration.setRemoteObjectResolvers(remoteObjectResolvers);
+         configuration.setRemoteObjectTables(remoteObjectTables);
+         configuration.setRemoteMarshallerProviderDescriptors(remoteMarshallers);
+         while (buffer.hasRemaining()) {
+             final int id = buffer.get() & 0xff;
+             final int len = buffer.getShort() & 0xffff;
+             // taking the slice also moves the main buffer past this record's payload
+             final ByteBuffer data = Buffers.slice(buffer, len);
+             // Skip parameters this version does not know about.  The bound must be >= rather than >:
+             // an ID equal to paramIds.length is already one past the end of the array.
+             if (id >= paramIds.length) {
+                 continue;
+             }
+             final ConfigParam configParam = paramIds[id];
+             switch (configParam) {
+                 case MAX_SUPPORTED_VERSION: {
+                     // since we're version 0, the remote side must support us, and we support nothing else, so...
+                     break;
+                 }
+                 // what the peer can receive bounds what we transmit, and vice versa
+                 case MAX_RECEIVE_SIZE: {
+                     configuration.setConfiguredTransmitSize(Math.min(
+                             validate(ConfigParam.MAX_RECEIVE_SIZE, data.getInt(), 0x200, 0x100000),
+                             optionMap.get(RemotingOptions.MAX_TRANSMIT_SIZE, DEFAULT_MAX_MESSAGE_SIZE)));
+                     break;
+                 }
+                 case MAX_TRANSMIT_SIZE: {
+                     configuration.setConfiguredReceiveSize(Math.min(
+                             validate(ConfigParam.MAX_TRANSMIT_SIZE, data.getInt(), 0x200, 0x100000),
+                             optionMap.get(RemotingOptions.MAX_RECEIVE_SIZE, DEFAULT_MAX_MESSAGE_SIZE)));
+                     break;
+                 }
+                 case MAX_RECEIVE_WINDOW_SIZE: {
+                     configuration.setConfiguredTransmitWindowSize(Math.min(
+                             validate(ConfigParam.MAX_RECEIVE_WINDOW_SIZE, data.getInt(), 0x2, 0x400),
+                             optionMap.get(RemotingOptions.TRANSMIT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE)));
+                     break;
+                 }
+                 case MAX_TRANSMIT_WINDOW_SIZE: {
+                     configuration.setConfiguredReceiveWindowSize(Math.min(
+                             validate(ConfigParam.MAX_TRANSMIT_WINDOW_SIZE, data.getInt(), 0x2, 0x400),
+                             optionMap.get(RemotingOptions.RECEIVE_WINDOW_SIZE, DEFAULT_WINDOW_SIZE)));
+                     break;
+                 }
+                 case MAX_OUTBOUND_CHANNELS: {
+                     configuration.setConfiguredInboundChannels(Math.min(
+                             validate(ConfigParam.MAX_OUTBOUND_CHANNELS, data.getInt(), 0, 0x100000),
+                             optionMap.get(RemotingOptions.MAX_INBOUND_CHANNELS, DEFAULT_MAX_CHANNEL_COUNT)));
+                     break;
+                 }
+                 case MAX_INBOUND_CHANNELS: {
+                     configuration.setConfiguredOutboundChannels(Math.min(
+                             validate(ConfigParam.MAX_INBOUND_CHANNELS, data.getInt(), 0, 0x100000),
+                             optionMap.get(RemotingOptions.MAX_OUTBOUND_CHANNELS, DEFAULT_MAX_CHANNEL_COUNT)));
+                     break;
+                 }
+                 case CLASS_RESOLVER: {
+                     remoteClassResolvers.put(getString(data), classResolverIndex++);
+                     break;
+                 }
+                 case CLASS_TABLES: {
+                     remoteClassTables.put(getString(data), classTableIndex++);
+                     break;
+                 }
+                 case OBJECT_RESOLVERS: {
+                     remoteObjectResolvers.put(getString(data), objectResolverIndex++);
+                     break;
+                 }
+                 case OBJECT_TABLES: {
+                     remoteObjectTables.put(getString(data), objectTableIndex++);
+                     break;
+                 }
+                 case MARSHALLERS: {
+                     // payload: two-byte version count, the four-byte versions, then the name
+                     final int versionLen = data.getShort() & 0xffff;
+                     final int[] versions = new int[versionLen];
+                     for (int j = 0; j < versions.length; j++) {
+                         versions[j] = data.getInt();
+                     }
+                     final String name = getString(data);
+                     remoteMarshallers.put(name, new RemoteProviderDescriptor(name, versions, remoteMarshallerIndex++));
+                     break;
+                 }
+                 default: {
+                     // should not be possible
+                     throw new IllegalStateException();
+                 }
+             }
+         }
+         result.setResult(new RemoteConnectionHandlerFactory(configuration));
+     } catch (IOException e) {
+         IoUtils.safeClose(configuration.getChannel());
+         result.setException(e);
+     } catch (BufferUnderflowException e) {
+         // a record (or the greeting byte itself) ended before its declared contents
+         IoUtils.safeClose(configuration.getChannel());
+         result.setException(new ProtocolException("Truncated greeting packet"));
+     }
+ }
+
+ /**
+ * Handle end-of-stream arriving before the greeting: close the configuration's channel and fail the
+ * result with a {@code ProtocolException}.
+ */
+ public void handleEof() {
+ IoUtils.safeClose(configuration.getChannel());
+ result.setException(new ProtocolException("Unexpected end of
stream"));
+ }
+
+ /**
+ * Handle a read failure during negotiation: close the configuration's channel and fail the result
+ * with the given exception.
+ *
+ * @param e the exception which was reported
+ */
+ public void handleException(final IOException e) {
+ IoUtils.safeClose(configuration.getChannel());
+ result.setException(e);
+ }
+
+ /**
+  * Consume all remaining bytes of the buffer and decode them as UTF-8.
+  *
+  * @param buf the buffer to read from
+  * @return the decoded string
+  */
+ private static String getString(ByteBuffer buf) {
+     final byte[] raw = new byte[buf.remaining()];
+     buf.get(raw);
+     return new String(raw, UTF_8);
+ }
+
+ /**
+  * Check that a value received from the peer lies within an inclusive range.
+  *
+  * @param field the field being checked; used only in the error message
+  * @param val the received value
+  * @param min the smallest acceptable value
+  * @param max the largest acceptable value
+  * @return {@code val}, if it is in range
+  * @throws ProtocolException if the value is out of range
+  */
+ private static int validate(Object field, int val, int min, int max) throws ProtocolException {
+     if (val < min || val > max) {
+         throw new ProtocolException("Value of " + field + " is out of range (expected " + min + " <= n <= " + max + ", got " + val + ")");
+     }
+     return val;
+ }
+ }
+}
Copied:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionProviderFactory.java
(from rev 5508,
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexConnectionProviderFactory.java)
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionProviderFactory.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteConnectionProviderFactory.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.ConnectionProviderFactory;
+import org.jboss.remoting3.spi.ConnectionProvider;
+import org.jboss.remoting3.spi.ConnectionProviderContext;
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.channels.TcpChannel;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * Connection provider factory which creates {@link RemoteConnectionProvider} instances sharing one
+ * connector, SSL context and executor.
+ */
+final class RemoteConnectionProviderFactory implements
ConnectionProviderFactory<RemoteServerFactory> {
+ // the three collaborators below are passed unchanged to every provider created by this factory
+ private final Connector<InetSocketAddress, TcpChannel> connector;
+ private final SSLContext sslContext;
+ private final Executor executor;
+
+ /**
+ * Construct a new instance.
+ *
+ * @param connector the TCP connector to give to created providers
+ * @param sslContext the SSL context to give to created providers
+ * @param executor the executor to give to created providers
+ */
+ public RemoteConnectionProviderFactory(final Connector<InetSocketAddress,
TcpChannel> connector, final SSLContext sslContext, final Executor executor) {
+ this.connector = connector;
+ this.sslContext = sslContext;
+ this.executor = executor;
+ }
+
+ /**
+ * Create a connection provider for the given context.
+ *
+ * @param context the connection provider context
+ * @return a new {@code RemoteConnectionProvider}
+ */
+ public ConnectionProvider<RemoteServerFactory> createInstance(final
ConnectionProviderContext context) {
+ return new RemoteConnectionProvider(connector, context, sslContext, executor);
+ }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteDescriptor.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteDescriptor.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteDescriptor.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,88 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.RemotingServiceDescriptor;
+import org.jboss.remoting3.spi.ConnectionProviderFactory;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.XnioProvider;
+import org.jboss.xnio.XnioConfiguration;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.log.Logger;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.security.NoSuchAlgorithmException;
+import java.util.ServiceLoader;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.io.IOException;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * A descriptor to facilitate simple initialization by standalone clients.
+ * It registers a {@link ConnectionProviderFactory} under the name {@code "remote"}, built from the first
+ * usable XNIO provider found by {@link ServiceLoader}, the default SSL context and a private thread pool.
+ */
+public final class RemoteDescriptor implements RemotingServiceDescriptor<ConnectionProviderFactory> {
+
+    private static final Logger log = Loggers.MAIN_LOGGER;
+
+    /** {@inheritDoc} */
+    public Class<ConnectionProviderFactory> getType() {
+        return ConnectionProviderFactory.class;
+    }
+
+    /** {@inheritDoc} */
+    public String getName() {
+        return "remote";
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws RuntimeException if no XNIO provider could be instantiated or no default SSL context is available
+     */
+    public ConnectionProviderFactory getService() {
+        return AccessController.doPrivileged(new PrivilegedAction<ConnectionProviderFactory>() {
+            public ConnectionProviderFactory run() {
+                Xnio xnio = null;
+                for (XnioProvider provider : ServiceLoader.load(XnioProvider.class)) {
+                    try {
+                        xnio = provider.getNewInstance(new XnioConfiguration());
+                        // Stop at the first provider that works.  Previously the loop stopped
+                        // unconditionally, so a failing first provider hid all the others.
+                        break;
+                    } catch (IOException e) {
+                        log.debug(e, "Cannot use XNIO provider '%s'", provider.getName());
+                    }
+                }
+                if (xnio == null) {
+                    throw new RuntimeException("No XNIO providers were found. Cannot set up protocol.");
+                }
+                // NOTE(review): the keep-alive time is 30 milliseconds - confirm seconds were not intended
+                final ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                        4,
+                        5 * (Runtime.getRuntime().availableProcessors() + 1),
+                        30L, TimeUnit.MILLISECONDS,
+                        new ArrayBlockingQueue<Runnable>(100),
+                        new ThreadPoolExecutor.CallerRunsPolicy());
+                try {
+                    return new RemoteConnectionProviderFactory(
+                            xnio.createTcpConnector(OptionMap.EMPTY),
+                            SSLContext.getDefault(),
+                            executor);
+                } catch (NoSuchAlgorithmException e) {
+                    throw new RuntimeException("No SSL providers were found. Cannot set up protocol.", e);
+                }
+            }
+        });
+    }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteProviderDescriptor.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteProviderDescriptor.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteProviderDescriptor.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+/**
+ * Immutable description of a marshaller provider advertised by the remote side: its name, the
+ * versions it supports, and the index assigned to it when the greeting was read.
+ */
+final class RemoteProviderDescriptor {
+    private final String name;
+    private final int[] supportedVersions;
+    private final int index;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param name the provider name
+     * @param supportedVersions the supported versions; the array is copied
+     * @param index the index of this provider
+     */
+    public RemoteProviderDescriptor(final String name, final int[] supportedVersions, final int index) {
+        this.name = name;
+        // Copy on the way in as well as on the way out, so that a caller which keeps and later
+        // modifies its array cannot change this descriptor.
+        this.supportedVersions = supportedVersions.clone();
+        this.index = index;
+    }
+
+    /**
+     * Get the provider name.
+     *
+     * @return the name
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Get the supported versions.
+     *
+     * @return a fresh copy of the supported versions array
+     */
+    public int[] getSupportedVersions() {
+        return supportedVersions.clone();
+    }
+
+    /**
+     * Get the index of this provider.
+     *
+     * @return the index
+     */
+    public int getIndex() {
+        return index;
+    }
+}
Copied:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteRequestHandler.java
(from rev 5508,
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandler.java)
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteRequestHandler.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteRequestHandler.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,171 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.NioByteOutput;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.RemoteRequestContext;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.channels.WritableMessageChannel;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Handler for outbound requests.  Each request is marshalled into buffers which are sent as
+ * messages on the connection's channel; every send first takes a permit from a per-request
+ * transmit window semaphore.
+ */
+final class RemoteRequestHandler extends AbstractHandleableCloseable<RequestHandler> implements RequestHandler {
+    private static final Logger log = Loggers.REQUEST_HANDLER_LOGGER;
+
+    private final boolean local;
+    private final int identifier;
+    private final MarshallerFactory factory;
+    private final MarshallingConfiguration marshallingConfiguration;
+    private final EstablishedConnection connection;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param local the "local" flag value written into every message header sent by this handler
+     * @param identifier the identifier of this handler, sent with each request and with the close message
+     * @param connection the connection to send on; also supplies the executor for the superclass
+     * @param factory the factory used to create one marshaller per request
+     * @param marshallingConfiguration the configuration passed to the marshaller factory
+     */
+    RemoteRequestHandler(final boolean local, final int identifier, final EstablishedConnection connection, final MarshallerFactory factory, final MarshallingConfiguration marshallingConfiguration) {
+        super(connection.getExecutor());
+        this.connection = connection;
+        this.identifier = identifier;
+        this.local = local;
+        this.factory = factory;
+        this.marshallingConfiguration = marshallingConfiguration;
+    }
+
+    /**
+     * Send a {@code CLIENT_CLOSE} message carrying this handler's identifier, blocking until it is sent.
+     *
+     * @throws IOException if the message could not be sent
+     */
+    @Override
+    protected void closeAction() throws IOException {
+        // todo remove from connection
+        ByteBuffer buffer = ByteBuffer.allocate(10);
+        Protocol.writeFlags(buffer, local, false, true, false, false, Protocol.TYPE_CLIENT);
+        buffer.put((byte) Protocol.CLIENT_CLOSE);
+        buffer.putInt(identifier);
+        buffer.flip();
+        Channels.sendBlocking(connection.getChannel(), buffer);
+    }
+
+    /**
+     * Marshal the given request and send it over the connection.
+     * <p>
+     * If an {@code IOException} occurs, the reply handler is notified via
+     * {@link SpiUtils#safeHandleException} and a blank request context is returned.
+     *
+     * @param request the request object to send (may be {@code null})
+     * @param handler the reply handler to notify on failure
+     * @return a context whose {@code cancel()} sends a cancel request for this request
+     */
+    public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
+        // Declared ahead of the trace call so the message's "%d" has a matching argument; the
+        // original call supplied only one argument for two format specifiers.
+        final int newRequestId = 0;
+        // todo associate the reply handler with the new request ID
+        log.trace("Sending outbound request (request id = %d) (type is %s)", Integer.valueOf(newRequestId), request == null ? "null" : request.getClass());
+        final EstablishedConnection connection = this.connection;
+        connection.setCurrent();
+        try {
+            final WritableMessageChannel channel = connection.getChannel();
+            final Marshaller marshaller = factory.createMarshaller(marshallingConfiguration);
+            final Simplaphore windowSemaphore = new Simplaphore(connection.getConfiguration().getConfiguredTransmitWindowSize());
+            // NOTE(review): this flag starts out false, so getAndSet(false) below can never yield true;
+            // presumably it was meant to start as true to mark the first message - confirm against
+            // Protocol.writeFlags before changing.
+            final AtomicBoolean first = new AtomicBoolean();
+            final ByteOutput output = new NioByteOutput(new NioByteOutput.BufferWriter() {
+
+                // Allocate a buffer of the configured transmit size and pre-fill the message header.
+                public ByteBuffer getBuffer() {
+                    final ByteBuffer buffer = ByteBuffer.allocate(connection.getConfiguration().getConfiguredTransmitSize());
+                    Protocol.writeFlags(buffer, local, first.getAndSet(false), false, false, false, Protocol.TYPE_REQUEST);
+                    buffer.putShort((short) newRequestId);
+                    buffer.put(Protocol.REQUEST_DATA);
+                    return buffer;
+                }
+
+                // Send one filled buffer once a window permit is available.
+                public void accept(final ByteBuffer buffer, final boolean eof) throws IOException {
+                    if (eof) {
+                        // Overwrites the byte at index 3; presumably that is where getBuffer() put
+                        // REQUEST_DATA - confirm against the header width written by writeFlags.
+                        buffer.put(3, Protocol.REQUEST_DATA_LAST);
+                    }
+                    try {
+                        windowSemaphore.acquire();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new InterruptedIOException();
+                    }
+                    Channels.sendBlocking(channel, buffer);
+                }
+
+                public void flush() throws IOException {
+                    Channels.flushBlocking(channel);
+                }
+            });
+            boolean ok = false;
+            try {
+                marshaller.start(output);
+                marshaller.writeShort(identifier);
+                marshaller.writeObject(request);
+                marshaller.finish();
+                output.close();
+                ok = true;
+                return new RemoteRequestContext() {
+                    // Send a cancel request for this request; failures are logged and otherwise ignored.
+                    public RemoteRequestContext cancel() {
+                        final ByteBuffer buffer = ByteBuffer.allocate(5);
+                        Protocol.writeFlags(buffer, local, first.getAndSet(false), false, false, false, Protocol.TYPE_REQUEST);
+                        buffer.putShort((short) newRequestId);
+                        buffer.put(Protocol.REQUEST_CANCEL_REQ);
+                        buffer.flip();
+                        try {
+                            windowSemaphore.acquire();
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            return this;
+                        }
+                        try {
+                            Channels.sendBlocking(channel, buffer);
+                        } catch (IOException e) {
+                            // keep the cause in the trace instead of dropping it
+                            log.trace(e, "Exception while sending cancel request");
+                            return this;
+                        }
+                        return this;
+                    }
+                };
+            } finally {
+                try {
+                    if (!ok) {
+                        // try to send notice of the exception
+                        final ByteBuffer buffer = ByteBuffer.allocate(10);
+                        Protocol.writeFlags(buffer, local, first.getAndSet(true), true, false, false, Protocol.TYPE_REQUEST);
+                        buffer.putShort((short) newRequestId);
+                        buffer.put(Protocol.REQUEST_EXCEPTION);
+                        buffer.flip();
+                        windowSemaphore.acquireUninterruptibly();
+                        Channels.sendBlocking(channel, buffer);
+                        // todo remove request
+                    }
+                } finally {
+                    // Runs even when the exception notice itself fails to send, so neither the
+                    // output nor the marshaller is leaked.
+                    IoUtils.safeClose(output);
+                    IoUtils.safeClose(marshaller);
+                }
+            }
+        } catch (final IOException t) {
+            log.trace(t, "receiveRequest failed with an exception");
+            SpiUtils.safeHandleException(handler, t);
+            return SpiUtils.getBlankRemoteRequestContext();
+        } finally {
+            connection.clearCurrent();
+        }
+    }
+
+    /**
+     * Get a human-readable description containing the identity hash, the identifier and the connection.
+     *
+     * @return the description string
+     */
+    public String toString() {
+        return "forwarding request handler <" + Integer.toHexString(hashCode()) + "> (id = " + identifier + ") for " + connection;
+    }
+}
\ No newline at end of file
Copied:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteServerFactory.java
(from rev 5508,
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexServerFactory.java)
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteServerFactory.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/RemoteServerFactory.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.xnio.channels.TcpChannel;
+import org.jboss.xnio.ChannelListener;
+import org.jboss.xnio.OptionMap;
+
+/**
+ * The server factory for a remote connection provider. Use this to create servers which connect to the
+ * corresponding endpoint.
+ */
+public interface RemoteServerFactory {
+
+ /**
+ * Get a channel listener for a remote server.
+ * <p>
+ * NOTE(review): presumably the returned listener is registered with a TCP server and is invoked for
+ * each accepted {@link TcpChannel} - confirm against the implementing provider.
+ *
+ * @param optionMap the server option map
+ * @return the channel listener
+ */
+ ChannelListener<TcpChannel> getServerListener(OptionMap optionMap);
+}
Deleted:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java 2009-11-18
00:28:44 UTC (rev 5599)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SendingByteOutput.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -1,165 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting3.multiplex;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import org.jboss.marshalling.ByteOutput;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- * An output stream which collects written bytes into buffers taken from an allocator and sends each
- * buffer as one message on a channel.  The number of messages which may be sent before an
- * {@link #acknowledge()} arrives is bounded by the window size.  All state is guarded by the
- * monitor of this object.
- */
-final class SendingByteOutput extends OutputStream implements ByteOutput {
-    private ByteBuffer buffer;
-    private final BufferAllocator<ByteBuffer> allocator;
-    private final WritableMessageChannel channel;
-    private int windowSize;
-    private boolean closed;
-    // never assigned within this class; checkClosed() only reads it
-    private IOException remoteException;
-
-    /**
-     * Construct a new instance.
-     *
-     * @param channel the channel to send filled buffers on
-     * @param allocator the allocator from which buffers are taken and to which they are returned
-     * @param windowSize the initial number of messages which may be sent without acknowledgement
-     */
-    public SendingByteOutput(final WritableMessageChannel channel, final BufferAllocator<ByteBuffer> allocator, final int windowSize) {
-        this.channel = channel;
-        this.allocator = allocator;
-        this.windowSize = windowSize;
-    }
-
-    private static IOException closed() {
-        return new IOException("Stream is closed");
-    }
-
-    // Throw if the stream was closed or a remote exception has been recorded.
-    private void checkClosed() throws IOException {
-        if (closed) {
-            throw closed();
-        }
-        if (remoteException != null) {
-            closed = true;
-            throw remoteException;
-        }
-    }
-
-    /**
-     * Write a single byte, sending the current buffer first if it is full.
-     *
-     * @param b the byte to write
-     * @throws IOException if the stream is closed or sending fails
-     */
-    public void write(final int b) throws IOException {
-        synchronized (this) {
-            checkClosed();
-            // The local must follow the field: previously it stayed null after the first allocation
-            // (NullPointerException) and kept pointing at the already sent and freed buffer after
-            // send().  This now mirrors write(byte[], int, int).
-            ByteBuffer buffer = this.buffer;
-            if (buffer == null) {
-                buffer = this.buffer = allocator.allocate();
-            } else if (! buffer.hasRemaining()) {
-                send();
-                buffer = this.buffer = allocator.allocate();
-            }
-            buffer.put((byte) b);
-        }
-    }
-
-    /**
-     * Write a range of bytes, sending each buffer as it fills up.
-     *
-     * @param b the source array
-     * @param off the offset of the first byte to write
-     * @param len the number of bytes to write
-     * @throws IOException if the stream is closed or sending fails
-     */
-    public void write(final byte[] b, int off, int len) throws IOException {
-        synchronized (this) {
-            checkClosed();
-            while (len > 0) {
-                ByteBuffer buffer = this.buffer;
-                if (buffer == null) {
-                    buffer = this.buffer = allocator.allocate();
-                } else if (! buffer.hasRemaining()) {
-                    send();
-                    buffer = this.buffer = allocator.allocate();
-                }
-                final int cnt = Math.min(len, buffer.remaining());
-                buffer.put(b, off, cnt);
-                len -= cnt;
-                off += cnt;
-            }
-        }
-    }
-
-    // call with lock held
-    // Waits for window space, then flips and sends the current buffer, returning it to the allocator.
-    private void send() throws IOException {
-        while (windowSize == 0) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                // NOTE(review): the pending buffer is dropped here without allocator.free() - confirm
-                // whether that is intended.
-                buffer = null;
-                closed = true;
-                Thread.currentThread().interrupt();
-                throw new ClosedByInterruptException();
-            }
-            checkClosed();
-        }
-        final ByteBuffer buffer = this.buffer;
-        this.buffer = null;
-        buffer.flip();
-        final WritableMessageChannel channel = this.channel;
-        try {
-            while (! (channel.send(buffer))) {
-                channel.awaitWritable();
-                checkClosed();
-                if (Thread.currentThread().isInterrupted()) {
-                    closed = true;
-                    throw new ClosedByInterruptException();
-                }
-            }
-        } finally {
-            allocator.free(buffer);
-        }
-        windowSize--;
-    }
-
-    /**
-     * Return one unit of window space and wake one thread waiting in {@code send()}.
-     */
-    void acknowledge() {
-        synchronized (this) {
-            windowSize++;
-            notify();
-        }
-    }
-
-    /**
-     * Send the current buffer if it holds any data.
-     *
-     * @throws IOException if sending fails
-     */
-    public void flush() throws IOException {
-        synchronized (this) {
-            final ByteBuffer buffer = this.buffer;
-            if (buffer != null && buffer.position() > 0) {
-                send();
-            }
-        }
-    }
-
-    /**
-     * Send any remaining data and mark the stream closed.  Closing an already closed stream has no effect.
-     *
-     * @throws IOException if sending the remaining data fails
-     */
-    public void close() throws IOException {
-        synchronized (this) {
-            if (closed) {
-                return;
-            }
-            final ByteBuffer buffer = this.buffer;
-            if (buffer != null && buffer.position() > 0) {
-                send();
-            }
-            closed = true;
-        }
-    }
-
-    /**
-     * Mark the stream closed without sending anything and wake all threads waiting on this object.
-     */
-    void abort() {
-        synchronized (this) {
-            if (closed) {
-                return;
-            }
-            closed = true;
-            notifyAll();
-        }
-    }
-}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Simplaphore.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Simplaphore.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/Simplaphore.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+/**
+ * A very simple, unfair, fast semaphore impl; much smaller than java.util.concurrent.Semaphore.
+ * All state is guarded by the monitor of the instance.
+ */
+final class Simplaphore {
+    int permits;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param permits the initial number of available permits
+     */
+    Simplaphore(final int permits) {
+        this.permits = permits;
+    }
+
+    /**
+     * Return one permit and wake one waiting thread, if any.
+     */
+    void release() {
+        synchronized (this) {
+            permits++;
+            // Notify on every release, not only on the 0 -> 1 transition: two releases arriving
+            // before a waiter runs must be able to wake two waiters.
+            notify();
+        }
+    }
+
+    /**
+     * Take one permit, blocking until one is available.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    void acquire() throws InterruptedException {
+        synchronized (this) {
+            // Re-read the field after every wake-up; a cached local copy would never observe a
+            // release and would loop forever.
+            while (permits == 0) {
+                wait();
+            }
+            permits--;
+        }
+    }
+
+    /**
+     * Take one permit, blocking until one is available and ignoring interrupts while waiting.  If
+     * the thread was interrupted during the wait, its interrupt status is restored before returning.
+     */
+    void acquireUninterruptibly() {
+        synchronized (this) {
+            boolean intr = false;
+            try {
+                while (permits == 0) {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        intr = true;
+                    }
+                }
+                permits--;
+            } finally {
+                if (intr) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+}
Added:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SubchannelHandler.java
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SubchannelHandler.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/SubchannelHandler.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A handler which receives the messages and the end-of-file notification for a subchannel.
+ * NOTE(review): the threading and ordering guarantees of these callbacks are not visible here - confirm
+ * against the code which invokes them.
+ */
+public interface SubchannelHandler {
+ /**
+ * Handle a message.
+ *
+ * @param conn the established connection; presumably the one the message arrived on - confirm
+ * @param subchannel the subchannel number
+ * @param message the message content
+ */
+ void handleMessage(EstablishedConnection conn, int subchannel, ByteBuffer message);
+
+ /**
+ * Handle an end-of-file condition.
+ */
+ void handleEof();
+}
Copied:
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/UnforwardedRequestHandlerConnector.java
(from rev 5501,
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/MultiplexRequestHandlerConnector.java)
===================================================================
---
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/UnforwardedRequestHandlerConnector.java
(rev 0)
+++
remoting3-multiplex/trunk/src/main/java/org/jboss/remoting3/multiplex/UnforwardedRequestHandlerConnector.java 2009-11-19
22:14:53 UTC (rev 5600)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.multiplex;
+
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.Result;
+
+/**
+ * A request handler connector tied to an established connection which has not been forwarded: every
+ * attempt to create a request handler through it is rejected with a {@code SecurityException}.
+ */
+final class UnforwardedRequestHandlerConnector implements RequestHandlerConnector {
+    private final EstablishedConnection connection;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param connection the connection this connector belongs to
+     */
+    UnforwardedRequestHandlerConnector(final EstablishedConnection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * Always fails, because this connector has not been forwarded.
+     *
+     * @param result the result to populate (never used)
+     * @return never returns normally
+     * @throws SecurityException always
+     */
+    public Cancellable createRequestHandler(final Result<RequestHandler> result) throws SecurityException {
+        final String reason = "Not forwarded";
+        throw new SecurityException(reason);
+    }
+
+    /**
+     * Get the connection this connector belongs to.
+     *
+     * @return the connection given at construction
+     */
+    EstablishedConnection getConnection() {
+        return this.connection;
+    }
+}