Author: david.lloyd(a)jboss.com
Date: 2010-01-28 11:04:54 -0500 (Thu, 28 Jan 2010)
New Revision: 5678
Modified:
remoting3/trunk/jboss-remoting/pom.xml
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Endpoint.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Remoting.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderContext.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/RemotingServiceDescriptor.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/EndpointTestCase.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/simple/LocalBasicExample2Main.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/simple/LocalBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketUsageExamples.java
Log:
New connect methods, new streams method, options, changes to service location
Modified: remoting3/trunk/jboss-remoting/pom.xml
===================================================================
--- remoting3/trunk/jboss-remoting/pom.xml 2010-01-27 20:01:09 UTC (rev 5677)
+++ remoting3/trunk/jboss-remoting/pom.xml 2010-01-28 16:04:54 UTC (rev 5678)
@@ -37,7 +37,7 @@
<dependency>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-api</artifactId>
- <version>2.0.0.CR4</version>
+ <version>2.1.0.CR2-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
@@ -79,7 +79,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
- <version>2.5</version>
+ <version>2.6.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
@@ -105,9 +105,9 @@
<doctitle><![CDATA[JBoss Remoting
]]>${version}</doctitle>
<header><![CDATA[JBoss Remoting
]]>${version}</header>
<footer><![CDATA[JBoss Remoting
]]>${version}</footer>
- <bottom><![CDATA[<i>Copyright © 2009 JBoss, a
division of Red Hat, Inc.</i>]]></bottom>
+ <bottom><![CDATA[<i>Copyright © 2010 JBoss, a
division of Red Hat, Inc.</i>]]></bottom>
<links>
- <link>http://java.sun.com/javase/6/docs/</link>
+ <link>http://java.sun.com/javase/6/docs/api/</link>
</links>
</configuration>
</plugin>
@@ -117,7 +117,7 @@
<repository>
<id>repository.jboss.org</id>
<name>JBoss Maven2 Repository</name>
- <url>${repository.url}</url>
+ <url>http://repository.jboss.org/repos/maven2</url>
</repository>
</distributionManagement>
</project>
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Endpoint.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Endpoint.java 2010-01-27
20:01:09 UTC (rev 5677)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Endpoint.java 2010-01-28
16:04:54 UTC (rev 5678)
@@ -34,7 +34,7 @@
* Create a request handler that can be used to receive incoming requests on this
endpoint. The client may be passed to a
* remote endpoint as part of a request or a reply, or it may be used locally.
* <p/>
- * You must have the {@link org.jboss.remoting3.EndpointPermission
createRequestHandler EndpointPermission} to invoke this method.
+ * You must have the {@link EndpointPermission createRequestHandler
EndpointPermission} to invoke this method.
*
* @param requestListener the request listener
* @param requestClass the class of requests sent to this request listener
@@ -132,7 +132,7 @@
/**
* Register the service.
* <p/>
- * You must have the {@link org.jboss.remoting3.EndpointPermission
registerService EndpointPermission} to invoke this method.
+ * You must have the {@link EndpointPermission registerService
EndpointPermission} to invoke this method.
*
* @return a registration handle
* @throws IOException if a problem occurs with registration
@@ -143,7 +143,7 @@
/**
* Add a service registration listener which is called whenever a local service is
registered.
* <p/>
- * You must have the {@link org.jboss.remoting3.EndpointPermission addServiceListener
EndpointPermission} to invoke this method.
+ * You must have the {@link EndpointPermission addServiceListener EndpointPermission}
to invoke this method.
*
* @param listener the listener
* @param flags the flags to apply to the listener
@@ -154,7 +154,7 @@
/**
* Create a client that uses the given request handler to handle its requests.
* <p/>
- * You must have the {@link org.jboss.remoting3.EndpointPermission createClient
EndpointPermission} to invoke this method.
+ * You must have the {@link EndpointPermission createClient EndpointPermission} to
invoke this method.
*
* @param <I> the request type
* @param <O> the reply type
@@ -170,7 +170,7 @@
* Open a connection with a peer. Returns a future connection which may be used to
cancel the connection attempt.
* This method does not block; use the return value to wait for a result if you wish
to block.
* <p/>
- * You must have the {@link org.jboss.remoting3.EndpointPermission connect
EndpointPermission} to invoke this method.
+ * You must have the {@link EndpointPermission connect EndpointPermission} to invoke
this method.
*
* @param destination the destination
* @param connectOptions options to configure this connection
@@ -184,7 +184,7 @@
* The given callback handler is used to retrieve local authentication information,
if the protocol demands it.
* This method does not block; use the return value to wait for a result if you wish
to block.
* <p/>
- * You must have the {@link org.jboss.remoting3.EndpointPermission connect
EndpointPermission} to invoke this method.
+ * You must have the {@link EndpointPermission connect EndpointPermission} to invoke
this method.
*
* @param destination the destination
* @param connectOptions options to configure this connection
@@ -195,10 +195,26 @@
IoFuture<? extends Connection> connect(URI destination, OptionMap
connectOptions, CallbackHandler callbackHandler) throws IOException;
/**
+ * Open a connection with a peer. Returns a future connection which may be used to
cancel the connection attempt.
+ * The given user name, realm name, and password are used to supply local authentication information, if the protocol demands it.
+ * This method does not block; use the return value to wait for a result if you wish
to block.
+ * <p/>
+ * You must have the {@link EndpointPermission connect EndpointPermission} to invoke
this method.
+ *
+ * @param destination the destination
+ * @param connectOptions options to configure this connection
+ * @param userName the user name to authenticate as, or {@code null} if it is
unspecified
+ * @param realmName the user realm to authenticate with, or {@code null} if it is
unspecified
+ * @param password the password to send, or {@code null} if it is unspecified
+ * @return the future connection
+ * @throws IOException if an error occurs while starting the connect attempt
+ */
+ IoFuture<? extends Connection> connect(URI destination, OptionMap
connectOptions, String userName, String realmName, char[] password) throws IOException;
+
+ /**
* Register a connection provider for a URI scheme. The provider factory is called
with the context which can
* be used to accept new connections or terminate the registration.
* <p/>
- * You must have the {@link org.jboss.remoting3.EndpointPermission
addConnectionProvider EndpointPermission} to invoke this method.
+ * You must have the {@link EndpointPermission addConnectionProvider
EndpointPermission} to invoke this method.
*
* @param uriScheme the URI scheme
* @param providerFactory the provider factory
@@ -208,6 +224,20 @@
<T> ConnectionProviderRegistration<T> addConnectionProvider(String
uriScheme, ConnectionProviderFactory<T> providerFactory) throws
DuplicateRegistrationException;
/**
+ * Get the interface for a connection provider.
+ * <p/>
+ * You must have the {@link EndpointPermission getConnectionProviderInterface
EndpointPermission} to invoke this method.
+ *
+ * @param uriScheme the URI scheme of the registered connection provider
+ * @param expectedType the expected type of the interface
+ * @param <T> the expected type of the interface
+ * @return the provider interface
+ * @throws UnknownURISchemeException if the given URI scheme is not registered
+ * @throws ClassCastException if the interface type does not match the expected type
+ */
+ <T> T getConnectionProviderInterface(String uriScheme, Class<T>
expectedType) throws UnknownURISchemeException, ClassCastException;
+
+ /**
* Register a protocol service.
*
* @param type the type of service being registered
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-01-27
20:01:09 UTC (rev 5677)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java 2010-01-28
16:04:54 UTC (rev 5678)
@@ -62,6 +62,7 @@
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.TextOutputCallback;
@@ -158,7 +159,6 @@
*/
private final ConnectionHandlerContext localConnectionContext;
- private static final EndpointPermission CREATE_ENDPOINT_PERM = new
EndpointPermission("createEndpoint");
private static final EndpointPermission CREATE_REQUEST_HANDLER_PERM = new
EndpointPermission("createRequestHandler");
private static final EndpointPermission REGISTER_SERVICE_PERM = new
EndpointPermission("registerService");
private static final EndpointPermission CREATE_CLIENT_PERM = new
EndpointPermission("createClient");
@@ -166,16 +166,13 @@
private static final EndpointPermission CONNECT_PERM = new
EndpointPermission("connect");
private static final EndpointPermission ADD_CONNECTION_PROVIDER_PERM = new
EndpointPermission("addConnectionProvider");
private static final EndpointPermission ADD_MARSHALLING_PROTOCOL_PERM = new
EndpointPermission("addMarshallingProtocol");
+ private static final EndpointPermission GET_CONNECTION_PROVIDER_INTERFACE_PERM = new
EndpointPermission("getConnectionProviderInterface");
EndpointImpl(final Executor executor, final String name) {
super(executor);
for (int i = 0; i < providerMaps.length; i++) {
providerMaps[i] = concurrentMap();
}
- final SecurityManager sm = System.getSecurityManager();
- if (sm != null) {
- sm.checkPermission(CREATE_ENDPOINT_PERM);
- }
this.executor = executor;
this.name = name;
connectionProviders.put("local", new LocalConnectionProvider());
@@ -495,43 +492,7 @@
}
public IoFuture<? extends Connection> connect(final URI destination, final
OptionMap connectOptions) throws IOException {
- final String userName = connectOptions.get(RemotingOptions.AUTH_USER_NAME);
- final String realm = connectOptions.get(RemotingOptions.AUTH_REALM);
- return connect(destination, connectOptions, new CallbackHandler() {
- public void handle(final Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
- MAIN: for (Callback callback : callbacks) {
- if (callback instanceof NameCallback && userName != null) {
- final NameCallback nameCallback = (NameCallback) callback;
- nameCallback.setName(userName);
- } else if (callback instanceof RealmCallback && realm !=
null) {
- final RealmCallback realmCallback = (RealmCallback) callback;
- realmCallback.setText(realm);
- } else if (callback instanceof RealmChoiceCallback && realm
!= null) {
- final RealmChoiceCallback realmChoiceCallback =
(RealmChoiceCallback) callback;
- final String[] choices = realmChoiceCallback.getChoices();
- for (int i = 0; i < choices.length; i++) {
- if (choices[i].equals(realm)) {
- realmChoiceCallback.setSelectedIndex(i);
- continue MAIN;
- }
- }
- throw new UnsupportedCallbackException(callback, "No realm
choices match realm '" + realm + "'");
- } else if (callback instanceof TextOutputCallback) {
- final TextOutputCallback textOutputCallback =
(TextOutputCallback) callback;
- final String kind;
- switch (textOutputCallback.getMessageType()) {
- case TextOutputCallback.ERROR: kind = "ERROR";
break;
- case TextOutputCallback.INFORMATION: kind =
"INFORMATION"; break;
- case TextOutputCallback.WARNING: kind = "WARNING";
break;
- default: kind = "UNKNOWN"; break;
- }
- log.debug("Authentication layer produced a %s message:
%s", kind, textOutputCallback.getMessage());
- } else {
- throw new UnsupportedCallbackException(callback);
- }
- }
- }
- });
+ return connect(destination, connectOptions, new
DefaultCallbackHandler(connectOptions.get(RemotingOptions.AUTH_USER_NAME),
connectOptions.get(RemotingOptions.AUTH_REALM), null));
}
public IoFuture<? extends Connection> connect(final URI destination, final
OptionMap connectOptions, final CallbackHandler callbackHandler) throws IOException {
@@ -553,6 +514,12 @@
return futureResult.getIoFuture();
}
+ public IoFuture<? extends Connection> connect(final URI destination, final
OptionMap connectOptions, final String userName, final String realmName, final char[]
password) throws IOException {
+ final String actualUserName = userName != null ? userName :
connectOptions.get(RemotingOptions.AUTH_USER_NAME);
+ final String actualUserRealm = realmName != null ? realmName :
connectOptions.get(RemotingOptions.AUTH_REALM);
+ return connect(destination, connectOptions, new
DefaultCallbackHandler(actualUserName, actualUserRealm, password));
+ }
+
public <T> ConnectionProviderRegistration<T> addConnectionProvider(final
String uriScheme, final ConnectionProviderFactory<T> providerFactory) {
final SecurityManager sm = System.getSecurityManager();
if (sm != null) {
@@ -586,6 +553,18 @@
return handle;
}
+ public <T> T getConnectionProviderInterface(final String uriScheme, final
Class<T> expectedType) throws UnknownURISchemeException, ClassCastException {
+ final SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(GET_CONNECTION_PROVIDER_INTERFACE_PERM);
+ }
+ final ConnectionProvider<?> provider = connectionProviders.get(uriScheme);
+ if (provider == null) {
+ throw new UnknownURISchemeException("No connection provider for URI
scheme \"" + uriScheme + "\" is installed");
+ }
+ return expectedType.cast(provider.getProviderInterface());
+ }
+
private <T> Registration addMarshallingRegistration(final String name, final T
target, final ConcurrentMap<String, T> map, final String descr) throws
DuplicateRegistrationException {
final SecurityManager sm = System.getSecurityManager();
if (sm != null) {
@@ -736,6 +715,10 @@
super(executor);
}
+ public Executor getExecutor() {
+ return super.getExecutor();
+ }
+
public void accept(final ConnectionHandlerFactory connectionHandlerFactory) {
connectionHandlerFactory.createInstance(localConnectionContext);
}
@@ -854,4 +837,54 @@
// not closeable
}
}
+
+ private static class DefaultCallbackHandler implements CallbackHandler {
+
+ private final String actualUserName;
+ private final String actualUserRealm;
+ private final char[] password;
+
+ private DefaultCallbackHandler(final String actualUserName, final String
actualUserRealm, final char[] password) {
+ this.actualUserName = actualUserName;
+ this.actualUserRealm = actualUserRealm;
+ this.password = password;
+ }
+
+ public void handle(final Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
+ MAIN: for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback && actualUserName != null)
{
+ final NameCallback nameCallback = (NameCallback) callback;
+ nameCallback.setName(actualUserName);
+ } else if (callback instanceof RealmCallback && actualUserRealm
!= null) {
+ final RealmCallback realmCallback = (RealmCallback) callback;
+ realmCallback.setText(actualUserRealm);
+ } else if (callback instanceof RealmChoiceCallback &&
actualUserRealm != null) {
+ final RealmChoiceCallback realmChoiceCallback = (RealmChoiceCallback)
callback;
+ final String[] choices = realmChoiceCallback.getChoices();
+ for (int i = 0; i < choices.length; i++) {
+ if (choices[i].equals(actualUserRealm)) {
+ realmChoiceCallback.setSelectedIndex(i);
+ continue MAIN;
+ }
+ }
+ throw new UnsupportedCallbackException(callback, "No realm
choices match realm '" + actualUserRealm + "'");
+ } else if (callback instanceof TextOutputCallback) {
+ final TextOutputCallback textOutputCallback = (TextOutputCallback)
callback;
+ final String kind;
+ switch (textOutputCallback.getMessageType()) {
+ case TextOutputCallback.ERROR: kind = "ERROR"; break;
+ case TextOutputCallback.INFORMATION: kind =
"INFORMATION"; break;
+ case TextOutputCallback.WARNING: kind = "WARNING";
break;
+ default: kind = "UNKNOWN"; break;
+ }
+ log.debug("Authentication layer produced a %s message: %s",
kind, textOutputCallback.getMessage());
+ } else if (callback instanceof PasswordCallback && password !=
null) {
+ final PasswordCallback passwordCallback = (PasswordCallback)
callback;
+ passwordCallback.setPassword(password);
+ } else {
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+ }
+ }
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Remoting.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Remoting.java 2010-01-27
20:01:09 UTC (rev 5677)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/Remoting.java 2010-01-28
16:04:54 UTC (rev 5678)
@@ -23,6 +23,8 @@
package org.jboss.remoting3;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -32,11 +34,14 @@
import java.util.ServiceLoader;
import java.util.Map;
import java.util.HashMap;
+import java.util.Properties;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.lang.reflect.InvocationTargetException;
import org.jboss.remoting3.spi.RequestHandler;
import org.jboss.remoting3.spi.RemotingServiceDescriptor;
import org.jboss.remoting3.spi.ConnectionProviderFactory;
import org.jboss.remoting3.spi.ProtocolServiceType;
-import org.jboss.xnio.CloseableExecutor;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.OptionMap;
import org.jboss.xnio.log.Logger;
@@ -58,98 +63,14 @@
private static final Logger log = Logger.getLogger("org.jboss.remoting");
- /**
- * Create an endpoint. The endpoint will create its own thread pool with a maximum
of 10 threads.
- * <p>
- * You must have the {@link org.jboss.remoting3.EndpointPermission createEndpoint
EndpointPermission} to invoke this method.
- *
- * @param name the name of the endpoint
- * @return the endpoint
- */
- public static Endpoint createEndpoint(final String name) throws IOException {
- return createEndpoint(name, 10);
- }
+ private static Endpoint configuredEndpoint;
+ private static final Object lock = new Object();
- /**
- * Create an endpoint. The endpoint will create its own thread pool with a maximum
of {@code maxThreads} threads.
- * <p>
- * You must have the {@link org.jboss.remoting3.EndpointPermission createEndpoint
EndpointPermission} to invoke this method.
- *
- * @param name the name of the endpoint
- * @param maxThreads the maximum thread count
- * @return the endpoint
- */
- public static Endpoint createEndpoint(final String name, final int maxThreads) throws
IOException {
- return createEndpoint(name, OptionMap.builder().set(RemotingOptions.MAX_THREADS,
maxThreads).getMap());
- }
+ private static final EndpointPermission CREATE_ENDPOINT_PERM = new
EndpointPermission("createEndpoint");
+ private static final EndpointPermission GET_CONFIGURED_ENDPOINT_PERM = new
EndpointPermission("getConfiguredEndpoint");
- /**
- * Create an endpoint configured with the given option map. The following options
are supported:
- * <ul>
- * <li>{@link RemotingOptions#MAX_THREADS} - specify the maximum number of
threads for the created thread pool (default 10)</li>
- * <li>{@link RemotingOptions#LOAD_PROVIDERS} - specify whether providers
should be auto-loaded (default {@code true})</li>
- * </ul>
- *
- * @param endpointName the endpoint name
- * @param optionMap the endpoint options
- * @return the endpoint
- * @throws IOException if an error occurs
- */
- public static Endpoint createEndpoint(final String endpointName, final OptionMap
optionMap) throws IOException {
- if (endpointName == null) {
- throw new NullPointerException("endpointName is null");
- }
- if (optionMap == null) {
- throw new NullPointerException("optionMap is null");
- }
- final CloseableExecutor executor =
createExecutor(optionMap.get(RemotingOptions.MAX_THREADS, 10));
- final Endpoint endpoint = createEndpoint(executor, endpointName);
- endpoint.addCloseHandler(new CloseHandler<Endpoint>() {
- public void handleClose(final Endpoint closed) {
- IoUtils.safeClose(executor);
- }
- });
- if (optionMap.get(RemotingOptions.LOAD_PROVIDERS, true)) {
- for (RemotingServiceDescriptor<?> descriptor :
ServiceLoader.load(RemotingServiceDescriptor.class)) {
- final String name = descriptor.getName();
- final Class<?> serviceType = descriptor.getType();
- final Object service = descriptor.getService();
- try {
- if (serviceType == ConnectionProviderFactory.class) {
- endpoint.addConnectionProvider(name,
(ConnectionProviderFactory<?>) service);
- } else if (serviceType == ClassTable.class) {
- endpoint.addProtocolService(ProtocolServiceType.CLASS_TABLE,
name, (ClassTable) service);
- } else if (serviceType == ObjectTable.class) {
- endpoint.addProtocolService(ProtocolServiceType.OBJECT_TABLE,
name, (ObjectTable) service);
- } else if (serviceType == ClassResolver.class) {
- endpoint.addProtocolService(ProtocolServiceType.CLASS_RESOLVER,
name, (ClassResolver) service);
- } else if (serviceType == ObjectResolver.class) {
- endpoint.addProtocolService(ProtocolServiceType.OBJECT_RESOLVER,
name, (ObjectResolver) service);
- } else if (serviceType == ClassExternalizerFactory.class) {
-
endpoint.addProtocolService(ProtocolServiceType.CLASS_EXTERNALIZER_FACTORY, name,
(ClassExternalizerFactory) service);
- }
- } catch (DuplicateRegistrationException e) {
- log.debug("Duplicate registration for '" + name +
"' of " + serviceType);
- }
- }
- final Map<String, ProviderDescriptor> found = new HashMap<String,
ProviderDescriptor>();
- for (ProviderDescriptor descriptor :
ServiceLoader.load(ProviderDescriptor.class)) {
- final String name = descriptor.getName();
- // find the best one
- if (! found.containsKey(name) ||
found.get(name).getSupportedVersions()[0] < descriptor.getSupportedVersions()[0]) {
- found.put(name, descriptor);
- }
- }
- for (String name : found.keySet()) {
- try {
-
endpoint.addProtocolService(ProtocolServiceType.MARSHALLER_PROVIDER_DESCRIPTOR, name,
found.get(name));
- } catch (DuplicateRegistrationException e) {
- log.debug("Duplicate registration for '" + name +
"' of " + MarshallerFactory.class);
- }
- }
- }
- return endpoint;
- }
+ private static final String PROPERTIES = "remoting.properties";
+ private static final String PROPERTY_FILE_PROPNAME =
"remoting.property.file";
private static final ThreadFactory OUR_THREAD_FACTORY = new ThreadFactory() {
private final ThreadFactory defaultThreadFactory =
Executors.defaultThreadFactory();
@@ -165,35 +86,187 @@
}
};
- /**
- * Create a simple thread pool that is compatible with Remoting. The thread pool
will have a maximum of {@code maxThreads}
- * threads.
- *
- * @param maxThreads the maximum thread count
- * @return a closeable executor
- */
- public static CloseableExecutor createExecutor(final int maxThreads) {
- final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, maxThreads, 30L,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50), OUR_THREAD_FACTORY, new
ThreadPoolExecutor.CallerRunsPolicy());
- return new CloseableExecutor() {
- public void close() throws IOException {
- executor.shutdown();
+ public static Endpoint getConfiguredEndpoint() throws IOException {
+ final SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(GET_CONFIGURED_ENDPOINT_PERM);
+ }
+ synchronized (lock) {
+ final Endpoint endpoint = configuredEndpoint;
+ if (endpoint != null) {
+ return endpoint;
}
+ return configuredEndpoint = createConfigured();
+ }
+ }
- public void execute(final Runnable command) {
- executor.execute(command);
+ private static Endpoint createConfigured() throws IOException {
+ try {
+ return AccessController.doPrivileged(new PrivilegedAction<Endpoint>()
{
+ public Endpoint run() {
+ boolean ok = false;
+ final String fileName = System.getProperty(PROPERTY_FILE_PROPNAME,
PROPERTIES);
+ final Properties props = new Properties();
+ try {
+ final InputStream stream =
getClass().getResourceAsStream(fileName);
+ if (stream != null) try {
+ final InputStreamReader reader = new
InputStreamReader(stream, "utf-8");
+ try {
+ props.load(reader);
+ reader.close();
+ } finally {
+ IoUtils.safeClose(reader);
+ }
+ } finally {
+ IoUtils.safeClose(stream);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ final ThreadPoolExecutor executor = new ThreadPoolExecutor(
+
Integer.parseInt(props.getProperty("endpoint.threadpool.coresize",
"8")),
+
Integer.parseInt(props.getProperty("endpoint.threadpool.maxsize",
"64")),
+
Long.parseLong(props.getProperty("endpoint.threadpool.keepaliveseconds",
"30")),
+ TimeUnit.SECONDS,
+ new
ArrayBlockingQueue<Runnable>(Integer.parseInt(props.getProperty("endpoint.threadpool.queuelength",
"64"))),
+ OUR_THREAD_FACTORY,
+ new ThreadPoolExecutor.CallerRunsPolicy()
+ );
+ try {
+ final Endpoint endpoint;
+ try {
+ endpoint =
createEndpoint(props.getProperty("endpoint.name", "endpoint"),
executor, OptionMap.builder().parseAll(props, "endpoint.option.").getMap());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ endpoint.addCloseHandler(new CloseHandler<Endpoint>()
{
+ public void handleClose(final Endpoint closed) {
+ executor.shutdown();
+ }
+ });
+ addServices(endpoint, ProtocolServiceType.CLASS_TABLE,
props);
+ addServices(endpoint, ProtocolServiceType.OBJECT_TABLE,
props);
+ addServices(endpoint, ProtocolServiceType.CLASS_RESOLVER,
props);
+ addServices(endpoint, ProtocolServiceType.OBJECT_RESOLVER,
props);
+ addServices(endpoint,
ProtocolServiceType.CLASS_EXTERNALIZER_FACTORY, props);
+ for (RemotingServiceDescriptor<?> descriptor :
ServiceLoader.load(RemotingServiceDescriptor.class)) {
+ final String name = descriptor.getName();
+ final Class<?> serviceType = descriptor.getType();
+ final Object service;
+ try {
+ service = descriptor.getService(props);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ if (serviceType == ConnectionProviderFactory.class)
{
+ endpoint.addConnectionProvider(name,
(ConnectionProviderFactory<?>) service);
+ } else if (serviceType == ClassTable.class) {
+
endpoint.addProtocolService(ProtocolServiceType.CLASS_TABLE, name, (ClassTable) service);
+ } else if (serviceType == ObjectTable.class) {
+
endpoint.addProtocolService(ProtocolServiceType.OBJECT_TABLE, name, (ObjectTable)
service);
+ } else if (serviceType == ClassResolver.class) {
+
endpoint.addProtocolService(ProtocolServiceType.CLASS_RESOLVER, name, (ClassResolver)
service);
+ } else if (serviceType == ObjectResolver.class) {
+
endpoint.addProtocolService(ProtocolServiceType.OBJECT_RESOLVER, name, (ObjectResolver)
service);
+ } else if (serviceType ==
ClassExternalizerFactory.class) {
+
endpoint.addProtocolService(ProtocolServiceType.CLASS_EXTERNALIZER_FACTORY, name,
(ClassExternalizerFactory) service);
+ }
+ } catch (DuplicateRegistrationException e) {
+ log.warn("Duplicate registration for '"
+ name + "' of " + serviceType);
+ }
+ }
+ final Map<String, ProviderDescriptor> found = new
HashMap<String, ProviderDescriptor>();
+ for (ProviderDescriptor descriptor :
ServiceLoader.load(ProviderDescriptor.class)) {
+ final String name = descriptor.getName();
+ // find the best one
+ if (! found.containsKey(name) ||
found.get(name).getSupportedVersions()[0] < descriptor.getSupportedVersions()[0]) {
+ found.put(name, descriptor);
+ }
+ }
+ for (String name : found.keySet()) {
+ try {
+
endpoint.addProtocolService(ProtocolServiceType.MARSHALLER_PROVIDER_DESCRIPTOR, name,
found.get(name));
+ } catch (DuplicateRegistrationException e) {
+ log.warn("Duplicate registration for '"
+ name + "' of " + MarshallerFactory.class);
+ }
+ }
+ ok = true;
+ return endpoint;
+ } finally {
+ if (! ok) {
+ IoUtils.safeClose(endpoint);
+ }
+ }
+ } finally {
+ if (!ok) {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(Long.MAX_VALUE,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ final Throwable c = e.getCause();
+ if (c instanceof IOException) {
+ throw (IOException) c;
}
- };
+ throw e;
+ }
}
+ /**
+ * Register the protocol services of one type that are named in the configuration properties.
+ * <p/>
+ * The property {@code <endpoint name>.<type>_list} holds a comma-separated list of instance names. For each
+ * name, the property {@code <endpoint name>.<type>.<instance name>.class} names the class to instantiate through
+ * its public no-argument constructor. The instance is then registered on the endpoint under the trimmed
+ * instance name. Failures are logged as warnings and do not stop the remaining registrations.
+ *
+ * @param endpoint the endpoint to register the services with
+ * @param serviceType the protocol service type to look up and register
+ * @param props the configuration properties
+ * @param <T> the service value type
+ */
+ private static <T> void addServices(final Endpoint endpoint, final ProtocolServiceType<T> serviceType, final Properties props) {
+ final String endpointName = endpoint.getName();
+ final String basePropName = serviceType.getName().toLowerCase();
+ final String instances = props.getProperty(endpointName + "." + basePropName + "_list");
+ final Class<T> valueClass = serviceType.getValueClass();
+ if (instances != null) {
+ for (String name : instances.split(",")) {
+ // list entries may be written as "a, b"; always work with the trimmed name
+ final String trimmed = name.trim();
+ // the class key is scoped by the endpoint name, matching the "_list" key above
+ final String scopedClassName = props.getProperty(endpointName + "." + basePropName + "." + trimmed + ".class");
+ // fall back to the key layout used previously (prefixed by the raw list entry) so existing files keep working
+ final String className = scopedClassName != null ? scopedClassName : props.getProperty(name + "." + basePropName + "." + trimmed + ".class");
+ if (className != null) {
+ try {
+ final Class<? extends T> instanceType = Class.forName(className).asSubclass(valueClass);
+ final T instance = instanceType.getConstructor().newInstance();
+ endpoint.addProtocolService(serviceType, trimmed, instance);
+ } catch (InvocationTargetException e) {
+ // report the constructor's own failure rather than the reflection wrapper
+ log.warn(e.getCause(), "Unable to create %s instance '%s'", serviceType, trimmed);
+ } catch (Exception e) {
+ log.warn("Unable to register %s '%s': %s", serviceType, trimmed, e);
+ }
+ }
+ }
+ }
+ }
+
+
/**
- * Create an endpoint using the given {@code Executor} to execute tasks.
+ * Create an endpoint configured with the given option map. The following options
are supported:
+ * <ul>
+ * </ul>
*
- * @param executor the executor to use
- * @param name the name of the endpoint
+ * @param endpointName the endpoint name
+ * @param executor the thread pool to use
+ * @param optionMap the endpoint options
* @return the endpoint
+ * @throws IOException if an error occurs
*/
- public static Endpoint createEndpoint(final Executor executor, final String name)
throws IOException {
- return new EndpointImpl(executor, name);
+ public static Endpoint createEndpoint(final String endpointName, final Executor
executor, final OptionMap optionMap) throws IOException {
+ if (endpointName == null) {
+ throw new NullPointerException("endpointName is null");
+ }
+ if (optionMap == null) {
+ throw new NullPointerException("optionMap is null");
+ }
+ final SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(CREATE_ENDPOINT_PERM);
+ }
+ final Endpoint endpoint = new EndpointImpl(executor, endpointName);
+ return endpoint;
}
/**
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java 2010-01-27
20:01:09 UTC (rev 5677)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/RemotingOptions.java 2010-01-28
16:04:54 UTC (rev 5678)
@@ -34,42 +34,32 @@
}
/**
- * Configure the maximum number of threads for a simple endpoint.
- */
- public static final Option<Integer> MAX_THREADS =
Option.simple(RemotingOptions.class, "MAX_THREADS", Integer.class);
-
- /**
- * Specify whether connection providers should automatically be detected and loaded.
- */
- public static final Option<Boolean> LOAD_PROVIDERS =
Option.simple(RemotingOptions.class, "LOAD_PROVIDERS", Boolean.class);
-
- /**
* Request that the marshalling layer require the use of one of the listed
marshalling protocols, in order of decreasing preference.
*/
public static final Option<Sequence<String>> MARSHALLING_PROTOCOLS =
Option.sequence(RemotingOptions.class, "MARSHALLING_PROTOCOLS", String.class);
/**
- * Request that the marshalling layer require the presense of one of the listed
user-defined class tables, in order of decreasing preference.
+ * Request that the marshalling layer require the presence of one of the listed
user-defined class tables, in order of decreasing preference.
*/
public static final Option<Sequence<String>> MARSHALLING_CLASS_TABLES =
Option.sequence(RemotingOptions.class, "MARSHALLING_CLASS_TABLES",
String.class);
/**
- * Request that the marshalling layer require the presense of one of the listed
user-defined object tables, in order of decreasing preference.
+ * Request that the marshalling layer require the presence of one of the listed
user-defined object tables, in order of decreasing preference.
*/
public static final Option<Sequence<String>> MARSHALLING_OBJECT_TABLES =
Option.sequence(RemotingOptions.class, "MARSHALLING_OBJECT_TABLES",
String.class);
/**
- * Request that the marshalling layer require the presense of one of the listed class
resolvers, in order of decreasing preference.
+ * Request that the marshalling layer require the presence of one of the listed class
resolvers, in order of decreasing preference.
*/
public static final Option<Sequence<String>> MARSHALLING_CLASS_RESOLVERS
= Option.sequence(RemotingOptions.class, "MARSHALLING_CLASS_RESOLVERS",
String.class);
/**
- * Request that the marshalling layer require the presense of one of the listed
object resolvers, in order of decreasing preference.
+ * Request that the marshalling layer require the presence of one of the listed
object resolvers, in order of decreasing preference.
*/
public static final Option<Sequence<String>> MARSHALLING_OBJECT_RESOLVERS
= Option.sequence(RemotingOptions.class, "MARSHALLING_OBJECT_RESOLVERS",
String.class);
/**
- * Request that the marshalling layer require the presense of one of the listed
user-defined externalizer factories, in order of decreasing preference.
+ * Request that the marshalling layer require the presence of one of the listed
user-defined externalizer factories, in order of decreasing preference.
*/
public static final Option<Sequence<String>>
MARSHALLING_EXTERNALIZER_FACTORIES = Option.sequence(RemotingOptions.class,
"MARSHALLING_EXTERNALIZER_FACTORIES", String.class);
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderContext.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderContext.java 2010-01-27 20:01:09 UTC (rev 5677)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/ConnectionProviderContext.java 2010-01-28 16:04:54 UTC (rev 5678)
@@ -24,6 +24,7 @@
import org.jboss.remoting3.HandleableCloseable;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* A context for a connection provider. This provides additional endpoint methods to
connection providers which are not
@@ -34,6 +35,13 @@
public interface ConnectionProviderContext extends
HandleableCloseable<ConnectionProviderContext> {
/**
+ * Get the endpoint's executor.
+ *
+ * @return the endpoint executor
+ */
+ Executor getExecutor();
+
+ /**
* Accept a connection that was received by the corresponding protocol handler.
*
* @param connectionHandlerFactory the connection handler factory
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/RemotingServiceDescriptor.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/RemotingServiceDescriptor.java 2010-01-27 20:01:09 UTC (rev 5677)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/spi/RemotingServiceDescriptor.java 2010-01-28 16:04:54 UTC (rev 5678)
@@ -22,6 +22,9 @@
package org.jboss.remoting3.spi;
+import java.util.Properties;
+import java.io.IOException;
+
/**
* A descriptor for automatically-discovered remoting service types. Since instances of
this interface are
* constructed automatically, implementing classes should have a no-arg constructor.
@@ -58,9 +61,12 @@
String getName();
/**
- * Get the service to associate with the given name.
+ * Get the service to associate with the given name. The given properties were used
to configure the endpoint,
+ * and may be used to configure additional properties of this provider.
*
+ * @param properties the properties used to configure the endpoint
* @return the service
+ * @throws IOException if the instance could not be produced
*/
- T getService();
+ T getService(Properties properties) throws IOException;
}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java 2010-01-27 20:01:09 UTC (rev 5677)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java 2010-01-28 16:04:54 UTC (rev 5678)
@@ -89,6 +89,17 @@
}
/**
+ * Get an object sink that checks the type of each accepted instance.
+ *
+ * @param delegate the object sink to delegate to
+ * @param clazz the class to check for
+ * @return a checking object sink
+ */
+ public static <T> ObjectSink<T> getCheckedObjectSink(final
ObjectSink<T> delegate, final Class<? extends T> clazz) {
+ return new CheckedObjectSink<T>(delegate, clazz);
+ }
+
+ /**
* Get an object source that reads from an iterator over map entries.
*
* @param iterator the iterator object type
@@ -373,4 +384,27 @@
public void close() throws IOException {
}
}
+
+ private static class CheckedObjectSink<T> implements ObjectSink<T> {
+
+ private final ObjectSink<T> delegate;
+ private final Class<? extends T> clazz;
+
+ private CheckedObjectSink(final ObjectSink<T> delegate, final Class<?
extends T> clazz) {
+ this.delegate = delegate;
+ this.clazz = clazz;
+ }
+
+ public void accept(final T instance) throws IOException {
+ delegate.accept(clazz.cast(instance));
+ }
+
+ public void flush() throws IOException {
+ delegate.flush();
+ }
+
+ public void close() throws IOException {
+ delegate.close();
+ }
+ }
}
Modified: remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/EndpointTestCase.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/EndpointTestCase.java 2010-01-27 20:01:09 UTC (rev 5677)
+++ remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/EndpointTestCase.java 2010-01-28 16:04:54 UTC (rev 5678)
@@ -41,7 +41,7 @@
public void testCreate() throws Throwable {
final ExecutorService executorService = Executors.newCachedThreadPool();
- final Endpoint endpoint = Remoting.createEndpoint(executorService,
"foo");
+ final Endpoint endpoint = Remoting.createEndpoint("foo",
executorService, OptionMap.EMPTY);
try {
endpoint.close();
executorService.shutdown();
@@ -52,7 +52,7 @@
}
public void testLocalClientInvoke() throws Throwable {
- final Endpoint endpoint = Remoting.createEndpoint("test-endpoint");
+ final Endpoint endpoint = Remoting.getConfiguredEndpoint();
try {
final Object requestObj = new Object();
final Object replyObj = new Object();
@@ -80,7 +80,7 @@
}
public void testLocalClientSend() throws Throwable {
- final Endpoint endpoint = Remoting.createEndpoint("test-endpoint");
+ final Endpoint endpoint = Remoting.getConfiguredEndpoint();
try {
final Object requestObj = new Object();
final Object replyObj = new Object();
@@ -108,7 +108,7 @@
}
public void testLocalClientConnectInvoke() throws Throwable {
- final Endpoint endpoint = Remoting.createEndpoint("test-endpoint");
+ final Endpoint endpoint = Remoting.getConfiguredEndpoint();
try {
final Object requestObj = new Object();
final Object replyObj = new Object();
@@ -151,7 +151,7 @@
}
public void testLocalClientConnectSend() throws Throwable {
- final Endpoint endpoint = Remoting.createEndpoint("test-endpoint");
+ final Endpoint endpoint = Remoting.getConfiguredEndpoint();
try {
final Object requestObj = new Object();
final Object replyObj = new Object();
@@ -194,7 +194,7 @@
}
public void testNotFoundService() throws Throwable {
- final Endpoint endpoint = Remoting.createEndpoint("test-endpoint");
+ final Endpoint endpoint = Remoting.getConfiguredEndpoint();
try {
endpoint.connect(new URI("local:///"),
OptionMap.EMPTY).get().openClient("blah", "bzzt", Object.class,
Object.class).get();
} catch (ServiceNotFoundException e) {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/simple/LocalBasicExample2Main.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/simple/LocalBasicExample2Main.java 2010-01-27 20:01:09 UTC (rev 5677)
+++ remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/simple/LocalBasicExample2Main.java 2010-01-28 16:04:54 UTC (rev 5678)
@@ -40,7 +40,7 @@
}
public static void main(String[] args) throws Exception {
- final Endpoint endpoint = Remoting.createEndpoint("simple");
+ final Endpoint endpoint = Remoting.getConfiguredEndpoint();
try {
final Registration handle =
endpoint.serviceBuilder().setServiceType("simple.rot13").setGroupName("main")
.setRequestType(String.class).setReplyType(String.class).setClientListener(new
StringRot13ClientListener())
Modified: remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/simple/LocalBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/simple/LocalBasicExampleMain.java 2010-01-27 20:01:09 UTC (rev 5677)
+++ remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/simple/LocalBasicExampleMain.java 2010-01-28 16:04:54 UTC (rev 5678)
@@ -16,7 +16,7 @@
public static void main(String[] args) throws IOException {
final StringRot13RequestListener listener = new StringRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple");
+ final Endpoint endpoint = Remoting.getConfiguredEndpoint();
try {
final Client<String,String> client =
Remoting.createLocalClient(endpoint, listener, null, null);
try {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketUsageExamples.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketUsageExamples.java 2010-01-27 20:01:09 UTC (rev 5677)
+++ remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketUsageExamples.java 2010-01-28 16:04:54 UTC (rev 5678)
@@ -75,7 +75,7 @@
// Start server service.
log.info("entering " + getName());
ExecutorService serverExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint serverEndpoint = Remoting.createEndpoint(serverExecutor,
"server");
+ Endpoint serverEndpoint = Remoting.createEndpoint("server",
serverExecutor, OptionMap.EMPTY);
int serverPort = PORT + portCounter++;
Cancellable socketServer = SocketProtocol.registerServerTransport(serverEndpoint,
serverExecutor, HOST, serverPort);
SocketServiceConfiguration<String, String> socketServiceConfiguration = new
SocketServiceConfiguration<String, String>(SERVICE_TYPE, GROUP_NAME, String.class,
String.class, HOST, serverPort);
@@ -83,7 +83,7 @@
// Create client and connect to server.
ExecutorService clientExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint clientEndpoint = Remoting.createEndpoint(serverExecutor,
"client");
+ Endpoint clientEndpoint = Remoting.createEndpoint("client",
serverExecutor, OptionMap.EMPTY);
SocketProtocol.registerClientTransport(clientEndpoint, clientExecutor, HOST);
Connection connection = getFutureResult(clientEndpoint.connect(new
URI("socket://" + HOST + ":" + serverPort), OptionMap.EMPTY),
"couldn't create Connection");
Client<String, String> client =
getFutureResult(connection.openClient(SERVICE_TYPE, GROUP_NAME, String.class,
String.class), "couldn't create Client");
@@ -111,7 +111,7 @@
// Start server service.
log.info("entering " + getName());
ExecutorService serverExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint serverEndpoint = Remoting.createEndpoint(serverExecutor,
"server");
+ Endpoint serverEndpoint = Remoting.createEndpoint("server",
serverExecutor, OptionMap.EMPTY);
int serverPort = PORT + portCounter++;
Cancellable socketServer = SocketProtocol.registerServerTransport(serverEndpoint,
serverExecutor, HOST, serverPort);
SocketServiceConfiguration<String, String> socketServiceConfiguration = new
SocketServiceConfiguration<String, String>(SERVICE_TYPE, GROUP_NAME, String.class,
String.class, HOST, serverPort);
@@ -119,7 +119,7 @@
// Create client and connect to server.
ExecutorService clientExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint clientEndpoint = Remoting.createEndpoint(serverExecutor,
"client");
+ Endpoint clientEndpoint = Remoting.createEndpoint("client",
serverExecutor, OptionMap.EMPTY);
SocketProtocol.registerClientTransport(clientEndpoint, clientExecutor, HOST);
Connection connection = getFutureResult(clientEndpoint.connect(new
URI("socket://" + HOST + ":" + serverPort), OptionMap.EMPTY),
"couldn't create Connection");
Client<String, String> client =
getFutureResult(connection.openClient(SERVICE_TYPE, GROUP_NAME, String.class,
String.class), "couldn't create Client");
@@ -147,7 +147,7 @@
// Start west coast service.
log.info("entering " + getName());
ExecutorService westernExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint westernEndpoint = Remoting.createEndpoint(westernExecutor, "west
coast server");
+ Endpoint westernEndpoint = Remoting.createEndpoint("west coast server",
westernExecutor, OptionMap.EMPTY);
int westernPort = PORT + portCounter++;
Cancellable westernServer = SocketProtocol.registerServerTransport(westernEndpoint,
westernExecutor, HOST, westernPort);
SocketServiceConfiguration<String, String> westernServiceConfiguration = new
SocketServiceConfiguration<String, String>(SERVICE_TYPE, GROUP_NAME, String.class,
String.class, HOST, westernPort);
@@ -155,7 +155,7 @@
// Start east coast service.
ExecutorService easternExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint easternEndpoint = Remoting.createEndpoint(easternExecutor, "west
coast server");
+ Endpoint easternEndpoint = Remoting.createEndpoint("west coast server",
easternExecutor, OptionMap.EMPTY);
int easternPort = PORT + portCounter++;
Cancellable easternServer = SocketProtocol.registerServerTransport(easternEndpoint,
easternExecutor, HOST, easternPort);
SocketServiceConfiguration<String, String> easternServiceConfiguration = new
SocketServiceConfiguration<String, String>(SERVICE_TYPE, GROUP_NAME, String.class,
String.class, HOST, easternPort);
@@ -203,7 +203,7 @@
// Create server.
log.info("entering " + getName());
ExecutorService serverExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint serverEndpoint = Remoting.createEndpoint(serverExecutor,
"server");
+ Endpoint serverEndpoint = Remoting.createEndpoint("server",
serverExecutor, OptionMap.EMPTY);
int serverPort = PORT + portCounter++;
Cancellable socketServer = SocketProtocol.registerServerTransport(serverEndpoint,
serverExecutor, HOST, serverPort);
@@ -217,7 +217,7 @@
// Create client endpoint and get connection.
ExecutorService clientExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint clientEndpoint = Remoting.createEndpoint(serverExecutor,
"client");
+ Endpoint clientEndpoint = Remoting.createEndpoint("client",
serverExecutor, OptionMap.EMPTY);
SocketProtocol.registerClientTransport(clientEndpoint, clientExecutor, HOST);
Connection connection = getFutureResult(clientEndpoint.connect(new
URI("socket://" + HOST + ":" + serverPort), OptionMap.EMPTY),
"couldn't create Connection");
@@ -252,7 +252,7 @@
// Start remote service.
log.info("entering " + getName());
ExecutorService remoteExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint remoteEndpoint = Remoting.createEndpoint(remoteExecutor, "remote
endpoint");
+ Endpoint remoteEndpoint = Remoting.createEndpoint("remote endpoint",
remoteExecutor, OptionMap.EMPTY);
int remotePort = PORT + portCounter++;
Cancellable remoteServer = SocketProtocol.registerServerTransport(remoteEndpoint,
remoteExecutor, HOST, remotePort);
SocketServiceConfiguration<RequestWrapper, Object> remoteServiceConfiguration
= new SocketServiceConfiguration<RequestWrapper, Object>(SERVICE_TYPE +
"remote", GROUP_NAME, RequestWrapper.class, Object.class, HOST, remotePort);
@@ -260,7 +260,7 @@
// Start local service.
ExecutorService localExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint localEndpoint = Remoting.createEndpoint(localExecutor, "local
server");
+ Endpoint localEndpoint = Remoting.createEndpoint("local server",
localExecutor, OptionMap.EMPTY);
int localPort = PORT + portCounter++;
Cancellable localServer = SocketProtocol.registerServerTransport(localEndpoint,
localExecutor, HOST, localPort);
SocketServiceConfiguration<Object, Object> localServiceConfiguration = new
SocketServiceConfiguration<Object, Object>(SERVICE_TYPE + "local",
GROUP_NAME, Object.class, Object.class, HOST, localPort);
@@ -303,7 +303,7 @@
// Start remote service.
log.info("entering " + getName());
ExecutorService remoteExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint remoteEndpoint = Remoting.createEndpoint(remoteExecutor, "remote
endpoint");
+ Endpoint remoteEndpoint = Remoting.createEndpoint("remote endpoint",
remoteExecutor, OptionMap.EMPTY);
int remotePort = PORT + portCounter++;
Cancellable remoteServer = SocketProtocol.registerServerTransport(remoteEndpoint,
remoteExecutor, HOST, remotePort);
SocketServiceConfiguration<Object, Object> remoteServiceConfiguration = new
SocketServiceConfiguration<Object, Object>(SERVICE_TYPE + "remote",
GROUP_NAME, Object.class, Object.class, HOST, remotePort);
@@ -311,7 +311,7 @@
// Create local endpoint.
ExecutorService localExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- Endpoint localEndpoint = Remoting.createEndpoint(localExecutor, "local
server");
+ Endpoint localEndpoint = Remoting.createEndpoint("local server",
localExecutor, OptionMap.EMPTY);
SocketProtocol.registerClientTransport(localEndpoint, localExecutor, HOST);
// Send ClientConnector to remote server and get callbacks.