Author: david.lloyd(a)jboss.com
Date: 2010-02-26 18:42:04 -0500 (Fri, 26 Feb 2010)
New Revision: 5758
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundClient.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundReplyExceptionTask.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundReplyTask.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundRequest.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundRequestTask.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundClient.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundReplyHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequest.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequestHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteOpenListener.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocolDescriptor.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ReplyBufferWriter.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ReplyInputHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RequestBufferWriter.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RequestInputHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/NetworkServerProvider.java
remoting3/trunk/jboss-remoting/src/main/resources/META-INF/services/
remoting3/trunk/jboss-remoting/src/main/resources/META-INF/services/org.jboss.remoting3.spi.RemotingServiceDescriptor
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/InvocationTestObject.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/EndpointTestCase.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/LocalTestCase.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/RemoteTestCase.java
remoting3/trunk/jboss-remoting/src/test/resources/remoting.properties
Removed:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/service/Services.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/service/locator/
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/EndpointTestCase.java
Modified:
remoting3/trunk/jboss-remoting/pom.xml
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ClientConnectorImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Connection.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Endpoint.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalConnectionHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalConnectionProvider.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRequestHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Remoting.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RequestContextImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RequestHandlerFactory.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ServiceRegistrationInfo.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ServiceRegistrationListener.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionHandlerContext.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProvider.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderFactory.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderRegistration.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketConnectionProvider.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketProtocol.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientConnectionHandler.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerConnectionHandler.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerRequestHandler.java
Log:
Rough "remote" protocol implementation, based on the original protocol rather than the fancy
two-way version; still needs a lot of TLC.
Modified: remoting3/trunk/jboss-remoting/pom.xml
===================================================================
--- remoting3/trunk/jboss-remoting/pom.xml 2010-02-24 00:45:08 UTC (rev 5757)
+++ remoting3/trunk/jboss-remoting/pom.xml 2010-02-26 23:42:04 UTC (rev 5758)
@@ -31,6 +31,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <xnio.version>2.1.0.CR2-SNAPSHOT</xnio.version>
+ <jbmar.version>1.3.0.CR1-SNAPSHOT</jbmar.version>
</properties>
<groupId>org.jboss.remoting</groupId>
@@ -41,16 +43,28 @@
<dependency>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-api</artifactId>
- <version>2.1.0.CR2-SNAPSHOT</version>
+ <version>${xnio.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.jboss.xnio</groupId>
+ <artifactId>xnio-nio</artifactId>
+ <version>${xnio.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>marshalling-api</artifactId>
- <version>1.3.0.CR1-SNAPSHOT</version>
+ <version>${jbmar.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.jboss.marshalling</groupId>
+ <artifactId>river</artifactId>
+ <version>${jbmar.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.2</version>
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ClientConnectorImpl.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ClientConnectorImpl.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ClientConnectorImpl.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -50,9 +50,6 @@
}
public IoFuture<? extends Client<I, O>> getFutureClient() throws
SecurityException {
- if (clientContext != null) {
- throw new SecurityException("Connector has not been sent");
- }
final FutureResult<Client<I, O>> futureResult = new
FutureResult<Client<I, O>>();
requestHandlerConnector.createRequestHandler(new
TranslatingResult<RequestHandler, Client<I, O>>(futureResult) {
protected Client<I, O> translate(final RequestHandler input) throws
IOException {
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Connection.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Connection.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Connection.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -34,31 +34,6 @@
public interface Connection extends HandleableCloseable<Connection>, Attachable {
/**
- * Open a client to a well-known service on the remote endpoint via this connection.
- *
- * @param serviceSlot the service slot to connect to
- * @param requestClass the request class
- * @param replyClass the reply class
- * @param <I> the request type
- * @param <O> the reply type
- * @return the future client
- */
- <I, O> IoFuture<? extends Client<I, O>> openClient(int serviceSlot,
Class<I> requestClass, Class<O> replyClass);
-
- /**
- * Open a client to a well-known service on the remote endpoint via this connection.
- *
- * @param serviceSlot the service slot to connect to
- * @param requestClass the request class
- * @param replyClass the reply class
- * @param optionMap the option map
- * @param <I> the request type
- * @param <O> the reply type
- * @return the future client
- */
- <I, O> IoFuture<? extends Client<I, O>> openClient(int serviceSlot,
Class<I> requestClass, Class<O> replyClass, OptionMap optionMap);
-
- /**
* Locate and open a client on the remote side of this connection.
*
* @param serviceType the service type
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -23,18 +23,12 @@
package org.jboss.remoting3;
import java.io.IOException;
-import java.net.URI;
-import org.jboss.remoting3.security.RemotingPermission;
-import org.jboss.remoting3.service.Services;
-import org.jboss.remoting3.service.locator.ServiceReply;
-import org.jboss.remoting3.service.locator.ServiceRequest;
import org.jboss.remoting3.spi.AbstractHandleableCloseable;
import org.jboss.remoting3.spi.ConnectionHandler;
import org.jboss.remoting3.spi.ConnectionHandlerFactory;
import org.jboss.remoting3.spi.ConnectionProviderContext;
import org.jboss.remoting3.spi.RequestHandler;
import org.jboss.remoting3.spi.RequestHandlerConnector;
-import org.jboss.xnio.FailedIoFuture;
import org.jboss.xnio.FutureResult;
import org.jboss.xnio.IoFuture;
import org.jboss.xnio.IoUtils;
@@ -47,7 +41,6 @@
private final EndpointImpl endpoint;
private final ConnectionHandler connectionHandler;
- private final IoFuture<? extends Client<ServiceRequest, ServiceReply>>
serviceLocator;
private final String name;
ConnectionImpl(final EndpointImpl endpoint, final ConnectionHandlerFactory
connectionHandlerFactory, final ConnectionProviderContext connectionProviderContext, final
String name) {
@@ -55,51 +48,20 @@
this.endpoint = endpoint;
this.name = name;
connectionHandler = connectionHandlerFactory.createInstance(endpoint.new
LocalConnectionContext(connectionProviderContext, this));
- serviceLocator = doOpenClient(Services.LOCATE_SERVICE, ServiceRequest.class,
ServiceReply.class);
}
protected void closeAction() throws IOException {
connectionHandler.close();
}
- public <I, O> IoFuture<? extends Client<I, O>> openClient(final int
serviceSlot, final Class<I> requestClass, final Class<O> replyClass) {
- return openClient(serviceSlot, requestClass, replyClass, OptionMap.EMPTY);
- }
-
- public <I, O> IoFuture<? extends Client<I, O>> openClient(final int
serviceSlot, final Class<I> requestClass, final Class<O> replyClass, final
OptionMap optionMap) {
- final SecurityManager sm = System.getSecurityManager();
- if (sm != null) {
- sm.checkPermission(new RemotingPermission("openClientByID"));
- }
- return doOpenClient(serviceSlot, requestClass, replyClass);
- }
-
- private <I, O> IoFuture<? extends Client<I, O>> doOpenClient(final
int serviceSlot, final Class<I> requestClass, final Class<O> replyClass) {
- final FutureResult<Client<I, O>> futureResult = new
FutureResult<Client<I, O>>();
- futureResult.addCancelHandler(connectionHandler.open(serviceSlot, new
ClientWrapper<I, O>(endpoint, futureResult, requestClass, replyClass)));
- return futureResult.getIoFuture();
- }
-
public <I, O> IoFuture<? extends Client<I, O>> openClient(final
String serviceType, final String groupName, final Class<I> requestClass, final
Class<O> replyClass) {
return openClient(serviceType, groupName, requestClass, replyClass,
OptionMap.EMPTY);
}
public <I, O> IoFuture<? extends Client<I, O>> openClient(final
String serviceType, final String groupName, final Class<I> requestClass, final
Class<O> replyClass, final OptionMap optionMap) {
- final Client<ServiceRequest, ServiceReply> locator;
- try {
- locator = serviceLocator.get();
- } catch (IOException e) {
- return new FailedIoFuture<Client<I, O>>(e);
- }
- final FutureResult<Client<I, O>> futureClientResult = new
FutureResult<Client<I, O>>();
- try {
- final IoFuture<? extends ServiceReply<I, O>> futureServiceReply =
locator.sendTyped(new ServiceRequest<I, O>(serviceType, groupName, requestClass,
replyClass));
- futureServiceReply.addNotifier(new ServiceReplyNotifier<I,
O>(requestClass, replyClass), futureClientResult);
- futureClientResult.addCancelHandler(futureServiceReply);
- } catch (IOException e) {
- return new FailedIoFuture<Client<I, O>>(e);
- }
- return futureClientResult.getIoFuture();
+ final FutureResult<Client<I, O>> futureResult = new
FutureResult<Client<I, O>>();
+ futureResult.addCancelHandler(connectionHandler.open(serviceType, groupName, new
ClientWrapper<I, O>(endpoint, futureResult, requestClass, replyClass)));
+ return futureResult.getIoFuture();
}
public <I, O> ClientConnector<I, O> createClientConnector(final
RequestListener<I, O> listener, final Class<I> requestClass, final
Class<O> replyClass) throws IOException {
@@ -143,44 +105,4 @@
return endpoint.createClient(input, requestClass, replyClass);
}
}
-
- private class ServiceReplyNotifier<I, O> extends
IoFuture.HandlingNotifier<ServiceReply<I, O>, FutureResult<Client<I,
O>>> {
- private final Class<?> expectedRequestClass;
- private final Class<?> expectedReplyClass;
-
- private ServiceReplyNotifier(final Class<?> expectedRequestClass, final
Class<?> expectedReplyClass) {
- this.expectedRequestClass = expectedRequestClass;
- this.expectedReplyClass = expectedReplyClass;
- }
-
- public void handleCancelled(final FutureResult<Client<I, O>>
attachment) {
- attachment.setCancelled();
- }
-
- public void handleFailed(final IOException exception, final
FutureResult<Client<I, O>> attachment) {
- if (exception instanceof RemoteExecutionException) {
- final Throwable cause = exception.getCause();
- if (cause instanceof IOException) {
- attachment.setException((IOException) cause);
- return;
- }
- }
- attachment.setException(exception);
- }
-
- public void handleDone(final ServiceReply<I, O> data, final
FutureResult<Client<I, O>> attachment) {
- final Class<I> actualRequestClass = data.getActualRequestClass();
- final Class<O> actualReplyClass = data.getActualReplyClass();
- try {
- actualRequestClass.asSubclass(expectedRequestClass);
- expectedReplyClass.asSubclass(actualReplyClass);
- } catch (ClassCastException e) {
- attachment.setException(new ServiceOpenException("Incorrect type
specified", e));
- return;
- }
- final IoFuture<? extends Client<I, O>> futureClient =
doOpenClient(data.getSlot(), actualRequestClass, actualReplyClass);
- futureClient.addNotifier(IoUtils.<Client<I, O>>resultNotifier(),
attachment);
- attachment.addCancelHandler(futureClient);
- }
- }
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Endpoint.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Endpoint.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Endpoint.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -257,7 +257,7 @@
* @return a handle which may be used to remove the registration
* @throws DuplicateRegistrationException if there is already a provider registered
to that URI scheme
*/
- <T> ConnectionProviderRegistration<T> addConnectionProvider(String
uriScheme, ConnectionProviderFactory<T> providerFactory) throws
DuplicateRegistrationException;
+ ConnectionProviderRegistration addConnectionProvider(String uriScheme,
ConnectionProviderFactory providerFactory) throws DuplicateRegistrationException;
/**
* Get the interface for a connection provider.
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -31,7 +31,6 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.Random;
import java.util.Set;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -39,12 +38,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.jboss.marshalling.util.IntKeyMap;
import org.jboss.remoting3.security.RemotingPermission;
import org.jboss.remoting3.security.SimpleClientCallbackHandler;
-import org.jboss.remoting3.service.Services;
-import org.jboss.remoting3.service.locator.ServiceReply;
-import org.jboss.remoting3.service.locator.ServiceRequest;
import org.jboss.remoting3.spi.AbstractHandleableCloseable;
import org.jboss.remoting3.spi.ConnectionHandlerFactory;
import org.jboss.remoting3.spi.ConnectionProvider;
@@ -128,11 +123,10 @@
* The currently registered services. Protected by {@link #serviceWriteLock}.
*/
private final Map<String, Map<String, ServiceRegistrationInfo>>
localServiceIndex = hashMap();
- private final IntKeyMap<ServiceRegistrationInfo> localServiceTable = new
IntKeyMap<ServiceRegistrationInfo>();
/**
* The currently registered connection providers.
*/
- private final ConcurrentMap<String, ConnectionProvider<?>>
connectionProviders = concurrentMap();
+ private final ConcurrentMap<String, ConnectionProvider> connectionProviders =
concurrentMap();
/**
* The provider maps for the different protocol service types.
*/
@@ -162,15 +156,9 @@
this.executor = executor;
this.name = name;
connectionProviderContext = new ConnectionProviderContextImpl();
- // add local connection provider
+ // add default connection providers
connectionProviders.put("local", new
LocalConnectionProvider(connectionProviderContext));
// add default services
- serviceBuilder(ServiceRequest.class, ServiceReply.class)
- .setOptionMap(OptionMap.builder().set(RemotingOptions.FIXED_SERVICE_ID,
Services.LOCATE_SERVICE).getMap())
- .setClientListener(new ServiceLocationListener())
- .setServiceType("org.jboss.remoting.service.locate")
- .setGroupName("default")
- .register();
this.optionMap = optionMap;
}
@@ -204,13 +192,22 @@
}
public <I, O> RequestHandler createLocalRequestHandler(final
RequestListener<? super I, ? extends O> requestListener, final Class<I>
requestClass, final Class<O> replyClass) throws IOException {
+ if (requestListener == null) {
+ throw new IllegalArgumentException("requestListener is null");
+ }
+ if (requestClass == null) {
+ throw new IllegalArgumentException("requestClass is null");
+ }
+ if (replyClass == null) {
+ throw new IllegalArgumentException("replyClass is null");
+ }
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(CREATE_REQUEST_HANDLER_PERM);
}
checkOpen();
final ClientContextImpl clientContext = new ClientContextImpl(executor, null);
- final LocalRequestHandler<I, O> localRequestHandler = new
LocalRequestHandler<I, O>(executor, requestListener, clientContext, requestClass,
replyClass);
+ final LocalRequestHandler<I, O> localRequestHandler = new
LocalRequestHandler<I, O>(executor, requestListener, clientContext, requestClass,
replyClass, requestListener.getClass().getClassLoader());
final WeakCloseable lrhCloseable = new WeakCloseable(localRequestHandler);
clientContext.addCloseHandler(new CloseHandler<ClientContext>() {
public void handleClose(final ClientContext closed) {
@@ -330,31 +327,13 @@
Lock lock = serviceWriteLock;
lock.lock();
try {
- final int slot;
- final Integer fixedServiceId =
optionMap.get(RemotingOptions.FIXED_SERVICE_ID);
- if (fixedServiceId != null) {
- slot = fixedServiceId.intValue();
- if (slot < 0) {
- throw new IllegalArgumentException("fixed service ID must be
greater than or equal to zero");
- }
- // todo - check permission for fixed service ID
- if (localServiceTable.containsKey(slot)) {
- throw new DuplicateRegistrationException("A service with
slot ID " + slot + " is already registered");
- }
- } else {
- int id;
- do {
- id = getRandomSlotID();
- } while (localServiceTable.containsKey(id));
- slot = id;
- }
- log.trace("Registering a service type '%s' group name
'%s' with ID %d", serviceType, groupName, Integer.valueOf(slot));
+ log.trace("Registering a service type '%s' group name
'%s'", serviceType, groupName);
final String canonServiceType = serviceType.toLowerCase();
final String canonGroupName = groupName.toLowerCase();
final Executor executor = EndpointImpl.this.executor;
final Map<String, Map<String, ServiceRegistrationInfo>>
registeredLocalServices = localServiceIndex;
- final RequestHandlerFactory<I, O> handlerFactory =
RequestHandlerFactory.create(executor, clientListener, requestType, replyType);
- final ServiceRegistrationInfo registration = new
ServiceRegistrationInfo(serviceType, groupName, name, optionMap, handlerFactory, slot);
+ final RequestHandlerFactory<I, O> handlerFactory =
RequestHandlerFactory.create(executor, clientListener, requestType, replyType,
classLoader);
+ final ServiceRegistrationInfo registration = new
ServiceRegistrationInfo(serviceType, groupName, name, optionMap, handlerFactory);
// this handle is used to remove the service registration
class ServiceRegistration extends
AbstractHandleableCloseable<Registration> implements Registration {
@@ -373,12 +352,7 @@
submap.remove(groupName);
}
}
- final int regSlot = slot;
- final ServiceRegistrationInfo oldReg =
localServiceTable.get(regSlot);
- if (oldReg == registration) {
- localServiceTable.remove(regSlot);
- }
- log.trace("Removed service type '%s' group name
'%s' with ID %d", serviceType, groupName, Integer.valueOf(slot));
+ log.trace("Removed service type '%s' group name
'%s'", serviceType, groupName);
} finally {
lock.unlock();
}
@@ -406,7 +380,6 @@
registeredLocalServices.put(canonServiceType, submap);
}
submap.put(canonGroupName, registration);
- localServiceTable.put(slot, registration);
// downgrade safely to read lock
final Lock readLock = serviceReadLock;
//noinspection LockAcquiredButNotSafelyReleased
@@ -425,7 +398,6 @@
serviceInfo.setServiceType(serviceType);
serviceInfo.setOptionMap(optionMap);
serviceInfo.setRegistrationHandle(handle);
- serviceInfo.setSlot(slot);
serviceInfo.setRequestClass(requestType);
serviceInfo.setReplyClass(replyType);
final ClassLoader classLoader = this.classLoader;
@@ -450,12 +422,6 @@
}
}
- private int getRandomSlotID() {
- final Random random = new Random();
- int id = random.nextInt() & 0x7fffffff;
- return id;
- }
-
private static void logListenerError(final Throwable t) {
log.error(t, "Service listener threw an exception");
}
@@ -543,10 +509,13 @@
executor.execute(new Runnable() {
public void run() {
final ServiceRegistrationListener.ServiceInfo serviceInfo
= new ServiceRegistrationListener.ServiceInfo();
+ final RequestHandlerFactory<?, ?> handlerFactory =
service.getRequestHandlerFactory();
+
serviceInfo.setRequestClass(handlerFactory.getRequestClass());
+
serviceInfo.setReplyClass(handlerFactory.getReplyClass());
+
serviceInfo.setServiceClassLoader(handlerFactory.getServiceClassLoader());
serviceInfo.setGroupName(service.getGroupName());
serviceInfo.setOptionMap(service.getOptionMap());
serviceInfo.setRegistrationHandle(service.getHandle());
- serviceInfo.setSlot(service.getSlot());
serviceInfo.setServiceType(service.getServiceType());
try {
listener.serviceRegistered(registration,
serviceInfo);
@@ -574,7 +543,7 @@
sm.checkPermission(CONNECT_PERM);
}
final String scheme = destination.getScheme();
- final ConnectionProvider<?> connectionProvider =
connectionProviders.get(scheme);
+ final ConnectionProvider connectionProvider = connectionProviders.get(scheme);
if (connectionProvider == null) {
throw new UnknownURISchemeException("No connection provider for URI
scheme \"" + scheme + "\" is installed");
}
@@ -593,17 +562,17 @@
return connect(destination, connectOptions, new
SimpleClientCallbackHandler(actualUserName, actualUserRealm, password));
}
- public <T> ConnectionProviderRegistration<T> addConnectionProvider(final
String uriScheme, final ConnectionProviderFactory<T> providerFactory) {
+ public ConnectionProviderRegistration addConnectionProvider(final String uriScheme,
final ConnectionProviderFactory providerFactory) {
final SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(ADD_CONNECTION_PROVIDER_PERM);
}
final ConnectionProviderContextImpl context = new
ConnectionProviderContextImpl();
- final ConnectionProvider<T> provider =
providerFactory.createInstance(context);
+ final ConnectionProvider provider = providerFactory.createInstance(context);
if (connectionProviders.putIfAbsent(uriScheme, provider) != null) {
throw new DuplicateRegistrationException("URI scheme '" +
uriScheme + "' is already registered to a provider");
}
- final ConnectionProviderRegistration<T> handle = new
ConnectionProviderRegistrationImpl<T>(uriScheme, provider);
+ final ConnectionProviderRegistration handle = new
ConnectionProviderRegistrationImpl(uriScheme, provider);
return handle;
}
@@ -612,7 +581,10 @@
if (sm != null) {
sm.checkPermission(GET_CONNECTION_PROVIDER_INTERFACE_PERM);
}
- final ConnectionProvider<?> provider = connectionProviders.get(uriScheme);
+ if (! expectedType.isInterface()) {
+ throw new IllegalArgumentException("Interface expected");
+ }
+ final ConnectionProvider provider = connectionProviders.get(uriScheme);
if (provider == null) {
throw new UnknownURISchemeException("No connection provider for URI
scheme \"" + uriScheme + "\" is installed");
}
@@ -680,16 +652,24 @@
return connectionProviderContext;
}
- public RequestHandler openService(final int slotId, final OptionMap optionMap)
throws ServiceOpenException {
- serviceReadLock.lock();
+ public RequestHandler openService(final String serviceType, final String
groupName, final OptionMap optionMap) {
+ final Lock lock = serviceReadLock;
+ lock.lock();
try {
- final ServiceRegistrationInfo info = localServiceTable.get(slotId);
+ final Map<String, ServiceRegistrationInfo> subMap =
localServiceIndex.get(serviceType);
+ final ServiceRegistrationInfo info;
+ if (groupName == null || groupName.length() == 0 ||
"*".equals(groupName)) {
+ final Iterator<ServiceRegistrationInfo> i =
subMap.values().iterator();
+ info = i.hasNext() ? i.next() : null;
+ } else {
+ info = subMap == null ? null : subMap.get(groupName);
+ }
if (info == null) {
- throw new ServiceOpenException("No service with ID of " +
slotId);
+ return null;
}
return info.getRequestHandlerFactory().createRequestHandler(connection);
} finally {
- serviceReadLock.unlock();
+ lock.unlock();
}
}
@@ -720,12 +700,12 @@
}
}
- private class ConnectionProviderRegistrationImpl<T> extends
AbstractHandleableCloseable<Registration> implements
ConnectionProviderRegistration<T> {
+ private class ConnectionProviderRegistrationImpl extends
AbstractHandleableCloseable<Registration> implements ConnectionProviderRegistration
{
private final String uriScheme;
- private final ConnectionProvider<T> provider;
+ private final ConnectionProvider provider;
- public ConnectionProviderRegistrationImpl(final String uriScheme, final
ConnectionProvider<T> provider) {
+ public ConnectionProviderRegistrationImpl(final String uriScheme, final
ConnectionProvider provider) {
super(executor);
this.uriScheme = uriScheme;
this.provider = provider;
@@ -744,55 +724,8 @@
}
}
- public T getProviderInterface() {
+ public Object getProviderInterface() {
return provider.getProviderInterface();
}
}
-
- final class ServiceLocationListener implements ClientListener<ServiceRequest,
ServiceReply> {
- private final RequestListener<ServiceRequest, ServiceReply> requestListener
= new RequestListener<ServiceRequest, ServiceReply>() {
- public void handleRequest(final RequestContext<ServiceReply> context,
final ServiceRequest request) throws RemoteExecutionException {
- final String requestGroupName = request.getGroupName();
- final String requestServiceType = request.getServiceType();
- final Lock lock = serviceReadLock;
- lock.lock();
- try {
- final Map<String, ServiceRegistrationInfo> submap =
localServiceIndex.get(requestServiceType);
- if (submap != null) {
- final ServiceRegistrationInfo info;
- if (requestGroupName == null ||
"*".equals(requestGroupName)) {
- final
Iterator<Map.Entry<String,ServiceRegistrationInfo>> i =
submap.entrySet().iterator();
- if (i.hasNext()) {
- final Map.Entry<String, ServiceRegistrationInfo>
entry = i.next();
- info = entry.getValue();
- } else {
- info = null;
- }
- } else {
- info = submap.get(requestGroupName);
- }
- if (info != null) {
- try {
- context.sendReply(ServiceReply.create(info.getSlot(),
info.getRequestHandlerFactory().getRequestClass(),
info.getRequestHandlerFactory().getReplyClass()));
- } catch (IOException e) {
- log.trace("Failed to send service reply: %s",
e);
- // reply failed
- }
- return;
- }
- }
- throw new RemoteExecutionException(new
ServiceNotFoundException(ServiceURI.create(requestServiceType, requestGroupName, null)));
- } finally {
- lock.unlock();
- }
- }
-
- public void handleClose() {
- }
- };
-
- public RequestListener<ServiceRequest, ServiceReply> handleClientOpen(final
ClientContext clientContext) {
- return requestListener;
- }
- }
}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalConnectionHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalConnectionHandler.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalConnectionHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -40,13 +40,13 @@
this.connectionHandlerContext = connectionHandlerContext;
}
- public Cancellable open(final int slot, final Result<RequestHandler> result) {
- try {
- // todo: support for call-by-value
- final RequestHandler handler = connectionHandlerContext.openService(slot,
OptionMap.EMPTY);
+ public Cancellable open(final String serviceType, final String groupName, final
Result<RequestHandler> result) {
+ // todo: support for call-by-value
+ final RequestHandler handler = connectionHandlerContext.openService(serviceType,
groupName, OptionMap.EMPTY);
+ if (handler == null) {
+ result.setException(new
ServiceNotFoundException(ServiceURI.create(serviceType, groupName, null)));
+ } else {
result.setResult(handler);
- } catch (IOException e) {
- result.setException(e);
}
return IoUtils.nullCancellable();
}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalConnectionProvider.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalConnectionProvider.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalConnectionProvider.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -35,7 +35,7 @@
import javax.security.auth.callback.CallbackHandler;
-final class LocalConnectionProvider implements ConnectionProvider<Void> {
+final class LocalConnectionProvider implements ConnectionProvider {
private final ConnectionProviderContext providerContext;
public LocalConnectionProvider(final ConnectionProviderContext providerContext) {
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRequestHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRequestHandler.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRequestHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -42,12 +42,14 @@
private final ClientContextImpl clientContext;
private final Class<I> requestClass;
private final Class<O> replyClass;
+ private final ClassLoader serviceClassLoader;
private static final Logger log =
Logger.getLogger("org.jboss.remoting.listener");
@SuppressWarnings({ "unchecked" })
- LocalRequestHandler(final Executor executor, final RequestListener<? super I, ?
extends O> requestListener, final ClientContextImpl clientContext, final Class<I>
requestClass, final Class<O> replyClass) {
+ LocalRequestHandler(final Executor executor, final RequestListener<? super I, ?
extends O> requestListener, final ClientContextImpl clientContext, final Class<I>
requestClass, final Class<O> replyClass, final ClassLoader serviceClassLoader) {
super(executor);
+ this.serviceClassLoader = serviceClassLoader;
this.requestListener = (RequestListener<I, O>) requestListener;
this.clientContext = clientContext;
this.requestClass = requestClass;
@@ -55,7 +57,7 @@
}
public Cancellable receiveRequest(final Object request, final ReplyHandler
replyHandler) {
- final RequestContextImpl<O> context = new
RequestContextImpl<O>(replyHandler, clientContext, replyClass);
+ final RequestContextImpl<O> context = new
RequestContextImpl<O>(replyHandler, clientContext, replyClass, serviceClassLoader);
try {
final I castRequest;
try {
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Remoting.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Remoting.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Remoting.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -81,6 +81,7 @@
public Thread newThread(final Runnable r) {
final Thread thread = defaultThreadFactory.newThread(r);
+ thread.setDaemon(true);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
log.error(e, "Uncaught exception in thread %s", t);
@@ -110,19 +111,23 @@
public Endpoint run() {
boolean ok = false;
final String fileName = System.getProperty(PROPERTY_FILE_PROPNAME,
PROPERTIES);
+ log.trace("Searching for properties file named
'%s'", fileName);
final Properties props = new Properties();
try {
- final InputStream stream =
getClass().getResourceAsStream(fileName);
+ final InputStream stream =
getClass().getClassLoader().getResourceAsStream(fileName);
if (stream != null) try {
final InputStreamReader reader = new
InputStreamReader(stream, "utf-8");
try {
props.load(reader);
reader.close();
+ log.trace("Loaded properties");
} finally {
IoUtils.safeClose(reader);
}
} finally {
IoUtils.safeClose(stream);
+ } else {
+ log.trace("No properties file found in
classpath");
}
} catch (IOException e) {
throw new RuntimeException(e);
@@ -143,6 +148,7 @@
} catch (IOException e) {
throw new RuntimeException(e);
}
+ log.trace("Created %s from configuration", endpoint);
try {
endpoint.addCloseHandler(new CloseHandler<Endpoint>()
{
public void handleClose(final Endpoint closed) {
@@ -165,7 +171,7 @@
}
try {
if (serviceType == ConnectionProviderFactory.class)
{
- endpoint.addConnectionProvider(name,
(ConnectionProviderFactory<?>) service);
+ endpoint.addConnectionProvider(name,
(ConnectionProviderFactory) service);
} else if (serviceType == ClassTable.class) {
endpoint.addProtocolService(ProtocolServiceType.CLASS_TABLE, name, (ClassTable) service);
} else if (serviceType == ObjectTable.class) {
@@ -236,6 +242,7 @@
try {
final Class<? extends T> instanceType =
Class.forName(className).asSubclass(valueClass);
final T instance = instanceType.getConstructor().newInstance();
+ log.trace("Adding protocol service '%s' of type
'%s'", name, serviceType);
endpoint.addProtocolService(serviceType, name, instance);
} catch (InvocationTargetException e) {
log.warn(e.getCause(), "Unable to create %s instance
'%s'", serviceType, name);
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -135,11 +135,6 @@
public static final Option<String> AUTH_REALM =
Option.simple(RemotingOptions.class, "AUTH_REALM", String.class);
/**
- * Specify a fixed ID for a well-known service.
- */
- public static final Option<Integer> FIXED_SERVICE_ID =
Option.simple(RemotingOptions.class, "FIXED_SERVICE_ID", Integer.class);
-
- /**
* Specify whether a local connection or client should call by reference (the usual
default) or value.
*/
public static final Option<Boolean> CALL_BY_VALUE =
Option.simple(RemotingOptions.class, "CALL_BY_VALUE", Boolean.class);
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RequestContextImpl.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RequestContextImpl.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RequestContextImpl.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -23,6 +23,8 @@
package org.jboss.remoting3;
import java.io.IOException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -48,11 +50,13 @@
private Set<RequestCancelHandler<O>> cancelHandlers;
private final RequestListenerExecutor interruptingExecutor;
private final Class<O> replyClass;
+ private final ClassLoader serviceClassLoader;
- RequestContextImpl(final ReplyHandler replyHandler, final ClientContextImpl
clientContext, final Class<O> replyClass) {
+ RequestContextImpl(final ReplyHandler replyHandler, final ClientContextImpl
clientContext, final Class<O> replyClass, final ClassLoader serviceClassLoader) {
this.replyHandler = replyHandler;
this.clientContext = clientContext;
this.replyClass = replyClass;
+ this.serviceClassLoader = serviceClassLoader;
final Executor executor = clientContext.getExecutor();
//noinspection ThisEscapedInObjectConstruction
interruptingExecutor = new RequestListenerExecutor(executor, this);
@@ -123,9 +127,44 @@
}
public void execute(final Runnable command) {
- interruptingExecutor.execute(command);
+ interruptingExecutor.execute(new Runnable() {
+ public void run() {
+ final ClassLoader old;
+ final SecurityManager sm = System.getSecurityManager();
+ final ClassLoaderAction saveAction = new
ClassLoaderAction(serviceClassLoader);
+ old = sm != null ? AccessController.doPrivileged(saveAction) :
saveAction.run();
+ final ClassLoaderAction restoreAction = new ClassLoaderAction(old);
+ try {
+ command.run();
+ } finally {
+ if (sm != null) {
+ AccessController.doPrivileged(restoreAction);
+ } else {
+ restoreAction.run();
+ }
+ }
+ }
+ });
}
+ /**
+ * Privileged action which swaps the calling thread's context class loader.
+ * {@link #run()} installs the class loader given to the constructor and
+ * returns the loader which was installed before the swap, so a second
+ * action built from the returned value can later restore it.
+ */
+ private static final class ClassLoaderAction implements
PrivilegedAction<ClassLoader> {
+
+ private final ClassLoader classLoader;
+
+ public ClassLoaderAction(final ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
+ public ClassLoader run() {
+ final Thread thread = Thread.currentThread();
+ // The return value is evaluated before the finally block runs, so the
+ // previous loader is captured first and the new loader installed after.
+ try {
+ return thread.getContextClassLoader();
+ } finally {
+ thread.setContextClassLoader(classLoader);
+ }
+ }
+ }
+
protected void cancel() {
synchronized (cancelLock) {
if (! cancelled) {
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RequestHandlerFactory.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RequestHandlerFactory.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RequestHandlerFactory.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -33,21 +33,23 @@
private final ClientListener<? super I, ? extends O> clientListener;
private final Class<I> requestClass;
private final Class<O> replyClass;
+ private final ClassLoader serviceClassLoader;
- RequestHandlerFactory(final Executor executor, final ClientListener<? super I, ?
extends O> clientListener, final Class<I> requestClass, final Class<O>
replyClass) {
+ RequestHandlerFactory(final Executor executor, final ClientListener<? super I, ?
extends O> clientListener, final Class<I> requestClass, final Class<O>
replyClass, final ClassLoader serviceClassLoader) {
this.executor = executor;
this.clientListener = clientListener;
this.requestClass = requestClass;
this.replyClass = replyClass;
+ this.serviceClassLoader = serviceClassLoader;
}
- static <I, O> RequestHandlerFactory<I, O> create(final Executor executor,
final ClientListener<? super I, ? extends O> clientListener, final Class<I>
requestClass, final Class<O> replyClass) {
- return new RequestHandlerFactory<I, O>(executor, clientListener,
requestClass, replyClass);
+ static <I, O> RequestHandlerFactory<I, O> create(final Executor executor,
final ClientListener<? super I, ? extends O> clientListener, final Class<I>
requestClass, final Class<O> replyClass, final ClassLoader serviceClassLoader) {
+ return new RequestHandlerFactory<I, O>(executor, clientListener,
requestClass, replyClass, serviceClassLoader);
}
RequestHandler createRequestHandler(final Connection connection) {
final ClientContextImpl context = new ClientContextImpl(executor, connection);
- return new LocalRequestHandler<I, O>(executor,
clientListener.handleClientOpen(context), context, requestClass, replyClass);
+ return new LocalRequestHandler<I, O>(executor,
clientListener.handleClientOpen(context), context, requestClass, replyClass,
serviceClassLoader);
}
Class<I> getRequestClass() {
@@ -57,4 +59,8 @@
Class<O> getReplyClass() {
return replyClass;
}
+
+ ClassLoader getServiceClassLoader() {
+ return serviceClassLoader;
+ }
}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ServiceRegistrationInfo.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ServiceRegistrationInfo.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ServiceRegistrationInfo.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -35,11 +35,9 @@
private final OptionMap optionMap;
private final RequestHandlerFactory<?, ?> requestHandlerFactory;
private volatile Registration handle;
- private final int slot;
- ServiceRegistrationInfo(final String serviceType, final String groupName, final
String endpointName, final OptionMap optionMap, final RequestHandlerFactory<?, ?>
requestHandlerFactory, final int slot) {
+ ServiceRegistrationInfo(final String serviceType, final String groupName, final
String endpointName, final OptionMap optionMap, final RequestHandlerFactory<?, ?>
requestHandlerFactory) {
this.requestHandlerFactory = requestHandlerFactory;
- this.slot = slot;
this.serviceType = serviceType;
this.groupName = groupName;
this.endpointName = endpointName;
@@ -79,8 +77,4 @@
void setHandle(final Registration handle) {
this.handle = handle;
}
-
- public int getSlot() {
- return slot;
- }
}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ServiceRegistrationListener.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ServiceRegistrationListener.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ServiceRegistrationListener.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -51,7 +51,6 @@
private ClassLoader serviceClassLoader;
private Class<?> requestClass;
private Class<?> replyClass;
- private int slot;
private Registration registrationHandle;
private OptionMap optionMap;
@@ -152,24 +151,6 @@
}
/**
- * Get the slot of the service.
- *
- * @return the slot
- */
- public int getSlot() {
- return slot;
- }
-
- /**
- * Set the slot of the service.
- *
- * @param slot the slot
- */
- public void setSlot(final int slot) {
- this.slot = slot;
- }
-
- /**
* Get the option map.
*
* @return the option map
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundClient.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundClient.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundClient.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import org.jboss.remoting3.spi.RequestHandler;
+
+/**
+ * Immutable pairing of a local {@link RequestHandler} with the
+ * {@link RemoteConnectionHandler} of the connection it belongs to.
+ * Both references are fixed at construction time.
+ */
+final class InboundClient {
+    private final RequestHandler handler;
+    private final RemoteConnectionHandler remoteConnectionHandler;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param remoteConnectionHandler the connection handler this client is associated with
+     * @param handler the request handler to which requests for this client are passed
+     */
+    InboundClient(final RemoteConnectionHandler remoteConnectionHandler, final RequestHandler handler) {
+        this.remoteConnectionHandler = remoteConnectionHandler;
+        this.handler = handler;
+    }
+
+    /**
+     * Get the request handler.
+     *
+     * @return the request handler given at construction
+     */
+    RequestHandler getHandler() {
+        return handler;
+    }
+
+    /**
+     * Get the connection handler.
+     *
+     * @return the connection handler given at construction
+     */
+    RemoteConnectionHandler getRemoteConnectionHandler() {
+        return remoteConnectionHandler;
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundReplyExceptionTask.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundReplyExceptionTask.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundReplyExceptionTask.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.RemoteReplyException;
+import org.jboss.remoting3.RemoteRequestException;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.SpiUtils;
+
+/**
+ * Task which unmarshals an exception reply for an outbound request and
+ * reports it to that request's reply handler as a {@link RemoteReplyException}.
+ */
+final class InboundReplyExceptionTask implements Runnable {
+
+    private final OutboundRequest outboundRequest;
+    private final RemoteConnectionHandler remoteConnectionHandler;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param remoteConnectionHandler the connection handler supplying the marshaller factory and configuration
+     * @param outboundRequest the request whose exception reply is to be read
+     */
+    InboundReplyExceptionTask(final RemoteConnectionHandler remoteConnectionHandler, final OutboundRequest outboundRequest) {
+        this.remoteConnectionHandler = remoteConnectionHandler;
+        this.outboundRequest = outboundRequest;
+    }
+
+    /**
+     * Read one object from the request's byte input and pass it to the reply
+     * handler wrapped in a {@link RemoteReplyException}.  Unmarshalling failures
+     * are reported to the same handler.  The byte input captured at the start
+     * is always sent an EOF when the task finishes.
+     */
+    public void run() {
+        final ReplyHandler replyHandler;
+        final OutboundRequest outboundRequest = this.outboundRequest;
+        final NioByteInput byteInput;
+        // Snapshot both values under the request's monitor; the same snapshot is
+        // used for reading and for the final EOF so the two can never disagree.
+        synchronized (outboundRequest) {
+            replyHandler = outboundRequest.getInboundReplyHandler();
+            byteInput = outboundRequest.getByteInput();
+        }
+        try {
+            final Object exception;
+            try {
+                final RemoteConnectionHandler connectionHandler = remoteConnectionHandler;
+                final Unmarshaller unmarshaller = connectionHandler.getMarshallerFactory().createUnmarshaller(connectionHandler.getMarshallingConfiguration());
+                unmarshaller.start(byteInput);
+                exception = unmarshaller.readObject();
+            } catch (IOException e) {
+                SpiUtils.safeHandleException(replyHandler, e);
+                return;
+            } catch (Exception e) {
+                SpiUtils.safeHandleException(replyHandler, new RemoteRequestException(e));
+                return;
+            }
+            // Test the type explicitly rather than relying on a ClassCastException.
+            // A null payload is not a usable exception and is reported as a failure.
+            final RemoteReplyException rre;
+            if (exception instanceof Throwable) {
+                rre = new RemoteReplyException((Throwable) exception);
+            } else {
+                rre = new RemoteReplyException("Failed to unmarshall remote exception reply");
+            }
+            SpiUtils.safeHandleException(replyHandler, rre);
+        } finally {
+            if (byteInput != null) {
+                byteInput.pushEof();
+            }
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundReplyTask.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundReplyTask.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundReplyTask.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.RemoteReplyException;
+import org.jboss.remoting3.RemoteRequestException;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.SpiUtils;
+
+/**
+ * Task which unmarshals a reply for an outbound request and passes it to
+ * that request's reply handler.
+ */
+final class InboundReplyTask implements Runnable {
+
+    private final OutboundRequest outboundRequest;
+    // Final so the reference is safely visible to whichever thread runs the task.
+    private final RemoteConnectionHandler remoteConnectionHandler;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param remoteConnectionHandler the connection handler supplying the marshaller factory and configuration
+     * @param outboundRequest the request whose reply is to be read
+     */
+    InboundReplyTask(final RemoteConnectionHandler remoteConnectionHandler, final OutboundRequest outboundRequest) {
+        this.remoteConnectionHandler = remoteConnectionHandler;
+        this.outboundRequest = outboundRequest;
+    }
+
+    /**
+     * Read one object from the request's byte input and deliver it with
+     * {@code handleReply}.  Failures while unmarshalling, or while delivering
+     * the reply, are reported to the same handler as exceptions.
+     */
+    public void run() {
+        final ReplyHandler replyHandler;
+        final OutboundRequest outboundRequest = this.outboundRequest;
+        synchronized (outboundRequest) {
+            replyHandler = outboundRequest.getInboundReplyHandler();
+        }
+        final Object reply;
+        try {
+            final RemoteConnectionHandler connectionHandler = remoteConnectionHandler;
+            final Unmarshaller unmarshaller = connectionHandler.getMarshallerFactory().createUnmarshaller(connectionHandler.getMarshallingConfiguration());
+            unmarshaller.start(outboundRequest.getByteInput());
+            reply = unmarshaller.readObject();
+        } catch (IOException e) {
+            SpiUtils.safeHandleException(replyHandler, e);
+            return;
+        } catch (Exception e) {
+            SpiUtils.safeHandleException(replyHandler, new RemoteRequestException(e));
+            return;
+        }
+        try {
+            replyHandler.handleReply(reply);
+        } catch (IOException e) {
+            SpiUtils.safeHandleException(replyHandler, new RemoteReplyException("Remote reply failed", e));
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundRequest.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundRequest.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundRequest.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.util.concurrent.Semaphore;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.xnio.Cancellable;
+
+/**
+ * Mutable per-request state for a request received over a remote connection:
+ * its byte input, its reply handler, its cancellation handle and a semaphore
+ * used for flow control.  This class does no locking of its own for the
+ * mutable fields.
+ */
+final class InboundRequest {
+
+    /** The states an inbound request may be in. */
+    enum State {
+        RECEIVING,
+        RUNNING,
+        SENDING,
+        SENDING_EXCEPTION,
+    }
+
+    private final RemoteConnectionHandler remoteConnectionHandler;
+    /** Flow-control permits; starts with five available. */
+    private final Semaphore flowSemaphore = new Semaphore(5);
+
+    private State state = State.RECEIVING;
+    private NioByteInput byteInput;
+    private OutboundReplyHandler replyHandler;
+    private Cancellable cancellable;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param remoteConnectionHandler the connection handler this request arrived on
+     */
+    InboundRequest(final RemoteConnectionHandler remoteConnectionHandler) {
+        this.remoteConnectionHandler = remoteConnectionHandler;
+    }
+
+    /**
+     * Get the connection handler.
+     *
+     * @return the connection handler given at construction
+     */
+    RemoteConnectionHandler getRemoteConnectionHandler() {
+        return remoteConnectionHandler;
+    }
+
+    /**
+     * Take one flow-control permit, blocking until one is available.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    void acquire() throws InterruptedException {
+        flowSemaphore.acquire();
+    }
+
+    /** Return one flow-control permit. */
+    void ack() {
+        flowSemaphore.release();
+    }
+
+    /**
+     * Get the byte input.
+     *
+     * @return the byte input, or {@code null} if none has been set
+     */
+    NioByteInput getByteInput() {
+        return byteInput;
+    }
+
+    /**
+     * Set the byte input.
+     *
+     * @param byteInput the byte input
+     */
+    void setByteInput(final NioByteInput byteInput) {
+        this.byteInput = byteInput;
+    }
+
+    /**
+     * Get the reply handler.
+     *
+     * @return the reply handler, or {@code null} if none has been set
+     */
+    OutboundReplyHandler getReplyHandler() {
+        return replyHandler;
+    }
+
+    /**
+     * Set the reply handler.
+     *
+     * @param replyHandler the reply handler
+     */
+    void setReplyHandler(final OutboundReplyHandler replyHandler) {
+        this.replyHandler = replyHandler;
+    }
+
+    /**
+     * Get the cancellation handle.
+     *
+     * @return the cancellation handle, or {@code null} if none has been set
+     */
+    public Cancellable getCancellable() {
+        return cancellable;
+    }
+
+    /**
+     * Set the cancellation handle.
+     *
+     * @param cancellable the cancellation handle
+     */
+    void setCancellable(final Cancellable cancellable) {
+        this.cancellable = cancellable;
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundRequestTask.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundRequestTask.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundRequestTask.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.util.IntKeyMap;
+import org.jboss.remoting3.RemoteRequestException;
+import org.jboss.remoting3.spi.SpiUtils;
+
+/**
+ * Task which unmarshals an inbound request and dispatches it to the request
+ * handler of the inbound client identified by the client ID.
+ */
+final class InboundRequestTask implements Runnable {
+
+    private final InboundRequest inboundRequest;
+    private final int rid;
+    private final int cid;
+    private final RemoteConnectionHandler remoteConnectionHandler;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param remoteConnectionHandler the connection handler supplying the marshalling setup and the inbound client table
+     * @param inboundRequest the request state to populate
+     * @param rid the request ID
+     * @param cid the ID of the inbound client the request is addressed to
+     */
+    InboundRequestTask(final RemoteConnectionHandler remoteConnectionHandler, final InboundRequest inboundRequest, final int rid, final int cid) {
+        this.remoteConnectionHandler = remoteConnectionHandler;
+        this.inboundRequest = inboundRequest;
+        this.rid = rid;
+        this.cid = cid;
+    }
+
+    /**
+     * Install the byte input and reply handler on the request, unmarshal the
+     * request object, and pass it to the target client's request handler.
+     * Any failure, including an unknown client ID, is reported through the
+     * reply handler rather than thrown.
+     */
+    public void run() {
+        final OutboundReplyHandler replyHandler;
+        final NioByteInput byteInput;
+        final InboundRequest inboundRequest = this.inboundRequest;
+        synchronized (inboundRequest) {
+            byteInput = new NioByteInput(new RequestInputHandler(inboundRequest, rid));
+            inboundRequest.setByteInput(byteInput);
+            replyHandler = new OutboundReplyHandler(inboundRequest, rid);
+            inboundRequest.setReplyHandler(replyHandler);
+        }
+        final Object request;
+        try {
+            final Unmarshaller unmarshaller = remoteConnectionHandler.getMarshallerFactory().createUnmarshaller(remoteConnectionHandler.getMarshallingConfiguration());
+            unmarshaller.start(byteInput);
+            request = unmarshaller.readObject();
+        } catch (IOException e) {
+            SpiUtils.safeHandleException(replyHandler, e);
+            return;
+        } catch (Exception e) {
+            SpiUtils.safeHandleException(replyHandler, new RemoteRequestException(e));
+            return;
+        }
+        final InboundClient inboundClient;
+        final IntKeyMap<InboundClient> inboundClients = remoteConnectionHandler.getInboundClients();
+        synchronized (inboundClients) {
+            inboundClient = inboundClients.get(cid);
+        }
+        if (inboundClient == null) {
+            // No client is registered under this ID.  Report the failure through the
+            // reply handler so the request is answered instead of failing with a
+            // NullPointerException on the executor thread.
+            SpiUtils.safeHandleException(replyHandler, new IOException("No inbound client found with ID " + cid));
+            return;
+        }
+        synchronized (inboundRequest) {
+            inboundRequest.setCancellable(inboundClient.getHandler().receiveRequest(request, replyHandler));
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundClient.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundClient.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundClient.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.Result;
+
+/**
+ * State of a client being opened from this side of a remote connection.
+ * Holds the pending {@link Result} for the open operation and the service
+ * type and group name requested.  The {@code state} field is guarded by this
+ * object's monitor; all other fields are immutable.
+ */
+final class OutboundClient implements Cancellable {
+
+    private State state = State.REPLY_WAIT;
+
+    // Final: read outside the monitor in cancel() and getResult().
+    private final Result<RequestHandler> result;
+
+    private final int id;
+    private final String serviceType;
+    private final String groupName;
+    private final RemoteConnectionHandler remoteConnectionHandler;
+
+    /**
+     * Construct a new instance in the {@link State#REPLY_WAIT} state.
+     *
+     * @param remoteConnectionHandler the connection handler this client belongs to
+     * @param id the client ID
+     * @param result the result to complete when the open operation finishes
+     * @param serviceType the requested service type
+     * @param groupName the requested group name
+     */
+    OutboundClient(final RemoteConnectionHandler remoteConnectionHandler, final int id, final Result<RequestHandler> result, final String serviceType, final String groupName) {
+        this.remoteConnectionHandler = remoteConnectionHandler;
+        this.id = id;
+        this.result = result;
+        this.serviceType = serviceType;
+        this.groupName = groupName;
+    }
+
+    /**
+     * Get the current state.
+     *
+     * @return the current state
+     */
+    State getState() {
+        synchronized (this) {
+            return state;
+        }
+    }
+
+    /**
+     * Set the current state.
+     *
+     * @param state the new state
+     */
+    void setState(final State state) {
+        synchronized (this) {
+            this.state = state;
+        }
+    }
+
+    /**
+     * Cancel the pending open.  Only has an effect while the state is
+     * {@link State#REPLY_WAIT}; in that case the state becomes
+     * {@link State#CLOSED} and the result is marked cancelled.
+     *
+     * @return this instance
+     */
+    public Cancellable cancel() {
+        synchronized (this) {
+            if (state != State.REPLY_WAIT) {
+                return this;
+            }
+            state = State.CLOSED;
+        }
+        // The result is notified outside the monitor.
+        result.setCancelled();
+        return this;
+    }
+
+    /**
+     * Get the result of the open operation.
+     *
+     * @return the result given at construction
+     */
+    Result<RequestHandler> getResult() {
+        return result;
+    }
+
+    /**
+     * Get the requested service type.
+     *
+     * @return the service type
+     */
+    String getServiceType() {
+        return serviceType;
+    }
+
+    /**
+     * Get the requested group name.
+     *
+     * @return the group name
+     */
+    String getGroupName() {
+        return groupName;
+    }
+
+    /**
+     * Get the connection handler.
+     *
+     * @return the connection handler given at construction
+     */
+    RemoteConnectionHandler getRemoteConnectionHandler() {
+        return remoteConnectionHandler;
+    }
+
+    /**
+     * Get the client ID.
+     *
+     * @return the client ID
+     */
+    public int getId() {
+        return id;
+    }
+
+    /** The states an outbound client may be in. */
+    enum State {
+        REPLY_WAIT,
+        ESTABLISHED,
+        CLOSE_WAIT,
+        CLOSED,
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundReplyHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundReplyHandler.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundReplyHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.NioByteOutput;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.xnio.Pool;
+
+/**
+ * Reply handler which marshals the outcome of an inbound request and writes it out through a
+ * {@link ReplyBufferWriter}.
+ * <p>
+ * An atomic flag records whether an outcome has already been handled.  Once the flag is set (by a reply,
+ * an exception, a cancellation or {@link #setDone()}), later calls to {@link #handleReply(Object)} and
+ * {@link #handleException(IOException)} return without doing anything.
+ */
+final class OutboundReplyHandler implements ReplyHandler {
+
+    private final int rid;
+    private final AtomicBoolean done = new AtomicBoolean();
+    private InboundRequest inboundRequest;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param inboundRequest the inbound request being answered
+     * @param rid the request ID written into reply messages
+     */
+    public OutboundReplyHandler(final InboundRequest inboundRequest, final int rid) {
+        this.inboundRequest = inboundRequest;
+        this.rid = rid;
+    }
+
+    /**
+     * Marshal a normal reply, unless an outcome was already handled.
+     *
+     * @param reply the reply object to marshal
+     * @throws IOException if creating the marshaller or writing the reply fails
+     */
+    public void handleReply(final Object reply) throws IOException {
+        if (done.getAndSet(true)) {
+            // an outcome was already handled
+            return;
+        }
+        marshal(inboundRequest.getRemoteConnectionHandler(), reply, false);
+    }
+
+    /**
+     * Marshal an exception reply, unless an outcome was already handled.  If marshalling does not complete
+     * for any reason, an exception-abort message is sent for this request before the failure propagates.
+     *
+     * @param exception the exception to marshal
+     * @throws IOException if marshalling the exception or sending the abort message fails
+     */
+    public void handleException(final IOException exception) throws IOException {
+        if (done.getAndSet(true)) {
+            // an outcome was already handled
+            return;
+        }
+        final RemoteConnectionHandler handler = inboundRequest.getRemoteConnectionHandler();
+        boolean marshalled = false;
+        try {
+            marshal(handler, exception, true);
+            marshalled = true;
+        } finally {
+            if (! marshalled) {
+                // attempt to send an exception abort
+                sendExceptionAbort(handler);
+            }
+        }
+    }
+
+    /**
+     * Record a cancellation by marking this handler done; nothing is written.
+     *
+     * @throws IOException declared by the interface; not thrown by this implementation
+     */
+    public void handleCancellation() throws IOException {
+        setDone();
+    }
+
+    /**
+     * Mark this handler done so that later replies and exceptions are ignored.
+     */
+    public void setDone() {
+        done.set(true);
+    }
+
+    /**
+     * Marshal one object through a fresh marshaller writing to a {@link ReplyBufferWriter}.
+     *
+     * @param handler the connection handler supplying the marshaller factory and configuration
+     * @param payload the object to write
+     * @param exceptionReply the flag passed as the third argument of the {@link ReplyBufferWriter} constructor
+     * @throws IOException if creating the marshaller or writing fails
+     */
+    private void marshal(final RemoteConnectionHandler handler, final Object payload, final boolean exceptionReply) throws IOException {
+        final Marshaller marshaller = handler.getMarshallerFactory().createMarshaller(handler.getMarshallingConfiguration());
+        marshaller.start(new NioByteOutput(new ReplyBufferWriter(inboundRequest, rid, exceptionReply)));
+        marshaller.writeObject(payload);
+        marshaller.finish();
+    }
+
+    /**
+     * Send a {@code REPLY_EXCEPTION_ABORT} message for this request, using a pooled buffer which is
+     * returned to the pool afterwards.
+     *
+     * @param handler the connection handler to send through
+     * @throws IOException if the send fails
+     */
+    private void sendExceptionAbort(final RemoteConnectionHandler handler) throws IOException {
+        final Pool<ByteBuffer> pool = handler.getBufferPool();
+        final ByteBuffer message = pool.allocate();
+        try {
+            message.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
+            message.put(RemoteProtocol.REPLY_EXCEPTION_ABORT);
+            message.putInt(rid);
+            message.flip();
+            handler.sendBlocking(message);
+        } finally {
+            pool.free(message);
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequest.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequest.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequest.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.util.concurrent.Semaphore;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.xnio.Cancellable;
+
+/**
+ * Local record of a request which this side has sent (or is sending) to the remote side.
+ * <p>
+ * A new instance starts in {@link State#SENDING}.  A semaphore with five initial permits is used for flow
+ * control: {@link #acquire()} takes a permit and {@link #ack()} returns one.
+ */
+final class OutboundRequest implements Cancellable {
+
+    /**
+     * The possible states of an outbound request.
+     */
+    enum State {
+        /** Initial state. */
+        SENDING,
+        REPLY_WAIT,
+        CANCEL_WAIT,
+        RECEIVING,
+        CLOSED,
+    }
+
+    private final int cid;
+    private final ReplyHandler inboundReplyHandler;
+    private final Semaphore flowSemaphore = new Semaphore(5); // todo receive window size
+
+    private State state = State.SENDING;
+    private NioByteInput byteInput;
+    private RemoteConnectionHandler remoteConnectionHandler;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param remoteConnectionHandler the connection handler this request belongs to
+     * @param inboundReplyHandler the handler which receives the reply to this request
+     * @param cid the ID of the client on whose behalf the request is made
+     */
+    OutboundRequest(final RemoteConnectionHandler remoteConnectionHandler, final ReplyHandler inboundReplyHandler, final int cid) {
+        this.cid = cid;
+        this.inboundReplyHandler = inboundReplyHandler;
+        this.remoteConnectionHandler = remoteConnectionHandler;
+    }
+
+    /**
+     * Get the current state, read under this object's monitor.
+     *
+     * @return the state
+     */
+    synchronized State getState() {
+        return state;
+    }
+
+    /**
+     * Replace the current state, written under this object's monitor.
+     *
+     * @param state the new state
+     */
+    synchronized void setState(final State state) {
+        this.state = state;
+    }
+
+    /**
+     * Request cancellation.  No state currently triggers any action; the two states in which a cancel
+     * request is still to be implemented are marked below.
+     *
+     * @return this instance
+     */
+    public synchronized Cancellable cancel() {
+        switch (state) {
+            case SENDING:
+                // todo: send cancel request, kill in-progress stream
+                break;
+            case REPLY_WAIT:
+                // todo: send cancel request
+                break;
+            default:
+                // CANCEL_WAIT, RECEIVING, CLOSED: do nothing
+                break;
+        }
+        return this;
+    }
+
+    /**
+     * Get the byte input previously stored with {@link #setByteInput(NioByteInput)}.
+     *
+     * @return the byte input, or {@code null} if none has been set
+     */
+    NioByteInput getByteInput() {
+        return byteInput;
+    }
+
+    /**
+     * Store the byte input associated with this request.
+     *
+     * @param byteInput the byte input
+     */
+    void setByteInput(final NioByteInput byteInput) {
+        this.byteInput = byteInput;
+    }
+
+    /**
+     * Take one flow-control permit, waiting until one is available.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void acquire() throws InterruptedException {
+        flowSemaphore.acquire();
+    }
+
+    /**
+     * Return one flow-control permit.
+     */
+    void ack() {
+        flowSemaphore.release();
+    }
+
+    /**
+     * Get the connection handler this request belongs to.
+     *
+     * @return the connection handler
+     */
+    RemoteConnectionHandler getRemoteConnectionHandler() {
+        return remoteConnectionHandler;
+    }
+
+    /**
+     * Get the handler which receives the reply to this request.
+     *
+     * @return the reply handler
+     */
+    ReplyHandler getInboundReplyHandler() {
+        return inboundReplyHandler;
+    }
+
+    /**
+     * Get the client ID given at construction.
+     *
+     * @return the client ID
+     */
+    public int getClientId() {
+        return cid;
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequestHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequestHandler.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/OutboundRequestHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.NioByteOutput;
+import org.jboss.marshalling.util.IntKeyMap;
+import org.jboss.remoting3.CloseHandler;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.Pool;
+
+/**
+ * Request handler which forwards requests for an established {@link OutboundClient} over its connection.
+ */
+final class OutboundRequestHandler extends AbstractHandleableCloseable<RequestHandler> implements RequestHandler {
+
+    private final OutboundClient outboundClient;
+
+    /**
+     * Construct a new instance, using the executor of the client's connection provider context.
+     *
+     * @param outboundClient the client on whose behalf requests are sent
+     */
+    OutboundRequestHandler(final OutboundClient outboundClient) {
+        super(outboundClient.getRemoteConnectionHandler().getConnectionContext().getConnectionProviderContext().getExecutor());
+        this.outboundClient = outboundClient;
+    }
+
+    /**
+     * Register a new outbound request under a random, currently unused request ID and marshal the request
+     * object through a {@link RequestBufferWriter}.
+     * <p>
+     * If marshalling fails with an {@code IOException}, the reply handler is notified, the request is
+     * unregistered and marked closed, and a request-abort message is sent for its ID.
+     *
+     * @param request the request object to marshal
+     * @param replyHandler the handler which receives the outcome
+     * @return the outbound request, which is the handle for cancelling it
+     */
+    public Cancellable receiveRequest(final Object request, final ReplyHandler replyHandler) {
+        final RemoteConnectionHandler connectionHandler = outboundClient.getRemoteConnectionHandler();
+        final OutboundRequest outboundRequest = new OutboundRequest(connectionHandler, replyHandler, outboundClient.getId());
+        final IntKeyMap<OutboundRequest> outboundRequests = connectionHandler.getOutboundRequests();
+        final Random random = connectionHandler.getRandom();
+        int rid;
+        synchronized (outboundRequests) {
+            // draw random IDs until one is found which is not in use
+            do {
+                rid = random.nextInt();
+            } while (outboundRequests.containsKey(rid));
+            outboundRequests.put(rid, outboundRequest);
+        }
+        final NioByteOutput byteOutput = new NioByteOutput(new RequestBufferWriter(outboundRequest, rid));
+        try {
+            RemoteConnectionHandler.log.trace("Starting sending request %s", request);
+            final Marshaller marshaller = connectionHandler.getMarshallerFactory().createMarshaller(connectionHandler.getMarshallingConfiguration());
+            marshaller.start(byteOutput);
+            marshaller.writeObject(request);
+            marshaller.finish();
+            RemoteConnectionHandler.log.trace("Finished sending request %s", request);
+        } catch (IOException e) {
+            RemoteConnectionHandler.log.trace(e, "Got exception while marshalling request %s", request);
+            SpiUtils.safeHandleException(replyHandler, e);
+            synchronized (outboundRequests) {
+                outboundRequests.remove(rid);
+            }
+            // setState acquires the request's monitor itself
+            outboundRequest.setState(OutboundRequest.State.CLOSED);
+            sendRequestAbort(connectionHandler, rid);
+        }
+        return outboundRequest;
+    }
+
+    /**
+     * Mark the client closed and send a client-closed message for its ID.  Does nothing if the client is
+     * already in the closed state.
+     *
+     * @throws IOException if sending the message fails
+     */
+    public void close() throws IOException {
+        synchronized (outboundClient) {
+            if (outboundClient.getState() == OutboundClient.State.CLOSED) return;
+            outboundClient.setState(OutboundClient.State.CLOSED);
+        }
+        final RemoteConnectionHandler connectionHandler = outboundClient.getRemoteConnectionHandler();
+        final Pool<ByteBuffer> bufferPool = connectionHandler.getBufferPool();
+        final ByteBuffer buf = bufferPool.allocate();
+        try {
+            buf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
+            buf.put(RemoteProtocol.CLIENT_CLOSED);
+            buf.putInt(outboundClient.getId());
+            buf.flip();
+            connectionHandler.sendBlocking(buf);
+        } finally {
+            bufferPool.free(buf);
+        }
+    }
+
+    /**
+     * Not implemented: the given handler is ignored and no registration key is produced.
+     *
+     * @param closeHandler the close handler (ignored)
+     * @return always {@code null}
+     */
+    // NOTE(review): returning null looks like a placeholder; callers expecting a usable key will not get one - confirm intent
+    public Key addCloseHandler(final CloseHandler<? super RequestHandler> closeHandler) {
+        return null;
+    }
+
+    /**
+     * Send a request-abort message for the given request ID.  A failure to send is only logged, because
+     * the reply handler has already been told about the original failure.  The pooled buffer is always
+     * returned to the pool.
+     *
+     * @param connectionHandler the connection handler to send through
+     * @param rid the ID of the aborted request
+     */
+    private static void sendRequestAbort(final RemoteConnectionHandler connectionHandler, final int rid) {
+        final Pool<ByteBuffer> bufferPool = connectionHandler.getBufferPool();
+        final ByteBuffer buf = bufferPool.allocate();
+        try {
+            buf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
+            buf.put(RemoteProtocol.REQUEST_ABORT);
+            buf.putInt(rid);
+            buf.flip();
+            connectionHandler.sendBlocking(buf);
+        } catch (IOException e) {
+            RemoteConnectionHandler.log.trace(e, "Failed to send request abort for request ID %d", Integer.valueOf(rid));
+        } finally {
+            bufferPool.free(buf);
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,238 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.util.IntKeyMap;
+import org.jboss.remoting3.IndeterminateOutcomeException;
+import org.jboss.remoting3.ServiceOpenException;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.ConnectionHandlerContext;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Pool;
+import org.jboss.xnio.Result;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Connection handler for the "remote" protocol.  Holds the channel, the marshalling setup, the buffer pool
+ * and the per-connection maps of inbound/outbound clients and requests, and provides blocking send and
+ * flush operations on the channel.
+ * <p>
+ * The four ID maps are accessed by other classes of this package while synchronized on the map itself.
+ */
+final class RemoteConnectionHandler implements ConnectionHandler {
+
+    static final Logger log = Logger.getLogger("org.jboss.remoting.remote");
+
+    /** Value written where the message length goes; {@link #sendBlockingNoClose(ByteBuffer)} overwrites it. */
+    static final int LENGTH_PLACEHOLDER = 0;
+
+    private final Pool<ByteBuffer> bufferPool = Buffers.createHeapByteBufferAllocator(4096);
+    private final MarshallerFactory marshallerFactory;
+    private final MarshallingConfiguration marshallingConfiguration;
+
+    private final ConnectionHandlerContext connectionContext;
+    private final StreamChannel channel;
+    private final Random random = new Random();
+
+    private final IntKeyMap<OutboundClient> outboundClients = new IntKeyMap<OutboundClient>();
+    private final IntKeyMap<InboundClient> inboundClients = new IntKeyMap<InboundClient>();
+
+    private final IntKeyMap<OutboundRequest> outboundRequests = new IntKeyMap<OutboundRequest>();
+    private final IntKeyMap<InboundRequest> inboundRequests = new IntKeyMap<InboundRequest>();
+
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /**
+     * Construct a new instance.
+     *
+     * @param connectionContext the connection handler context
+     * @param channel the channel to send on
+     * @param marshallerFactory the factory used to create marshallers for this connection
+     * @param marshallingConfiguration the configuration passed to the marshaller factory
+     */
+    public RemoteConnectionHandler(final ConnectionHandlerContext connectionContext, final StreamChannel channel, final MarshallerFactory marshallerFactory, final MarshallingConfiguration marshallingConfiguration) {
+        this.connectionContext = connectionContext;
+        this.channel = channel;
+        this.marshallerFactory = marshallerFactory;
+        this.marshallingConfiguration = marshallingConfiguration;
+    }
+
+    /**
+     * Send a message, blocking until it is fully written.  Any failure closes this connection handler
+     * before the failure is rethrown.
+     *
+     * @param buffer the flipped message buffer, beginning with the four-byte length placeholder
+     * @throws IOException if the write fails
+     */
+    void sendBlocking(final ByteBuffer buffer) throws IOException {
+        try {
+            sendBlockingNoClose(buffer);
+        } catch (IOException e) {
+            IoUtils.safeClose(this);
+            throw e;
+        } catch (RuntimeException e) {
+            IoUtils.safeClose(this);
+            throw e;
+        } catch (Error e) {
+            IoUtils.safeClose(this);
+            throw e;
+        }
+    }
+
+    /**
+     * Send a message, blocking until it is fully written, without closing the connection on failure.
+     * <p>
+     * The first four bytes of the buffer are overwritten with the number of remaining bytes minus four.
+     * An interruption while waiting for the channel to become writable does not abort the write; the
+     * thread's interrupt status is restored once the method completes.
+     *
+     * @param buffer the flipped message buffer, beginning with the four-byte length placeholder
+     * @throws IOException if the write fails
+     */
+    void sendBlockingNoClose(final ByteBuffer buffer) throws IOException {
+        buffer.putInt(0, buffer.remaining() - 4);
+        boolean intr = false;
+        try {
+            while (buffer.hasRemaining()) {
+                if (channel.write(buffer) == 0) {
+                    try {
+                        channel.awaitWritable();
+                    } catch (InterruptedIOException e) {
+                        // The exception itself proves the thread was interrupted, whether or not the flag
+                        // is still set.  Clear any pending status so the next wait is not cut short, and
+                        // never let a later pass reset the record of an earlier interruption.
+                        Thread.interrupted();
+                        intr = true;
+                    }
+                }
+            }
+        } finally {
+            if (intr) Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Flush the channel, blocking until the flush completes.  Any failure closes this connection handler
+     * before the failure is rethrown.
+     *
+     * @throws IOException if the flush fails
+     */
+    void flushBlocking() throws IOException {
+        try {
+            while (! channel.flush()) {
+                channel.awaitWritable();
+            }
+        } catch (IOException e) {
+            IoUtils.safeClose(this);
+            throw e;
+        } catch (RuntimeException e) {
+            IoUtils.safeClose(this);
+            throw e;
+        } catch (Error e) {
+            IoUtils.safeClose(this);
+            throw e;
+        }
+    }
+
+    /**
+     * Ask the remote side to open a service.  A new outbound client is registered under a random, currently
+     * unused ID and a service request message carrying that ID, the service type and the group name is sent.
+     * <p>
+     * If the message cannot be composed or sent, the client is unregistered again, marked closed, and the
+     * failure is reported through {@code result}.
+     *
+     * @param serviceType the service type to request
+     * @param groupName the group name to request
+     * @param result the result which receives the outcome
+     * @return the outbound client, which is the handle for cancelling the open
+     */
+    public Cancellable open(final String serviceType, final String groupName, final Result<RequestHandler> result) {
+        final OutboundClient outboundClient;
+        int id;
+        synchronized (outboundClients) {
+            while (outboundClients.containsKey(id = random.nextInt()));
+            outboundClient = new OutboundClient(this, id, result, serviceType, groupName);
+            outboundClients.put(id, outboundClient);
+        }
+        // compose & send message
+        final ByteBuffer buffer = bufferPool.allocate();
+        try {
+            buffer.putInt(LENGTH_PLACEHOLDER);
+            buffer.put(RemoteProtocol.SERVICE_REQUEST);
+            buffer.putInt(id);
+            Buffers.putModifiedUtf8(buffer, serviceType);
+            buffer.put((byte) 0);
+            Buffers.putModifiedUtf8(buffer, groupName);
+            buffer.put((byte) 0);
+            buffer.flip();
+            sendBlocking(buffer);
+        } catch (IOException e) {
+            discardOutboundClient(id, outboundClient);
+            result.setException(e);
+        } catch (Throwable e) {
+            discardOutboundClient(id, outboundClient);
+            result.setException(new ServiceOpenException("Failed to open service", e));
+        } finally {
+            bufferPool.free(buffer);
+        }
+        return outboundClient;
+    }
+
+    /**
+     * Unregister an outbound client whose service request was never successfully sent, so that its map
+     * entry does not linger, and mark it closed.
+     *
+     * @param id the ID the client was registered under
+     * @param outboundClient the client to discard
+     */
+    private void discardOutboundClient(final int id, final OutboundClient outboundClient) {
+        synchronized (outboundClients) {
+            outboundClients.remove(id);
+        }
+        outboundClient.setState(OutboundClient.State.CLOSED);
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @param localHandler the local request handler (ignored)
+     * @return always {@code null}
+     */
+    public RequestHandlerConnector createConnector(final RequestHandler localHandler) {
+        return null;
+    }
+
+    /**
+     * Close the connection.  Only the first call has any effect.  The channel is closed first; afterwards,
+     * even if closing the channel failed, inbound client handlers are closed, the reply handlers of
+     * outstanding outbound requests are given an {@link IndeterminateOutcomeException}, and inbound
+     * requests are cancelled.
+     *
+     * @throws IOException if closing the channel fails
+     */
+    public void close() throws IOException {
+        if (! closed.getAndSet(true)) {
+            try {
+                channel.close();
+            } finally {
+                // other actions here
+                // NOTE(review): the maps are iterated here without holding their monitors, while other code
+                // modifies them under synchronized (map) - confirm that IntKeyMap iteration tolerates this
+                for (IntKeyMap.Entry<OutboundClient> entry : outboundClients) {
+                    final OutboundClient outboundClient = entry.getValue();
+                    synchronized (outboundClient) {
+                        // todo close the request handler
+                    }
+                }
+                for (IntKeyMap.Entry<InboundClient> entry : inboundClients) {
+                    final InboundClient inboundClient = entry.getValue();
+                    synchronized (inboundClient) {
+                        IoUtils.safeClose(inboundClient.getHandler());
+                    }
+                }
+                for (IntKeyMap.Entry<OutboundRequest> entry : outboundRequests) {
+                    final OutboundRequest outboundRequest = entry.getValue();
+                    synchronized (outboundRequest) {
+                        SpiUtils.safeHandleException(outboundRequest.getInboundReplyHandler(), new IndeterminateOutcomeException("Connection closed"));
+                    }
+                }
+                for (IntKeyMap.Entry<InboundRequest> entry : inboundRequests) {
+                    final InboundRequest inboundRequest = entry.getValue();
+                    synchronized (inboundRequest) {
+                        inboundRequest.getCancellable().cancel();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Get the pool from which message buffers are allocated.
+     *
+     * @return the buffer pool
+     */
+    Pool<ByteBuffer> getBufferPool() {
+        return bufferPool;
+    }
+
+    /**
+     * Get the marshaller factory for this connection.
+     *
+     * @return the marshaller factory
+     */
+    MarshallerFactory getMarshallerFactory() {
+        return marshallerFactory;
+    }
+
+    /**
+     * Get the marshalling configuration for this connection.
+     *
+     * @return the marshalling configuration
+     */
+    MarshallingConfiguration getMarshallingConfiguration() {
+        return marshallingConfiguration;
+    }
+
+    /**
+     * Get the connection handler context.
+     *
+     * @return the connection handler context
+     */
+    ConnectionHandlerContext getConnectionContext() {
+        return connectionContext;
+    }
+
+    /**
+     * Get the underlying channel.
+     *
+     * @return the channel
+     */
+    StreamChannel getChannel() {
+        return channel;
+    }
+
+    /**
+     * Get the random number generator used to pick client and request IDs.
+     *
+     * @return the random number generator
+     */
+    Random getRandom() {
+        return random;
+    }
+
+    /**
+     * Get the map of outbound clients by ID.
+     *
+     * @return the outbound client map
+     */
+    IntKeyMap<OutboundClient> getOutboundClients() {
+        return outboundClients;
+    }
+
+    /**
+     * Get the map of inbound clients by ID.
+     *
+     * @return the inbound client map
+     */
+    IntKeyMap<InboundClient> getInboundClients() {
+        return inboundClients;
+    }
+
+    /**
+     * Get the map of outbound requests by ID.
+     *
+     * @return the outbound request map
+     */
+    IntKeyMap<OutboundRequest> getOutboundRequests() {
+        return outboundRequests;
+    }
+
+    /**
+     * Get the map of inbound requests by ID.
+     *
+     * @return the inbound request map
+     */
+    IntKeyMap<InboundRequest> getInboundRequests() {
+        return inboundRequests;
+    }
+
+    /**
+     * Get the flag which records whether {@link #close()} has been called.
+     *
+     * @return the closed flag
+     */
+    AtomicBoolean getClosed() {
+        return closed;
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionProvider.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,101 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import org.jboss.remoting3.spi.ConnectionHandlerFactory;
+import org.jboss.remoting3.spi.ConnectionProvider;
+import org.jboss.remoting3.spi.ConnectionProviderContext;
+import org.jboss.remoting3.spi.NetworkServerProvider;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.ChannelListener;
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Result;
+import org.jboss.xnio.channels.ConnectedStreamChannel;
+
+import javax.security.auth.callback.CallbackHandler;
+
+/**
+ * The connection provider class for the "remote" protocol.  Outbound connections are made through the
+ * connector given at construction; inbound connections are handed to the connection provider context.
+ */
+final class RemoteConnectionProvider implements ConnectionProvider {
+
+    private final ConnectionProviderContext connectionProviderContext;
+    private final Connector<InetSocketAddress, ? extends ConnectedStreamChannel<InetSocketAddress>> connector;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param connectionProviderContext the connection provider context
+     * @param connector the connector used to open client channels
+     */
+    RemoteConnectionProvider(final ConnectionProviderContext connectionProviderContext, final Connector<InetSocketAddress, ? extends ConnectedStreamChannel<InetSocketAddress>> connector) {
+        this.connectionProviderContext = connectionProviderContext;
+        this.connector = connector;
+    }
+
+    /**
+     * Open a client channel to the host and port named by the URI.
+     *
+     * @param uri the destination; must carry a host and a port of at least 1
+     * @param connectOptions the options passed on to the open listener
+     * @param result the result passed on to the open listener; it receives the failure if the host name
+     *      cannot be resolved
+     * @param callbackHandler the callback handler (not used by this method)
+     * @return the pending connection, or a no-op cancellable if the host name cannot be resolved
+     * @throws IllegalArgumentException if the URI has no host or its port is less than 1
+     */
+    public Cancellable connect(final URI uri, final OptionMap connectOptions, final Result<ConnectionHandlerFactory> result, final CallbackHandler callbackHandler) throws IllegalArgumentException {
+        // Get the destination info from the URI
+        final String hostName = uri.getHost();
+        if (hostName == null) {
+            throw new IllegalArgumentException("No host name specified");
+        }
+        final int portNumber = uri.getPort();
+        if (portNumber < 1) {
+            throw new IllegalArgumentException("Port number must be specified");
+        }
+        // Open a client channel
+        try {
+            final InetSocketAddress destination = new InetSocketAddress(InetAddress.getByName(hostName), portNumber);
+            final RemoteOpenListener openListener = new RemoteOpenListener(false, connectOptions, connectionProviderContext, result);
+            return connector.connectTo(destination, openListener, null);
+        } catch (UnknownHostException e) {
+            result.setException(e);
+            return IoUtils.nullCancellable();
+        }
+    }
+
+    /**
+     * Get the server-side interface of this provider.  A new instance is created on every call.
+     *
+     * @return the provider interface
+     */
+    public NetworkServerProvider getProviderInterface() {
+        return new ProviderInterface();
+    }
+
+    /**
+     * Server-side provider interface which creates open listeners for accepted channels.
+     */
+    private class ProviderInterface implements NetworkServerProvider {
+        public ChannelListener<ConnectedStreamChannel<InetSocketAddress>> getServerListener(final OptionMap optionMap) {
+            return new RemoteOpenListener(true, optionMap, connectionProviderContext, new AcceptingResult());
+        }
+    }
+
+    /**
+     * Result which passes a successfully created connection handler factory to the connection provider
+     * context.  Exceptions and cancellation are ignored.  Every method reports {@code true}.
+     */
+    private class AcceptingResult implements Result<ConnectionHandlerFactory> {
+        public boolean setResult(final ConnectionHandlerFactory result) {
+            connectionProviderContext.accept(result);
+            return true;
+        }
+
+        public boolean setException(final IOException exception) {
+            return true;
+        }
+
+        public boolean setCancelled() {
+            return true;
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,308 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.marshalling.util.IntKeyMap;
+import org.jboss.remoting3.ReplyException;
+import org.jboss.remoting3.ServiceNotFoundException;
+import org.jboss.remoting3.ServiceURI;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Pool;
+
+final class RemoteMessageHandler implements org.jboss.xnio.channels.MessageHandler {
+
+ private RemoteConnectionHandler remoteConnectionHandler;
+
+ public RemoteMessageHandler(final RemoteConnectionHandler remoteConnectionHandler) {
+ this.remoteConnectionHandler = remoteConnectionHandler;
+ }
+
+ public void handleMessage(final ByteBuffer buffer) {
+ final byte cmd = buffer.get();
+ final RemoteConnectionHandler connectionHandler = remoteConnectionHandler;
+ switch (cmd) {
+ case RemoteProtocol.SERVICE_REQUEST: {
+ final int id = buffer.getInt();
+ final String serviceType = Buffers.getModifiedUtf8Z(buffer);
+ final String groupName = Buffers.getModifiedUtf8Z(buffer);
+ final RequestHandler handler;
+ handler =
connectionHandler.getConnectionContext().openService(serviceType, groupName,
OptionMap.EMPTY);
+ final Pool<ByteBuffer> bufferPool =
connectionHandler.getBufferPool();
+ final ByteBuffer outBuf = bufferPool.allocate();
+ try {
+ outBuf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
+ if (handler == null) {
+ // no matching service found
+ outBuf.put(RemoteProtocol.SERVICE_NOT_FOUND);
+ } else {
+ // service opened locally, now register the success
+ final InboundClient inboundClient = new
InboundClient(connectionHandler, handler);
+ final IntKeyMap<InboundClient> inboundClients =
connectionHandler.getInboundClients();
+ synchronized (inboundClients) {
+ inboundClients.put(id, inboundClient);
+ }
+ outBuf.put(RemoteProtocol.SERVICE_CLIENT_OPENED);
+ }
+ outBuf.putInt(id);
+ outBuf.flip();
+ try {
+ connectionHandler.sendBlocking(outBuf);
+ } catch (IOException e) {
+ // the channel has suddenly failed
+ RemoteConnectionHandler.log.trace("Send failed: %s",
e);
+ }
+ return;
+ } finally {
+ bufferPool.free(outBuf);
+ }
+ // not reached
+ }
+ case RemoteProtocol.SERVICE_NOT_FOUND: {
+ final int id = buffer.getInt();
+ final OutboundClient client;
+ final IntKeyMap<OutboundClient> outboundClients =
connectionHandler.getOutboundClients();
+ synchronized (outboundClients) {
+ client = outboundClients.remove(id);
+ }
+ if (client == null) {
+ RemoteConnectionHandler.log.trace("Received service-not-found
for unknown client %d", Integer.valueOf(id));
+ return;
+ }
+ synchronized (client) {
+ // todo assert client state == waiting
+ client.getResult().setException(new
ServiceNotFoundException(ServiceURI.create(client.getServiceType(), client.getGroupName(),
null)));
+ client.setState(OutboundClient.State.CLOSED);
+ }
+ return;
+ }
+ case RemoteProtocol.SERVICE_CLIENT_OPENED: {
+ final int id = buffer.getInt();
+ final OutboundClient client;
+ final IntKeyMap<OutboundClient> outboundClients =
connectionHandler.getOutboundClients();
+ synchronized (outboundClients) {
+ client = outboundClients.get(id);
+ }
+ if (client == null) {
+ RemoteConnectionHandler.log.trace("Received
service-client-opened for unknown client %d", Integer.valueOf(id));
+ return;
+ }
+ synchronized (client) {
+ // todo assert client state == waiting
+ client.setState(OutboundClient.State.ESTABLISHED);
+ client.getResult().setResult(new OutboundRequestHandler(client));
+ }
+ return;
+ }
+ case RemoteProtocol.CLIENT_CLOSED: {
+ final int id = buffer.getInt();
+
+ final InboundClient client;
+ final IntKeyMap<InboundClient> inboundClients =
connectionHandler.getInboundClients();
+ synchronized (inboundClients) {
+ client = inboundClients.remove(id);
+ }
+ if (client == null) {
+ RemoteConnectionHandler.log.trace("Received client-closed for
unknown client %d", Integer.valueOf(id));
+ return;
+ }
+ synchronized (client) {
+ IoUtils.safeClose(client.getHandler());
+ }
+ return;
+ }
+ case RemoteProtocol.REQUEST: {
+ final int rid = buffer.getInt();
+ final byte flags = buffer.get();
+ final InboundRequest inboundRequest;
+ final NioByteInput byteInput;
+ final IntKeyMap<InboundRequest> inboundRequests =
connectionHandler.getInboundRequests();
+ synchronized (inboundRequests) {
+ if ((flags & RemoteProtocol.MSG_FLAG_FIRST) != 0) {
+ final int cid = buffer.getInt();
+ inboundRequest = new InboundRequest(connectionHandler);
+ // todo - check for duplicate
+ inboundRequests.put(rid, inboundRequest);
+
connectionHandler.getConnectionContext().getConnectionProviderContext().getExecutor().execute(new
InboundRequestTask(connectionHandler, inboundRequest, rid, cid));
+ } else {
+ inboundRequest = inboundRequests.get(rid);
+ }
+ if (inboundRequest == null) {
+ RemoteConnectionHandler.log.trace("Received request for
unknown request ID %d", Integer.valueOf(rid));
+ }
+ }
+ synchronized (inboundRequest) {
+ byteInput = inboundRequest.getByteInput();
+ }
+ byteInput.push(buffer);
+ return;
+ }
+ case RemoteProtocol.REQUEST_ABORT: {
+ final int rid = buffer.getInt();
+ final InboundRequest inboundRequest;
+ final IntKeyMap<InboundRequest> inboundRequests =
connectionHandler.getInboundRequests();
+ synchronized (inboundRequests) {
+ inboundRequest = inboundRequests.remove(rid);
+ }
+ if (inboundRequest == null) {
+ RemoteConnectionHandler.log.trace("Received request-abort for
unknown request ID %d", Integer.valueOf(rid));
+ return;
+ }
+ synchronized (inboundRequest) {
+ // as long as the last message hasn't been received yet, this
will disrupt the request and prevent a reply
+ inboundRequest.getReplyHandler().setDone();
+ inboundRequest.getByteInput().pushException(new
InterruptedIOException("Request aborted"));
+ }
+ return;
+ }
+ case RemoteProtocol.REQUEST_ACK_CHUNK: {
+ final int rid = buffer.getInt();
+ final InboundRequest inboundRequest;
+ final IntKeyMap<InboundRequest> inboundRequests =
connectionHandler.getInboundRequests();
+ synchronized (inboundRequests) {
+ inboundRequest = inboundRequests.get(rid);
+ }
+ if (inboundRequest == null) {
+ RemoteConnectionHandler.log.trace("Received request-ack-chunk
for unknown request ID %d", Integer.valueOf(rid));
+ return;
+ }
+ synchronized (inboundRequest) {
+ inboundRequest.ack();
+ }
+ return;
+ }
+ case RemoteProtocol.REPLY: {
+ final int rid = buffer.getInt();
+ final byte flags = buffer.get();
+ final OutboundRequest outboundRequest;
+ final NioByteInput byteInput;
+ final IntKeyMap<OutboundRequest> outboundRequests =
connectionHandler.getOutboundRequests();
+ synchronized (outboundRequests) {
+ outboundRequest = outboundRequests.get(rid);
+ }
+ if (outboundRequest == null) {
+ RemoteConnectionHandler.log.trace("Received reply for unknown
request ID %d", Integer.valueOf(rid));
+ return;
+ }
+ synchronized (outboundRequest) {
+ if ((flags & RemoteProtocol.MSG_FLAG_FIRST) != 0) {
+ // todo - check for duplicate
+ outboundRequest.setByteInput(byteInput = new NioByteInput(new
ReplyInputHandler(outboundRequest, rid)));
+
connectionHandler.getConnectionContext().getConnectionProviderContext().getExecutor().execute(new
InboundReplyTask(connectionHandler, outboundRequest));
+ } else {
+ byteInput = outboundRequest.getByteInput();
+ }
+ }
+ byteInput.push(buffer);
+ return;
+ }
+ case RemoteProtocol.REPLY_ACK_CHUNK: {
+ final int rid = buffer.getInt();
+ final OutboundRequest outboundRequest;
+ final IntKeyMap<OutboundRequest> outboundRequests =
connectionHandler.getOutboundRequests();
+ synchronized (outboundRequests) {
+ outboundRequest = outboundRequests.get(rid);
+ }
+ if (outboundRequest == null) {
+ RemoteConnectionHandler.log.trace("Received reply-ack-chunk for
unknown request ID %d", Integer.valueOf(rid));
+ return;
+ }
+ synchronized (outboundRequest) {
+ outboundRequest.ack();
+ }
+ return;
+ }
+ case RemoteProtocol.REPLY_EXCEPTION: {
+ final int rid = buffer.getInt();
+ final byte flags = buffer.get();
+ final OutboundRequest outboundRequest;
+ final NioByteInput byteInput;
+ final IntKeyMap<OutboundRequest> outboundRequests =
connectionHandler.getOutboundRequests();
+ synchronized (outboundRequests) {
+ outboundRequest = outboundRequests.get(rid);
+ }
+ if (outboundRequest == null) {
+ RemoteConnectionHandler.log.trace("Received reply-exception for
unknown request ID %d", Integer.valueOf(rid));
+ return;
+ }
+ synchronized (outboundRequest) {
+ if ((flags & RemoteProtocol.MSG_FLAG_FIRST) != 0) {
+ // todo - check for duplicate
+ outboundRequest.setByteInput(byteInput = new NioByteInput(new
ReplyInputHandler(outboundRequest, rid)));
+
connectionHandler.getConnectionContext().getConnectionProviderContext().getExecutor().execute(new
InboundReplyExceptionTask(connectionHandler, outboundRequest));
+ } else {
+ byteInput = outboundRequest.getByteInput();
+ }
+ }
+ byteInput.push(buffer);
+ return;
+ }
+ case RemoteProtocol.REPLY_EXCEPTION_ABORT: {
+ final int rid = buffer.getInt();
+ final OutboundRequest outboundRequest;
+ final IntKeyMap<OutboundRequest> outboundRequests =
connectionHandler.getOutboundRequests();
+ synchronized (outboundRequests) {
+ outboundRequest = outboundRequests.get(rid);
+ }
+ if (outboundRequest == null) {
+ RemoteConnectionHandler.log.trace("Received
reply-exception-abort for unknown request ID %d", Integer.valueOf(rid));
+ return;
+ }
+ final NioByteInput byteInput;
+ final ReplyHandler replyHandler;
+ synchronized (outboundRequest) {
+ byteInput = outboundRequest.getByteInput();
+ replyHandler = outboundRequest.getInboundReplyHandler();
+ }
+ final ReplyException re = new ReplyException("Reply exception was
aborted");
+ if (byteInput != null) {
+ byteInput.pushException(re);
+ }
+ if (replyHandler != null) {
+ SpiUtils.safeHandleException(replyHandler, re);
+ }
+ return;
+ }
+ default: {
+ RemoteConnectionHandler.log.error("Received invalid packet type on
%s, closing", connectionHandler.getChannel());
+ IoUtils.safeClose(connectionHandler);
+ }
+ }
+ }
+
+ /**
+ * End-of-file callback: closes the connection handler via {@code IoUtils.safeClose}.
+ */
+ public void handleEof() {
+ IoUtils.safeClose(remoteConnectionHandler);
+ }
+
+    /**
+     * I/O failure callback: records the failure at trace level, then closes the
+     * connection handler via {@code IoUtils.safeClose}.
+     *
+     * @param e the I/O exception that was reported
+     */
+    public void handleException(final IOException e) {
+        // log before closing so that the reason for the close is not lost
+        RemoteConnectionHandler.log.trace("Closing connection after I/O error: %s", e);
+        IoUtils.safeClose(remoteConnectionHandler);
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteOpenListener.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteOpenListener.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteOpenListener.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Marshalling;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.ConnectionHandlerContext;
+import org.jboss.remoting3.spi.ConnectionHandlerFactory;
+import org.jboss.remoting3.spi.ConnectionProviderContext;
+import org.jboss.xnio.ChannelListener;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Options;
+import org.jboss.xnio.Result;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.channels.ConnectedStreamChannel;
+
+/**
+ * Channel listener invoked with a newly opened stream channel for the "remote" protocol.
+ * It tries to enable {@code TCP_NODELAY} on the channel and then completes the supplied
+ * {@code Result} with a factory that builds the {@link RemoteConnectionHandler} for
+ * that channel.
+ */
+final class RemoteOpenListener implements ChannelListener<ConnectedStreamChannel<InetSocketAddress>> {
+
+    // stored by the constructor; not read anywhere else in this class
+    private final boolean server;
+    private final ConnectionProviderContext connectionProviderContext;
+    private final OptionMap optionMap;
+    private final Result<ConnectionHandlerFactory> factoryResult;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param server the server flag (only stored)
+     * @param optionMap the options handed to the message reader created for the channel
+     * @param connectionProviderContext the connection provider context (only stored)
+     * @param factoryResult the result which receives the connection handler factory
+     */
+    public RemoteOpenListener(final boolean server, final OptionMap optionMap, final ConnectionProviderContext connectionProviderContext, final Result<ConnectionHandlerFactory> factoryResult) {
+        this.factoryResult = factoryResult;
+        this.connectionProviderContext = connectionProviderContext;
+        this.optionMap = optionMap;
+        this.server = server;
+    }
+
+    /**
+     * Handle the channel-open event.
+     *
+     * @param channel the channel which was opened
+     */
+    public void handleEvent(final ConnectedStreamChannel<InetSocketAddress> channel) {
+        try {
+            channel.setOption(Options.TCP_NODELAY, Boolean.TRUE);
+        } catch (IOException ignored) {
+            // best effort only: the connection is still set up if the option cannot be applied
+        }
+        // TODO: For now, just build a pre-set-up connection without a negotiation phase
+        factoryResult.setResult(new PresetHandlerFactory(channel, optionMap));
+    }
+
+    /**
+     * Factory which wires a {@link RemoteConnectionHandler} to one already-open channel.
+     */
+    private static final class PresetHandlerFactory implements ConnectionHandlerFactory {
+
+        private final ConnectedStreamChannel<InetSocketAddress> channel;
+        private final OptionMap readerOptions;
+
+        PresetHandlerFactory(final ConnectedStreamChannel<InetSocketAddress> channel, final OptionMap readerOptions) {
+            this.channel = channel;
+            this.readerOptions = readerOptions;
+        }
+
+        /**
+         * Create the handler using the "river" marshaller factory and a default marshalling
+         * configuration, install a {@link RemoteMessageHandler} as the channel's message
+         * reader, and resume reads on the channel.
+         *
+         * @param connectionContext the connection handler context
+         * @return the new connection handler
+         */
+        public ConnectionHandler createInstance(final ConnectionHandlerContext connectionContext) {
+            final MarshallerFactory river = Marshalling.getMarshallerFactory("river");
+            final MarshallingConfiguration configuration = new MarshallingConfiguration();
+            final RemoteConnectionHandler handler = new RemoteConnectionHandler(connectionContext, channel, river, configuration);
+            final RemoteMessageHandler messageHandler = new RemoteMessageHandler(handler);
+            Channels.createMessageReader(channel, readerOptions).set(messageHandler);
+            channel.resumeReads();
+            return handler;
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.net.InetSocketAddress;
+import org.jboss.remoting3.spi.ConnectionProvider;
+import org.jboss.remoting3.spi.ConnectionProviderContext;
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.channels.ConnectedStreamChannel;
+
+/**
+ * Entry point and wire constants for the "remote" protocol.  Call
+ * {@link #getRemoteConnectionProvider} to obtain the connection provider for the protocol.
+ * This class cannot be instantiated.
+ */
+public final class RemoteProtocol {
+
+    private RemoteProtocol() {
+    }
+
+    // ---- bits of the per-chunk flags byte ----
+
+    /** Flag bit marking the first chunk of a message. */
+    static final int MSG_FLAG_FIRST = 1 << 0;
+    /** Flag bit marking the last chunk of a message. */
+    static final int MSG_FLAG_LAST = 1 << 1;
+
+    // ---- packet type identifiers ----
+
+    static final byte GREETING = 0;
+    static final byte SERVICE_REQUEST = 1;
+    static final byte SERVICE_NOT_FOUND = 2;
+    static final byte SERVICE_CLIENT_OPENED = 3;
+    static final byte CLIENT_CLOSED = 4;
+
+    static final byte REQUEST = 5;
+    static final byte REQUEST_ABORT = 6;
+    static final byte REQUEST_ACK_CHUNK = 7;
+
+    static final byte REPLY = 8;
+    static final byte REPLY_EXCEPTION = 9;
+    static final byte REPLY_ACK_CHUNK = 10;
+    static final byte REPLY_EXCEPTION_ABORT = 11;
+
+    /**
+     * Create an instance of the connection provider for the "remote" protocol.
+     *
+     * @param connectionProviderContext the connection provider context
+     * @param connector the connector to use for outbound connections
+     * @return the connection provider for the "remote" protocol
+     */
+    public static ConnectionProvider getRemoteConnectionProvider(final ConnectionProviderContext connectionProviderContext, final Connector<InetSocketAddress, ? extends ConnectedStreamChannel<InetSocketAddress>> connector) {
+        final RemoteConnectionProvider provider = new RemoteConnectionProvider(connectionProviderContext, connector);
+        return provider;
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocolDescriptor.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocolDescriptor.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocolDescriptor.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+import org.jboss.remoting3.spi.ConnectionProvider;
+import org.jboss.remoting3.spi.ConnectionProviderContext;
+import org.jboss.remoting3.spi.ConnectionProviderFactory;
+import org.jboss.remoting3.spi.RemotingServiceDescriptor;
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.channels.TcpChannel;
+
+/**
+ * Service descriptor which exposes the "remote" connection protocol as a
+ * {@code ConnectionProviderFactory}, so that the protocol can be auto-detected in
+ * standalone environments.
+ */
+public final class RemoteProtocolDescriptor implements RemotingServiceDescriptor<ConnectionProviderFactory> {
+
+    /** {@inheritDoc} */
+    public Class<ConnectionProviderFactory> getType() {
+        return ConnectionProviderFactory.class;
+    }
+
+    /** {@inheritDoc} */
+    public String getName() {
+        return "remote";
+    }
+
+    /**
+     * Build the connection provider factory from configuration properties.  Recognized keys:
+     * {@code remote.xnio.provider} (XNIO provider name, default {@code "default"}),
+     * {@code remote.ssl.enable} (default {@code "true"}), and connector options parsed
+     * with the prefix {@code remote.connector.option}.
+     *
+     * @param properties the configuration properties
+     * @return a factory whose providers use the connector configured here
+     * @throws IOException if the connector cannot be created
+     */
+    public ConnectionProviderFactory getService(final Properties properties) throws IOException {
+        final String xnioProviderName = properties.getProperty("remote.xnio.provider", "default");
+        final String sslSetting = properties.getProperty("remote.ssl.enable", "true");
+        final boolean sslEnabled = Boolean.parseBoolean(sslSetting);
+        final Xnio xnio = Xnio.getInstance(xnioProviderName);
+        final OptionMap options = OptionMap.builder().parseAll(properties, "remote.connector.option").getMap();
+        final Connector<InetSocketAddress, ? extends TcpChannel> connector = openConnector(xnio, sslEnabled, options);
+        return new ConnectionProviderFactory() {
+            public ConnectionProvider createInstance(final ConnectionProviderContext context) {
+                return RemoteProtocol.getRemoteConnectionProvider(context, connector);
+            }
+        };
+    }
+
+    /**
+     * Create the plain or SSL TCP connector.  Any failure while creating the SSL connector
+     * is rethrown as an {@code IOException} carrying the original exception as its cause.
+     */
+    private static Connector<InetSocketAddress, ? extends TcpChannel> openConnector(final Xnio xnio, final boolean ssl, final OptionMap options) throws IOException {
+        if (! ssl) {
+            return xnio.createTcpConnector(options);
+        }
+        try {
+            return xnio.createSslTcpConnector(null, options);
+        } catch (Exception cause) {
+            final IOException wrapped = new IOException("Failed to create connector");
+            wrapped.initCause(cause);
+            throw wrapped;
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ReplyBufferWriter.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ReplyBufferWriter.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ReplyBufferWriter.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.marshalling.NioByteOutput;
+import org.jboss.xnio.Pool;
+
+/**
+ * Buffer writer which frames the chunks of a reply (or reply exception) to an inbound
+ * request.  Every chunk starts with a header of a 4-byte length placeholder, a 1-byte
+ * packet type, the 4-byte request ID and a 1-byte flags field.
+ */
+final class ReplyBufferWriter implements NioByteOutput.BufferWriter {
+
+    /**
+     * Absolute position of the flags byte inside a chunk produced by {@link #getBuffer()}:
+     * 4 bytes of length placeholder + 1 byte of packet type + 4 bytes of request ID.
+     */
+    private static final int FLAGS_OFFSET = 9;
+
+    private final AtomicBoolean first = new AtomicBoolean(true);
+    private final int id;
+    private final boolean exception;
+    private final InboundRequest inboundRequest;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param inboundRequest the request being replied to
+     * @param id the request ID written into every chunk header
+     * @param exception {@code true} to send {@code REPLY_EXCEPTION} packets, {@code false} for {@code REPLY}
+     */
+    ReplyBufferWriter(final InboundRequest inboundRequest, final int id, final boolean exception) {
+        this.inboundRequest = inboundRequest;
+        this.id = id;
+        this.exception = exception;
+    }
+
+    /**
+     * Allocate a pooled buffer and write the chunk header into it.  The first buffer handed
+     * out by this writer carries {@code MSG_FLAG_FIRST}; later ones start with empty flags.
+     *
+     * @return the buffer, positioned just after the header
+     */
+    public ByteBuffer getBuffer() {
+        final RemoteConnectionHandler connectionHandler = inboundRequest.getRemoteConnectionHandler();
+        final Pool<ByteBuffer> bufferPool = connectionHandler.getBufferPool();
+        final ByteBuffer buffer = bufferPool.allocate();
+        buffer.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
+        buffer.put(exception ? RemoteProtocol.REPLY_EXCEPTION : RemoteProtocol.REPLY);
+        buffer.putInt(id);
+        final boolean isFirst = first.getAndSet(false);
+        if (isFirst) {
+            buffer.put((byte) RemoteProtocol.MSG_FLAG_FIRST);
+        } else {
+            buffer.put((byte)0);
+        }
+        return buffer;
+    }
+
+    /**
+     * Send one filled chunk, blocking until the inbound request grants permission to send.
+     * The buffer is always returned to the pool, including when the wait is interrupted.
+     *
+     * @param buffer the chunk to send
+     * @param eof {@code true} if this is the final chunk of the reply
+     * @throws IOException if sending fails or the calling thread is interrupted
+     */
+    public void accept(final ByteBuffer buffer, final boolean eof) throws IOException {
+        final RemoteConnectionHandler connectionHandler = inboundRequest.getRemoteConnectionHandler();
+        try {
+            try {
+                inboundRequest.acquire();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new InterruptedIOException();
+            }
+            if (eof) {
+                // OR the LAST bit into the flags byte itself; any other index would
+                // clobber part of the request ID and leave the flag unset
+                buffer.put(FLAGS_OFFSET, (byte) (buffer.get(FLAGS_OFFSET) | RemoteProtocol.MSG_FLAG_LAST));
+            }
+            connectionHandler.sendBlocking(buffer);
+        } finally {
+            connectionHandler.getBufferPool().free(buffer);
+        }
+    }
+
+    /**
+     * Flush the connection, blocking until done.
+     *
+     * @throws IOException if the flush fails
+     */
+    public void flush() throws IOException {
+        inboundRequest.getRemoteConnectionHandler().flushBlocking();
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ReplyInputHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ReplyInputHandler.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/ReplyInputHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.marshalling.NioByteInput;
+
+/**
+ * Input handler for the byte input which carries the reply to an outbound request.
+ * Acknowledging a consumed chunk sends a small ack packet containing the request ID.
+ *
+ * NOTE(review): the ack is sent as {@code REQUEST_ACK_CHUNK}, which is the same packet
+ * type that {@code RequestInputHandler} sends; confirm against the packet dispatcher in
+ * {@code RemoteMessageHandler} which ack type each direction is meant to use.
+ */
+final class ReplyInputHandler implements NioByteInput.InputHandler {
+    private final OutboundRequest outboundRequest;
+    private final int rid;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param outboundRequest the request whose reply is being read
+     * @param rid the request ID to put into ack packets
+     */
+    ReplyInputHandler(final OutboundRequest outboundRequest, final int rid) {
+        this.rid = rid;
+        this.outboundRequest = outboundRequest;
+    }
+
+    /**
+     * Send and flush an ack packet (length placeholder, packet type, request ID) on the
+     * connection.  The pooled buffer is released whether or not the send succeeds.
+     *
+     * @throws IOException if sending or flushing fails
+     */
+    public void acknowledge() throws IOException {
+        final RemoteConnectionHandler handler = outboundRequest.getRemoteConnectionHandler();
+        final ByteBuffer ackPacket = handler.getBufferPool().allocate();
+        try {
+            ackPacket.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER)
+                    .put(RemoteProtocol.REQUEST_ACK_CHUNK)
+                    .putInt(rid);
+            ackPacket.flip();
+            handler.sendBlocking(ackPacket);
+            handler.flushBlocking();
+        } finally {
+            handler.getBufferPool().free(ackPacket);
+        }
+    }
+
+    /**
+     * Called when the input is closed; nothing is done here.
+     *
+     * @throws IOException never thrown by this implementation
+     */
+    public void close() throws IOException {
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RequestBufferWriter.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RequestBufferWriter.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RequestBufferWriter.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.marshalling.NioByteOutput;
+
+/**
+ * Buffer writer which frames the chunks of an outbound request.  Every chunk starts with a
+ * header of a 4-byte length placeholder, a 1-byte packet type, the 4-byte request ID and a
+ * 1-byte flags field; the first chunk additionally carries the 4-byte client ID.
+ */
+final class RequestBufferWriter implements NioByteOutput.BufferWriter {
+
+    /**
+     * Absolute position of the flags byte inside a chunk produced by {@link #getBuffer()}:
+     * 4 bytes of length placeholder + 1 byte of packet type + 4 bytes of request ID.
+     */
+    private static final int FLAGS_OFFSET = 9;
+
+    private final AtomicBoolean first = new AtomicBoolean(true);
+    private final int rid;
+    private final OutboundRequest outboundRequest;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param outboundRequest the request being sent
+     * @param rid the request ID written into every chunk header
+     */
+    RequestBufferWriter(final OutboundRequest outboundRequest, final int rid) {
+        this.outboundRequest = outboundRequest;
+        this.rid = rid;
+    }
+
+    /**
+     * Allocate a pooled buffer and write the chunk header into it.  The first buffer handed
+     * out by this writer carries {@code MSG_FLAG_FIRST} followed by the client ID; later
+     * ones start with empty flags.
+     *
+     * @return the buffer, positioned just after the header
+     */
+    public ByteBuffer getBuffer() {
+        final ByteBuffer buffer = outboundRequest.getRemoteConnectionHandler().getBufferPool().allocate();
+        buffer.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
+        buffer.put(RemoteProtocol.REQUEST);
+        buffer.putInt(rid);
+        final boolean isFirst = first.getAndSet(false);
+        if (isFirst) {
+            buffer.put((byte) RemoteProtocol.MSG_FLAG_FIRST);
+            buffer.putInt(outboundRequest.getClientId());
+        } else {
+            buffer.put((byte)0);
+        }
+        RemoteConnectionHandler.log.trace("Allocated buffer %s for %s", buffer, this);
+        return buffer;
+    }
+
+    /**
+     * Send one filled chunk, blocking until the outbound request grants permission to send.
+     * On the final chunk the request is moved to {@code REPLY_WAIT} before the chunk is sent.
+     * The buffer is always returned to the pool, including when the wait is interrupted.
+     *
+     * @param buffer the chunk to send
+     * @param eof {@code true} if this is the final chunk of the request
+     * @throws IOException if sending fails or the calling thread is interrupted
+     */
+    public void accept(final ByteBuffer buffer, final boolean eof) throws IOException {
+        final OutboundRequest outboundRequest = this.outboundRequest;
+        final RemoteConnectionHandler remoteConnectionHandler = outboundRequest.getRemoteConnectionHandler();
+        try {
+            try {
+                outboundRequest.acquire();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new InterruptedIOException();
+            }
+            if (eof) {
+                // OR the LAST bit into the flags byte itself; any other index would
+                // clobber part of the request ID and leave the flag unset
+                buffer.put(FLAGS_OFFSET, (byte) (buffer.get(FLAGS_OFFSET) | RemoteProtocol.MSG_FLAG_LAST));
+                synchronized (outboundRequest) {
+                    outboundRequest.setState(OutboundRequest.State.REPLY_WAIT);
+                }
+            }
+            RemoteConnectionHandler.log.trace("Sending buffer %s for %s", buffer, this);
+            remoteConnectionHandler.sendBlocking(buffer);
+        } finally {
+            remoteConnectionHandler.getBufferPool().free(buffer);
+        }
+    }
+
+    /**
+     * Flush the connection, blocking until done.
+     *
+     * @throws IOException if the flush fails
+     */
+    public void flush() throws IOException {
+        outboundRequest.getRemoteConnectionHandler().flushBlocking();
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RequestInputHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RequestInputHandler.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RequestInputHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.xnio.Pool;
+
+/**
+ * Input handler for the byte input which carries the body of an inbound request.
+ * Acknowledging a consumed chunk sends a {@code REQUEST_ACK_CHUNK} packet containing the
+ * request ID.
+ *
+ * NOTE(review): {@code ReplyInputHandler} sends the same packet type, and the dispatcher in
+ * {@code RemoteMessageHandler} appears to route {@code REQUEST_ACK_CHUNK} to the
+ * inbound-request table, while {@code RequestBufferWriter} waits on its outbound request;
+ * confirm that this ack actually reaches the request sender.
+ */
+final class RequestInputHandler implements NioByteInput.InputHandler {
+    private final InboundRequest inboundRequest;
+    private final int rid;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param inboundRequest the request whose body is being read
+     * @param rid the request ID to put into ack packets
+     */
+    public RequestInputHandler(final InboundRequest inboundRequest, final int rid) {
+        this.rid = rid;
+        this.inboundRequest = inboundRequest;
+    }
+
+    /**
+     * Send and flush an ack packet (length placeholder, packet type, request ID) on the
+     * connection.  The pooled buffer is released whether or not the send succeeds.
+     *
+     * @throws IOException if sending or flushing fails
+     */
+    public void acknowledge() throws IOException {
+        final RemoteConnectionHandler handler = inboundRequest.getRemoteConnectionHandler();
+        final Pool<ByteBuffer> pool = handler.getBufferPool();
+        final ByteBuffer ackPacket = pool.allocate();
+        try {
+            ackPacket.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER)
+                    .put(RemoteProtocol.REQUEST_ACK_CHUNK)
+                    .putInt(rid);
+            ackPacket.flip();
+            handler.sendBlocking(ackPacket);
+            handler.flushBlocking();
+        } finally {
+            pool.free(ackPacket);
+        }
+    }
+
+    /**
+     * Called when the input is closed; nothing is done here.
+     *
+     * @throws IOException never thrown by this implementation
+     */
+    public void close() throws IOException {
+        // todo: stream was closed, no action needed
+    }
+}
Deleted:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/service/Services.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/service/Services.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/service/Services.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -1,31 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting3.service;
-
-public final class Services {
- public static final int LOCATE_SERVICE = 1;
- public static final int SERVICE_CLASS_LOOKUP = 2;
-
- private Services() {
- }
-}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionHandler.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -34,11 +34,11 @@
/**
* Open a request handler.
*
- * @param slot the target slot
- * @param result the result for the connected request handler
- * @return a handle which may be used to cancel the pending operation
+ * @param serviceType the service type string
+ * @param groupName the group name string
+ * @param result the result for the connected request handler
+ * @return a handle which may be used to cancel the pending operation
*/
- Cancellable open(int slot, Result<RequestHandler> result);
+ Cancellable open(String serviceType, String groupName, Result<RequestHandler>
result);
/**
* Create a connector which may be used to communicate with the given local
RequestHandler. The connector
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionHandlerContext.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionHandlerContext.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionHandlerContext.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -44,12 +44,13 @@
*
* @remoting.nonblocking
*
- * @param slotId the service slot ID
+ * @param serviceType the service type string
+ * @param groupName the group name, or {@code null} for any group name
* @param optionMap the options to pass to the service
* @return the new request handler
* @throws IOException if an error occurs
*/
- RequestHandler openService(int slotId, OptionMap optionMap) throws IOException;
+ RequestHandler openService(String serviceType, String groupName, OptionMap
optionMap);
/**
* Indicate that the remote side has terminated the connection, so the local side
should be closed as well.
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProvider.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProvider.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProvider.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -35,7 +35,7 @@
*
* @remoting.implement
*/
-public interface ConnectionProvider<T> {
+public interface ConnectionProvider {
/**
* Open an outbound connection to the given URI. This method is expected to be
non-blocking, with the result
@@ -55,5 +55,5 @@
*
* @return the user data
*/
- T getProviderInterface();
+ Object getProviderInterface();
}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderFactory.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderFactory.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderFactory.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -27,7 +27,7 @@
* endpoint will call the {@code createInstance()} method with its provider context when
instances of this interface
* are registered on that endpoint.
*/
-public interface ConnectionProviderFactory<T> {
+public interface ConnectionProviderFactory {
/**
* Create a provider instance for an endpoint.
@@ -35,5 +35,5 @@
* @param context the provider context
* @return the provider
*/
- ConnectionProvider<T> createInstance(ConnectionProviderContext context);
+ ConnectionProvider createInstance(ConnectionProviderContext context);
}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderRegistration.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderRegistration.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderRegistration.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -26,15 +26,13 @@
/**
* A handle representing the registration of a connection provider.
- *
- * @param <T> the provider interface type
*/
-public interface ConnectionProviderRegistration<T> extends Registration {
+public interface ConnectionProviderRegistration extends Registration {
/**
* Get the created provider interface associated with this registration.
*
* @return the connection provider interface
*/
- T getProviderInterface();
+ Object getProviderInterface();
}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/NetworkServerProvider.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/NetworkServerProvider.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/NetworkServerProvider.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.spi;
+
+import java.net.InetSocketAddress;
+import org.jboss.xnio.ChannelListener;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.channels.ConnectedStreamChannel;
+
+/**
+ * A provider interface implemented by connection providers which can be
+ * connected to across the network.  Implementations supply the listener that
+ * handles channels opened on a server of this provider's type.
+ */
+public interface NetworkServerProvider {
+
+ /**
+ * Get the channel open listener for servers of this connection provider type.
+ * The listener handles {@code ConnectedStreamChannel}s whose addresses are
+ * {@code InetSocketAddress}es.
+ *
+ * @param optionMap options which may be used to configure the returned listener
+ * @return the channel listener
+ */
+ ChannelListener<ConnectedStreamChannel<InetSocketAddress>>
getServerListener(OptionMap optionMap);
+}
Added:
remoting3/trunk/jboss-remoting/src/main/resources/META-INF/services/org.jboss.remoting3.spi.RemotingServiceDescriptor
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/resources/META-INF/services/org.jboss.remoting3.spi.RemotingServiceDescriptor
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/resources/META-INF/services/org.jboss.remoting3.spi.RemotingServiceDescriptor 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,4 @@
+#
+# Remoting "remote" protocol descriptor
+#
+org.jboss.remoting3.remote.RemoteProtocolDescriptor
Deleted:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/EndpointTestCase.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/EndpointTestCase.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/EndpointTestCase.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -1,207 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting3;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.io.IOException;
-import java.net.URI;
-import junit.framework.TestCase;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.OptionMap;
-import org.jboss.xnio.log.Logger;
-
-/**
- *
- */
-public final class EndpointTestCase extends TestCase {
-
- private static final Logger log = Logger.getLogger(EndpointTestCase.class);
-
- public void testCreate() throws Throwable {
- final ExecutorService executorService = Executors.newCachedThreadPool();
- final Endpoint endpoint = Remoting.createEndpoint("foo",
executorService, OptionMap.EMPTY);
- try {
- endpoint.close();
- executorService.shutdown();
- assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
- } finally {
- executorService.shutdownNow();
- }
- }
-
- public void testLocalClientInvoke() throws Throwable {
- final Endpoint endpoint = Remoting.getConfiguredEndpoint();
- try {
- final Object requestObj = new Object();
- final Object replyObj = new Object();
- final Client<Object, Object> localClient =
Remoting.createLocalClient(endpoint, new RequestListener<Object, Object>() {
- public void handleRequest(final RequestContext<Object>
objectRequestContext, final Object request) throws RemoteExecutionException {
- try {
- objectRequestContext.sendReply(replyObj);
- } catch (IOException e) {
- throw new RemoteExecutionException(e);
- }
- }
-
- public void handleClose() {
- log.info("Listener closed");
- }
- }, Object.class, Object.class);
- try {
- assertEquals(replyObj, localClient.invoke(requestObj));
- } finally {
- IoUtils.safeClose(localClient);
- }
- } finally {
- IoUtils.safeClose(endpoint);
- }
- }
-
- public void testLocalClientSend() throws Throwable {
- final Endpoint endpoint = Remoting.getConfiguredEndpoint();
- try {
- final Object requestObj = new Object();
- final Object replyObj = new Object();
- final Client<Object, Object> localClient =
Remoting.createLocalClient(endpoint, new RequestListener<Object, Object>() {
- public void handleRequest(final RequestContext<Object>
objectRequestContext, final Object request) throws RemoteExecutionException {
- try {
- objectRequestContext.sendReply(replyObj);
- } catch (IOException e) {
- throw new RemoteExecutionException(e);
- }
- }
-
- public void handleClose() {
- log.info("Listener closed");
- }
- }, Object.class, Object.class);
- try {
- assertEquals(replyObj, localClient.send(requestObj).get());
- } finally {
- IoUtils.safeClose(localClient);
- }
- } finally {
- IoUtils.safeClose(endpoint);
- }
- }
-
- public void testLocalClientConnectInvoke() throws Throwable {
- final Endpoint endpoint = Remoting.getConfiguredEndpoint();
- try {
- final Object requestObj = new Object();
- final Object replyObj = new Object();
- final Registration registration =
endpoint.serviceBuilder().setGroupName("foo").setServiceType("test").setRequestType(Object.class).
- setReplyType(Object.class).setClientListener(new
ClientListener<Object, Object>() {
- public RequestListener<Object, Object> handleClientOpen(final
ClientContext clientContext) {
- return new RequestListener<Object, Object>() {
- public void handleRequest(final RequestContext<Object>
objectRequestContext, final Object request) throws RemoteExecutionException {
- try {
- objectRequestContext.sendReply(replyObj);
- } catch (IOException e) {
- throw new RemoteExecutionException(e);
- }
- }
-
- public void handleClose() {
- log.info("Listener closed");
- }
- };
- }
- }).register();
- try {
- final Connection connection = endpoint.connect(new
URI("local:///"), OptionMap.EMPTY).get();
- try {
- final Client<Object, Object> localClient =
connection.openClient("test", "*", Object.class, Object.class).get();
- try {
- assertEquals(replyObj, localClient.invoke(requestObj));
- } finally {
- IoUtils.safeClose(localClient);
- }
- } finally {
- IoUtils.safeClose(connection);
- }
- } finally {
- IoUtils.safeClose(registration);
- }
- } finally {
- IoUtils.safeClose(endpoint);
- }
- }
-
- public void testLocalClientConnectSend() throws Throwable {
- final Endpoint endpoint = Remoting.getConfiguredEndpoint();
- try {
- final Object requestObj = new Object();
- final Object replyObj = new Object();
- final Registration registration =
endpoint.serviceBuilder().setGroupName("foo").setServiceType("test").setRequestType(Object.class).
- setReplyType(Object.class).setClientListener(new
ClientListener<Object, Object>() {
- public RequestListener<Object, Object> handleClientOpen(final
ClientContext clientContext) {
- return new RequestListener<Object, Object>() {
- public void handleRequest(final RequestContext<Object>
objectRequestContext, final Object request) throws RemoteExecutionException {
- try {
- log.info("Got request %s, sending reply %s",
request, replyObj);
- objectRequestContext.sendReply(replyObj);
- } catch (IOException e) {
- log.error(e, "reply");
- throw new RemoteExecutionException(e);
- }
- }
-
- public void handleClose() {
- log.info("Listener closed");
- }
- };
- }
- }).register();
- try {
- final Connection connection = endpoint.connect(new
URI("local:///"), OptionMap.EMPTY).get();
- try {
- final Client<Object, Object> localClient =
connection.openClient("test", "*", Object.class, Object.class).get();
- try {
- assertEquals(replyObj, localClient.send(requestObj).get());
- } finally {
- IoUtils.safeClose(localClient);
- }
- } finally {
- IoUtils.safeClose(connection);
- }
- } finally {
- IoUtils.safeClose(registration);
- }
- } finally {
- IoUtils.safeClose(endpoint);
- }
- }
-
- public void testNotFoundService() throws Throwable {
- final Endpoint endpoint = Remoting.getConfiguredEndpoint();
- try {
- endpoint.connect(new URI("local:///"),
OptionMap.EMPTY).get().openClient("blah", "bzzt", Object.class,
Object.class).get();
- } catch (ServiceNotFoundException e) {
- return;
- }
- fail("Expected exception");
- }
-}
Added:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/InvocationTestObject.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/InvocationTestObject.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/InvocationTestObject.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,29 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3;
+
+import java.io.Serializable;
+
+/**
+ * A stateless, serializable payload used as the request and reply object in the
+ * invocation tests.
+ * <p>
+ * The class declares no instance fields, so all instances carry the same (empty)
+ * state and are treated as equal to one another.  This matters because a copy that
+ * has been through serialization is a distinct instance: with the inherited
+ * identity-based {@code equals}, an {@code assertEquals} between an original and
+ * its deserialized copy would fail.
+ */
+public final class InvocationTestObject implements Serializable {
+    private static final long serialVersionUID = 7228470862155215008L;
+
+    /**
+     * Compare by type only, since there is no state to compare.
+     *
+     * @param obj the object to compare with, may be {@code null}
+     * @return {@code true} if {@code obj} is an {@code InvocationTestObject}
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        return obj instanceof InvocationTestObject;
+    }
+
+    /**
+     * Return the same value for every instance, consistent with {@link #equals(Object)}.
+     *
+     * @return a fixed hash code
+     */
+    @Override
+    public int hashCode() {
+        return 31;
+    }
+}
Copied:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/EndpointTestCase.java
(from rev 5755,
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/EndpointTestCase.java)
===================================================================
---
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/EndpointTestCase.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/EndpointTestCase.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,137 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import junit.framework.TestCase;
+import org.jboss.remoting3.Client;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.RemoteExecutionException;
+import org.jboss.remoting3.Remoting;
+import org.jboss.remoting3.RequestContext;
+import org.jboss.remoting3.RequestListener;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Basic endpoint tests: creating and closing an endpoint, and exchanging a
+ * request and a reply through a local client.
+ */
+public final class EndpointTestCase extends TestCase {
+
+    private static final Logger log = Logger.getLogger(EndpointTestCase.class);
+
+    /**
+     * Log the name of the method which called this one, on entry.
+     */
+    private static void enter() {
+        // element 0 is this method, element 1 is its direct caller
+        final StackTraceElement caller = new Throwable().getStackTrace()[1];
+        log.info("Entering: %s", caller.getMethodName());
+    }
+
+    /**
+     * Log the name of the method which called this one, on exit.
+     */
+    private static void exit() {
+        // element 0 is this method, element 1 is its direct caller
+        final StackTraceElement caller = new Throwable().getStackTrace()[1];
+        log.info("Exiting: %s", caller.getMethodName());
+    }
+
+    /**
+     * Create a request listener which answers every request with the given object.
+     *
+     * @param reply the object to send back for each request
+     * @return the listener
+     */
+    private static RequestListener<Object, Object> replyWith(final Object reply) {
+        return new RequestListener<Object, Object>() {
+            public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
+                try {
+                    context.sendReply(reply);
+                } catch (IOException e) {
+                    throw new RemoteExecutionException(e);
+                }
+            }
+
+            public void handleClose() {
+                log.info("Listener closed");
+            }
+        };
+    }
+
+    /**
+     * Create an endpoint on a fresh cached thread pool, close it, and check that
+     * the pool terminates within one second of being shut down.
+     */
+    public void testCreate() throws Throwable {
+        enter();
+        try {
+            final ExecutorService pool = Executors.newCachedThreadPool();
+            final Endpoint endpoint = Remoting.createEndpoint("foo", pool, OptionMap.EMPTY);
+            try {
+                endpoint.close();
+                pool.shutdown();
+                final boolean terminated = pool.awaitTermination(1L, TimeUnit.SECONDS);
+                assertTrue(terminated);
+            } finally {
+                pool.shutdownNow();
+            }
+        } finally {
+            exit();
+        }
+    }
+
+    /**
+     * Send a request through a local client with {@code invoke} and check that
+     * the listener's reply object comes back.
+     */
+    public void testLocalClientInvoke() throws Throwable {
+        enter();
+        try {
+            final Endpoint endpoint = Remoting.getConfiguredEndpoint();
+            try {
+                final Object requestObj = new Object();
+                final Object replyObj = new Object();
+                final Client<Object, Object> localClient = Remoting.createLocalClient(endpoint, replyWith(replyObj), Object.class, Object.class);
+                try {
+                    final Object answer = localClient.invoke(requestObj);
+                    assertEquals(replyObj, answer);
+                } finally {
+                    IoUtils.safeClose(localClient);
+                }
+            } finally {
+                IoUtils.safeClose(endpoint);
+            }
+        } finally {
+            exit();
+        }
+    }
+
+    /**
+     * Send a request through a local client with {@code send}, read the result
+     * with {@code get()}, and check that the listener's reply object comes back.
+     */
+    public void testLocalClientSend() throws Throwable {
+        enter();
+        try {
+            final Endpoint endpoint = Remoting.getConfiguredEndpoint();
+            try {
+                final Object requestObj = new Object();
+                final Object replyObj = new Object();
+                final Client<Object, Object> localClient = Remoting.createLocalClient(endpoint, replyWith(replyObj), Object.class, Object.class);
+                try {
+                    final Object answer = localClient.send(requestObj).get();
+                    assertEquals(replyObj, answer);
+                } finally {
+                    IoUtils.safeClose(localClient);
+                }
+            } finally {
+                IoUtils.safeClose(endpoint);
+            }
+        } finally {
+            exit();
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,238 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.test;
+
+import java.io.IOException;
+import junit.framework.TestCase;
+import org.jboss.remoting3.Client;
+import org.jboss.remoting3.ClientConnector;
+import org.jboss.remoting3.ClientContext;
+import org.jboss.remoting3.ClientListener;
+import org.jboss.remoting3.Connection;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.InvocationTestObject;
+import org.jboss.remoting3.Registration;
+import org.jboss.remoting3.RemoteExecutionException;
+import org.jboss.remoting3.Remoting;
+import org.jboss.remoting3.RequestContext;
+import org.jboss.remoting3.RequestListener;
+import org.jboss.remoting3.ServiceNotFoundException;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Shared invocation tests.  Subclasses decide which kind of connection the
+ * tests run over by implementing {@link #getConnection()}.
+ */
+public abstract class InvocationTestBase extends TestCase {
+    private static final Logger log = Logger.getLogger(InvocationTestBase.class);
+
+    /** The endpoint under test; assigned in {@link #setUp()}. */
+    protected Endpoint endpoint;
+
+    /**
+     * Obtain the configured endpoint before each test.
+     *
+     * @throws IOException if the endpoint cannot be obtained
+     */
+    public void setUp() throws IOException {
+        enter();
+        try {
+            endpoint = Remoting.getConfiguredEndpoint();
+        } finally {
+            exit();
+        }
+    }
+
+    /**
+     * Log the name of the method which called this one, on entry.
+     */
+    private static void enter() {
+        // element 0 is this method, element 1 is its direct caller
+        final StackTraceElement caller = new Throwable().getStackTrace()[1];
+        log.info("Entering: %s", caller.getMethodName());
+    }
+
+    /**
+     * Log the name of the method which called this one, on exit.
+     */
+    private static void exit() {
+        // element 0 is this method, element 1 is its direct caller
+        final StackTraceElement caller = new Throwable().getStackTrace()[1];
+        log.info("Exiting: %s", caller.getMethodName());
+    }
+
+    /**
+     * Open the connection over which the tests of this class are run.
+     *
+     * @return the connection
+     * @throws IOException if the connection cannot be opened
+     */
+    protected abstract Connection getConnection() throws IOException;
+
+    /**
+     * Register a service of type {@code test1} which replies with a fixed object,
+     * open a client for it, and check the result of {@code invoke}.
+     */
+    public void testBasicInvoke() throws IOException {
+        enter();
+        try {
+            final InvocationTestObject requestObj = new InvocationTestObject();
+            final InvocationTestObject replyObj = new InvocationTestObject();
+            final ClientListener<InvocationTestObject, InvocationTestObject> clientListener = new ClientListener<InvocationTestObject, InvocationTestObject>() {
+                public RequestListener<InvocationTestObject, InvocationTestObject> handleClientOpen(final ClientContext clientContext) {
+                    return new RequestListener<InvocationTestObject, InvocationTestObject>() {
+                        public void handleRequest(final RequestContext<InvocationTestObject> context, final InvocationTestObject request) throws RemoteExecutionException {
+                            try {
+                                context.sendReply(replyObj);
+                            } catch (IOException e) {
+                                throw new RemoteExecutionException(e);
+                            }
+                        }
+
+                        public void handleClose() {
+                            log.info("Listener closed");
+                        }
+                    };
+                }
+            };
+            final Registration registration = endpoint.serviceBuilder()
+                    .setGroupName("foo")
+                    .setServiceType("test1")
+                    .setRequestType(InvocationTestObject.class)
+                    .setReplyType(InvocationTestObject.class)
+                    .setClientListener(clientListener)
+                    .register();
+            try {
+                final Connection connection = getConnection();
+                try {
+                    final Client<InvocationTestObject, InvocationTestObject> client = connection.openClient("test1", "*", InvocationTestObject.class, InvocationTestObject.class).get();
+                    try {
+                        assertEquals(replyObj, client.invoke(requestObj));
+                    } finally {
+                        IoUtils.safeClose(client);
+                    }
+                } finally {
+                    IoUtils.safeClose(connection);
+                }
+            } finally {
+                IoUtils.safeClose(registration);
+            }
+        } finally {
+            exit();
+        }
+    }
+
+    /**
+     * Register a service of type {@code test2} which replies with a fixed object,
+     * open a client for it, and check the result of {@code send(...).get()}.
+     */
+    public void testBasicSend() throws IOException {
+        enter();
+        try {
+            final InvocationTestObject requestObj = new InvocationTestObject();
+            final InvocationTestObject replyObj = new InvocationTestObject();
+            final ClientListener<InvocationTestObject, InvocationTestObject> clientListener = new ClientListener<InvocationTestObject, InvocationTestObject>() {
+                public RequestListener<InvocationTestObject, InvocationTestObject> handleClientOpen(final ClientContext clientContext) {
+                    return new RequestListener<InvocationTestObject, InvocationTestObject>() {
+                        public void handleRequest(final RequestContext<InvocationTestObject> context, final InvocationTestObject request) throws RemoteExecutionException {
+                            try {
+                                log.info("Got request %s, sending reply %s", request, replyObj);
+                                context.sendReply(replyObj);
+                            } catch (IOException e) {
+                                log.error(e, "reply");
+                                throw new RemoteExecutionException(e);
+                            }
+                        }
+
+                        public void handleClose() {
+                            log.info("Listener closed");
+                        }
+                    };
+                }
+            };
+            final Registration registration = endpoint.serviceBuilder()
+                    .setGroupName("foo")
+                    .setServiceType("test2")
+                    .setRequestType(InvocationTestObject.class)
+                    .setReplyType(InvocationTestObject.class)
+                    .setClientListener(clientListener)
+                    .register();
+            try {
+                final Connection connection = getConnection();
+                try {
+                    final Client<InvocationTestObject, InvocationTestObject> client = connection.openClient("test2", "*", InvocationTestObject.class, InvocationTestObject.class).get();
+                    try {
+                        assertEquals(replyObj, client.send(requestObj).get());
+                    } finally {
+                        IoUtils.safeClose(client);
+                    }
+                } finally {
+                    IoUtils.safeClose(connection);
+                }
+            } finally {
+                IoUtils.safeClose(registration);
+            }
+        } finally {
+            exit();
+        }
+    }
+
+    /**
+     * Register a service of type {@code test3} whose requests are client
+     * connectors.  The service invokes the client obtained from the connector it
+     * receives, checks that reply, and then sends its own reply.
+     */
+    public void testBasicClientConnector() throws Throwable {
+        enter();
+        try {
+            final InvocationTestObject requestObj = new InvocationTestObject();
+            final InvocationTestObject replyObj = new InvocationTestObject();
+
+            final ClientListener<ClientConnector, InvocationTestObject> clientListener = new ClientListener<ClientConnector, InvocationTestObject>() {
+                public RequestListener<ClientConnector, InvocationTestObject> handleClientOpen(final ClientContext clientContext) {
+                    return new RequestListener<ClientConnector, InvocationTestObject>() {
+                        public void handleRequest(final RequestContext<InvocationTestObject> context, final ClientConnector request) throws RemoteExecutionException {
+                            try {
+                                final ClientConnector<InvocationTestObject, InvocationTestObject> connector = (ClientConnector<InvocationTestObject, InvocationTestObject>) request;
+                                assertEquals(replyObj, connector.getFutureClient().get().invoke(requestObj));
+                                context.sendReply(replyObj);
+                            } catch (Throwable e) {
+                                throw new RemoteExecutionException(e);
+                            }
+                        }
+
+                        public void handleClose() {
+                            log.info("Listener closed");
+                        }
+                    };
+                }
+            };
+            final Registration registration = endpoint.serviceBuilder()
+                    .setGroupName("foo")
+                    .setServiceType("test3")
+                    .setRequestType(ClientConnector.class)
+                    .setReplyType(InvocationTestObject.class)
+                    .setClientListener(clientListener)
+                    .register();
+            try {
+                final Connection connection = getConnection();
+                try {
+                    final Client<ClientConnector, InvocationTestObject> client = connection.openClient("test3", "*", ClientConnector.class, InvocationTestObject.class).get();
+                    try {
+                        // the listener behind the connector which is passed to the service as the request
+                        final RequestListener<InvocationTestObject, InvocationTestObject> innerListener = new RequestListener<InvocationTestObject, InvocationTestObject>() {
+                            public void handleRequest(final RequestContext<InvocationTestObject> requestContext, final InvocationTestObject request) throws RemoteExecutionException {
+                                try {
+                                    requestContext.sendReply(replyObj);
+                                } catch (IOException e) {
+                                    throw new RemoteExecutionException(e);
+                                }
+                            }
+
+                            public void handleClose() {
+                                log.info("Inner listener closed");
+                            }
+                        };
+                        client.invoke(connection.createClientConnector(innerListener, InvocationTestObject.class, InvocationTestObject.class));
+                    } finally {
+                        IoUtils.safeClose(client);
+                    }
+                } finally {
+                    IoUtils.safeClose(connection);
+                }
+            } finally {
+                IoUtils.safeClose(registration);
+            }
+        } finally {
+            exit();
+        }
+    }
+
+    /**
+     * Check that opening a client for a service which was never registered
+     * fails with {@code ServiceNotFoundException}.
+     */
+    public void testNotFoundService() throws Throwable {
+        enter();
+        try {
+            final Connection connection = getConnection();
+            boolean notFound = false;
+            try {
+                connection.openClient("blah", "bzzt", Object.class, Object.class).get();
+            } catch (ServiceNotFoundException expected) {
+                notFound = true;
+            } finally {
+                IoUtils.safeClose(connection);
+            }
+            if (! notFound) {
+                fail("Expected exception");
+            }
+        } finally {
+            exit();
+        }
+    }
+
+    /**
+     * Close the instance returned by {@code Xnio.getInstance()} after each test.
+     *
+     * @throws IOException if closing fails
+     */
+    public void tearDown() throws IOException {
+        enter();
+        try {
+            Xnio.getInstance().close();
+        } finally {
+            exit();
+        }
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/LocalTestCase.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/LocalTestCase.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/LocalTestCase.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.test;
+
+import java.io.IOException;
+import java.net.URI;
+import org.jboss.remoting3.Connection;
+import org.jboss.xnio.OptionMap;
+
+/**
+ * Runs the shared invocation tests over a connection to the {@code local:///} URI.
+ */
+public final class LocalTestCase extends InvocationTestBase {
+
+    /**
+     * Connect the test endpoint to {@code local:///} with an empty option map.
+     *
+     * @return the connection
+     * @throws IOException if the connection attempt fails
+     */
+    protected Connection getConnection() throws IOException {
+        final URI localUri = URI.create("local:///");
+        return endpoint.connect(localUri, OptionMap.EMPTY).get();
+    }
+}
Added:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/RemoteTestCase.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/RemoteTestCase.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/RemoteTestCase.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.jboss.remoting3.CloseHandler;
+import org.jboss.remoting3.Connection;
+import org.jboss.remoting3.spi.NetworkServerProvider;
+import org.jboss.xnio.AcceptingServer;
+import org.jboss.xnio.ChannelListener;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.channels.BoundChannel;
+import org.jboss.xnio.channels.ConnectedStreamChannel;
+
+/**
+ * Runs the shared invocation tests over the {@code remote} connection provider,
+ * using a TCP server bound to the loopback address.
+ */
+public final class RemoteTestCase extends InvocationTestBase {
+
+    /**
+     * Create a TCP server using the {@code remote} provider's server listener, bind it
+     * to 127.0.0.1 with port 0, and connect the endpoint to the address the server
+     * reports as its local address.
+     * <p>
+     * The server is closed when the returned connection is closed.  If anything fails
+     * after the server has been created, the server is closed before the exception
+     * propagates so that a failed setup does not leave it open.
+     *
+     * @return the established connection
+     * @throws IOException if the server or the connection cannot be set up; failures
+     *         which are not {@code IOException}s are attached as the cause
+     */
+    protected Connection getConnection() throws IOException {
+        final NetworkServerProvider provider = endpoint.getConnectionProviderInterface("remote", NetworkServerProvider.class);
+        final ChannelListener<ConnectedStreamChannel<InetSocketAddress>> listener = provider.getServerListener(OptionMap.EMPTY);
+        final Xnio xnio = Xnio.getInstance();
+        try {
+            // SSL variant, currently disabled:
+            // final AcceptingServer<InetSocketAddress, ?, ?> server = xnio.createSslTcpServer(listener, OptionMap.EMPTY);
+            final AcceptingServer<InetSocketAddress, ?, ?> server = xnio.createTcpServer(listener, OptionMap.EMPTY);
+            boolean established = false;
+            try {
+                final IoFuture<? extends BoundChannel<InetSocketAddress>> future = server.bind(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0));
+                final InetSocketAddress localAddress = future.get().getLocalAddress();
+                final URI serverUri = new URI("remote", null, localAddress.getAddress().getHostAddress(), localAddress.getPort(), null, null, null);
+                final Connection connection = endpoint.connect(serverUri, OptionMap.EMPTY).get();
+                connection.addCloseHandler(new CloseHandler<Connection>() {
+                    public void handleClose(final Connection closed) {
+                        IoUtils.safeClose(server);
+                    }
+                });
+                established = true;
+                return connection;
+            } finally {
+                // on the failure path nobody else holds the server, so release it here
+                if (! established) {
+                    IoUtils.safeClose(server);
+                }
+            }
+        } catch (IOException e) {
+            // already the declared type; keep its message and stack trace intact
+            throw e;
+        } catch (Exception e) {
+            final IOException ioe = new IOException("Failed to set up remote test connection");
+            ioe.initCause(e);
+            throw ioe;
+        }
+    }
+}
\ No newline at end of file
Added: remoting3/trunk/jboss-remoting/src/test/resources/remoting.properties
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/resources/remoting.properties
(rev 0)
+++ remoting3/trunk/jboss-remoting/src/test/resources/remoting.properties 2010-02-26
23:42:04 UTC (rev 5758)
@@ -0,0 +1,23 @@
+#
+# JBoss, Home of Professional Open Source
+# Copyright 2010, JBoss Inc., and individual contributors as indicated
+# by the @authors tag. See the copyright.txt in the distribution for a
+# full listing of individual contributors.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation; either version 2.1 of
+# the License, or (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this software; if not, write to the Free
+# Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+#
+
+remote.ssl.enable=false
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketConnectionProvider.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketConnectionProvider.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketConnectionProvider.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -49,7 +49,7 @@
* Copyright Oct 14, 2009
* </p>
*/
-public class SocketConnectionProvider<T, I, O> extends
AbstractHandleableCloseable<SocketHandleableCloseable> implements
ConnectionProvider<T> {
+public class SocketConnectionProvider<I, O> extends
AbstractHandleableCloseable<SocketHandleableCloseable> implements ConnectionProvider
{
private Endpoint endpoint;
private String host;
private int port;
@@ -74,7 +74,7 @@
return null;
}
- public T getProviderInterface() {
+ public Object getProviderInterface() {
return null;
}
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketProtocol.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketProtocol.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketProtocol.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -57,9 +57,9 @@
* This endpoint can be a socket transport client.
*/
static public <T, I, O> void registerClientTransport(final Endpoint endpoint,
final Executor executor, final String host) {
- endpoint.addConnectionProvider("socket", new
ConnectionProviderFactory<T>() {
- public ConnectionProvider<T> createInstance(ConnectionProviderContext
context) {
- return new SocketConnectionProvider<T, I, O>(endpoint, executor,
host);
+ endpoint.addConnectionProvider("socket", new ConnectionProviderFactory()
{
+ public ConnectionProvider createInstance(ConnectionProviderContext context) {
+ return new SocketConnectionProvider<I, O>(endpoint, executor, host);
}});
}
@@ -69,9 +69,9 @@
* This endpoint can be both a client and server for the socket transport.
*/
static public <T, I, O> Cancellable registerServerTransport(Endpoint endpoint,
Executor executor, final String host, final int port) {
- final SocketConnectionProvider<T, I, O> connectionProvider = new
SocketConnectionProvider<T, I, O>(endpoint, executor, host);
- endpoint.addConnectionProvider("socket", new
ConnectionProviderFactory<T>() {
- public ConnectionProvider<T> createInstance(ConnectionProviderContext
context) {
+ final SocketConnectionProvider<I, O> connectionProvider = new
SocketConnectionProvider<I, O>(endpoint, executor, host);
+ endpoint.addConnectionProvider("socket", new ConnectionProviderFactory()
{
+ public ConnectionProvider createInstance(ConnectionProviderContext context) {
try {
connectionProvider.start(context, port);
return connectionProvider;
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientConnectionHandler.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientConnectionHandler.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientConnectionHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -76,7 +76,7 @@
}
}
- public Cancellable open(int slot, Result<RequestHandler> result) {
+ public Cancellable open(final String serviceType, final String groupName,
Result<RequestHandler> result) {
try
{
final Socket socket = new Socket(remoteHost, remotePort);
@@ -86,7 +86,8 @@
final Marshaller marshaller = factory.createMarshaller(configuration);
final Unmarshaller unmarshaller = factory.createUnmarshaller(configuration);
marshaller.start(Marshalling.createByteOutput(socket.getOutputStream()));
- marshaller.writeInt(slot);
+ marshaller.writeUTF(serviceType);
+ marshaller.writeUTF(groupName);
marshaller.flush();
unmarshaller.start(Marshalling.createByteInput(socket.getInputStream()));
result.setResult(new SocketClientRequestHandler(getExecutor(), marshaller,
unmarshaller));
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerConnectionHandler.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerConnectionHandler.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerConnectionHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -110,7 +110,7 @@
}
@Override
- public Cancellable open(int slot, Result<RequestHandler> result) {
+ public Cancellable open(final String serviceType, final String groupName,
Result<RequestHandler> result) {
return null;
}
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerRequestHandler.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerRequestHandler.java 2010-02-24
00:45:08 UTC (rev 5757)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerRequestHandler.java 2010-02-26
23:42:04 UTC (rev 5758)
@@ -73,7 +73,8 @@
marshaller.flush();
unmarshaller = factory.createUnmarshaller(configuration);
unmarshaller.start(Marshalling.createByteInput(socket.getInputStream()));
- final int slot = unmarshaller.readInt();
+ final String serviceType = unmarshaller.readUTF();
+ final String groupName = unmarshaller.readUTF();
final RequestHandlerFuture requestHandlerFuture = new RequestHandlerFuture();
ConnectionHandlerContext.ServiceResult serviceResult = new
ConnectionHandlerContext.ServiceResult() {
@@ -84,15 +85,14 @@
requestHandlerFuture.setException(new ServiceOpenException("No such
service located"));
}
};
- try
- {
- requestHandlerFuture.setResult(connectionHandlerContext.openService(slot,
OptionMap.EMPTY));
- } catch (IOException e)
- {
- requestHandlerFuture.setException(e);
+ final RequestHandler requestHandler =
connectionHandlerContext.openService(serviceType, groupName, OptionMap.EMPTY);
+ if (requestHandler == null) {
+ requestHandlerFuture.setException(new
ServiceNotFoundException(ServiceURI.create(serviceType, groupName, null)));
+ } else {
+ requestHandlerFuture.setResult(requestHandler);
}
- requestHandler = requestHandlerFuture.get();
- if (requestHandler == null) {
+ this.requestHandler = requestHandlerFuture.get();
+ if (this.requestHandler == null) {
throw requestHandlerFuture.getException();
}
replyHandler = new SocketServerReplyHandler(marshaller);