[jboss-remoting-commits] JBoss Remoting SVN: r3998 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/wrapper and 4 other directories.
jboss-remoting-commits at lists.jboss.org
jboss-remoting-commits at lists.jboss.org
Wed Apr 16 14:08:28 EDT 2008
Author: david.lloyd at jboss.com
Date: 2008-04-16 14:08:27 -0400 (Wed, 16 Apr 2008)
New Revision: 3998
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolSupport.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
"bean"-ify Endpoint - now an endpoint must be maintained by a container or by the Remoting utility class
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -11,7 +11,7 @@
/**
* A potential participant in a JBoss Remoting communications relationship.
*/
-public interface Endpoint extends Closeable<Endpoint> {
+public interface Endpoint {
/**
* Get the endpoint attribute map. This is a storage area for any data associated with this endpoint, including
* (but not limited to) connection and protocol information, and application information.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -2,7 +2,6 @@
import java.net.URI;
import java.util.concurrent.ConcurrentMap;
-import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.Client;
import org.jboss.cx.remoting.ClientSource;
import org.jboss.cx.remoting.Endpoint;
@@ -61,20 +60,4 @@
public void removeSessionListener(final SessionListener sessionListener) {
delegate.removeSessionListener(sessionListener);
}
-
- public void close() throws RemotingException {
- delegate.close();
- }
-
- public void closeImmediate() throws RemotingException {
- delegate.closeImmediate();
- }
-
- public void addCloseHandler(final CloseHandler<Endpoint> closeHandler) {
- delegate.addCloseHandler(new CloseHandler<Endpoint>() {
- public void handleClose(final Endpoint closed) {
- closeHandler.handleClose(EndpointWrapper.this);
- }
- });
- }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -2,15 +2,12 @@
import java.io.IOException;
import java.net.URI;
-import java.util.Iterator;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.Client;
import org.jboss.cx.remoting.ClientSource;
import org.jboss.cx.remoting.Endpoint;
@@ -38,19 +35,10 @@
/**
*
*/
-public final class CoreEndpoint {
+public class CoreEndpoint implements Endpoint {
- private final String name;
- private final RequestListener<?, ?> rootListener;
- private final Endpoint userEndpoint = new UserEndpoint();
- private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
- private final Set<SessionListener> sessionListeners = CollectionUtil.synchronizedSet(new LinkedHashSet<SessionListener>());
-
- private OrderedExecutorFactory orderedExecutorFactory;
- private Executor executor;
- private ExecutorService executorService;
-
static {
+ // Print Remoting "greeting" message
Logger.getLogger("org.jboss.cx.remoting").info("JBoss Remoting version %s", Version.VERSION);
}
@@ -60,51 +48,72 @@
DOWN;
public boolean isReachable(final State dest) {
- switch (this) {
- case INITIAL:
- return dest != INITIAL;
- case UP:
- return dest == DOWN;
- default:
- return false;
- }
+ return compareTo(dest) < 0;
}
}
- public CoreEndpoint(final String name, final RequestListener<?, ?> rootListener) {
- this.name = name;
- this.rootListener = rootListener;
- }
+ private String name;
+ private RequestListener<?, ?> rootListener;
+ private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
+ private final Set<SessionListener> sessionListeners = CollectionUtil.synchronizedSet(new LinkedHashSet<SessionListener>());
+
+ private OrderedExecutorFactory orderedExecutorFactory;
+ private ExecutorService executorService;
+
private final ConcurrentMap<Object, Object> endpointMap = CollectionUtil.concurrentMap();
private final ConcurrentMap<String, CoreProtocolRegistration> protocolMap = CollectionUtil.concurrentMap();
private final Set<CoreSession> sessions = CollectionUtil.synchronizedSet(CollectionUtil.<CoreSession>hashSet());
- // accesses protected by {@code shutdownListeners} - always lock AFTER {@code state}
- private final List<CloseHandler<Endpoint>> closeHandlers = CollectionUtil.synchronizedArrayList();
+ public CoreEndpoint() {
+ }
+
+ // Dependencies
+
+ private Executor executor;
+
public Executor getExecutor() {
return executor;
}
+ public Executor getOrderedExecutor() {
+ return orderedExecutorFactory.getOrderedExecutor();
+ }
+
public void setExecutor(final Executor executor) {
this.executor = executor;
orderedExecutorFactory = new OrderedExecutorFactory(executor);
}
- public Endpoint getUserEndpoint() {
- return userEndpoint;
+ // Configuration
+
+ public void setName(final String name) {
+ this.name = name;
}
- void removeSession(CoreSession coreSession) {
- synchronized (sessions) {
- if (!sessions.remove(coreSession)) {
- return;
- }
- sessions.notifyAll();
+ public String getName() {
+ return name;
+ }
+
+ public void setRootListener(final RequestListener<?, ?> rootListener) {
+ this.rootListener = rootListener;
+ }
+
+ public RequestListener<?, ?> getRootListener() {
+ return rootListener;
+ }
+
+ // Lifecycle
+
+ public void create() {
+ // todo security check
+ if (rootListener == null) {
+ throw new NullPointerException("rootListener is null");
}
}
public void start() {
+ // todo security check
if (executor == null) {
executorService = Executors.newCachedThreadPool();
setExecutor(executorService);
@@ -113,6 +122,7 @@
}
public void stop() {
+ // todo security check
if (executorService != null) {
executorService.shutdown();
executorService = null;
@@ -120,10 +130,123 @@
// todo
}
- Executor getOrderedExecutor() {
- return orderedExecutorFactory.getOrderedExecutor();
+ public void destroy() {
+ rootListener = null;
+ executor = null;
}
+
+ // Endpoint implementation
+
+ public ConcurrentMap<Object, Object> getAttributes() {
+ return endpointMap;
+ }
+
+ public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
+ if (uri == null) {
+ throw new NullPointerException("uri is null");
+ }
+ if (attributeMap == null) {
+ throw new NullPointerException("attributeMap is null");
+ }
+ final String scheme = uri.getScheme();
+ if (scheme == null) {
+ throw new RemotingException("No scheme on remote endpoint URI");
+ }
+ state.requireHold(State.UP);
+ try {
+ final CoreProtocolRegistration registration = protocolMap.get(scheme);
+ if (registration == null) {
+ throw new RemotingException("No handler available for URI scheme \"" + scheme + "\"");
+ }
+ final ProtocolHandlerFactory factory = registration.getProtocolHandlerFactory();
+ try {
+ final CoreSession session = new CoreSession(CoreEndpoint.this);
+ session.initializeClient(factory, uri, attributeMap, createClient(rootListener));
+ sessions.add(session);
+ final Session userSession = session.getUserSession();
+ for (final SessionListener listener : sessionListeners) {
+ executor.execute(new Runnable() {
+ public void run() {
+ listener.handleSessionOpened(userSession);
+ }
+ });
+ }
+ return userSession;
+ } catch (IOException e) {
+ RemotingException rex = new RemotingException("Failed to create protocol handler: " + e.getMessage());
+ rex.setStackTrace(e.getStackTrace());
+ throw rex;
+ }
+ } finally {
+ state.release();
+ }
+ }
+
+ public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
+ state.requireHold(State.UP);
+ try {
+ final CoreSession session = new CoreSession(CoreEndpoint.this);
+ session.initializeServer(handler, createClient(rootListener));
+ sessions.add(session);
+ return session.getProtocolContext();
+ } finally {
+ state.release();
+ }
+ }
+
+ public Registration registerProtocol(final String scheme, final ProtocolHandlerFactory protocolHandlerFactory) throws RemotingException, IllegalArgumentException {
+ if (scheme == null) {
+ throw new NullPointerException("scheme is null");
+ }
+ if (protocolHandlerFactory == null) {
+ throw new NullPointerException("protocolHandlerFactory is null");
+ }
+ state.requireHold(State.UP);
+ try {
+ final CoreProtocolRegistration registration = new CoreProtocolRegistration(protocolHandlerFactory);
+ protocolMap.put(scheme, registration);
+ return registration;
+ } finally {
+ state.release();
+ }
+ }
+
+ public <I, O> Client<I, O> createClient(RequestListener<I, O> requestListener) {
+ final CoreInboundClient<I, O> inbound = new CoreInboundClient<I, O>(requestListener, executor);
+ final CoreOutboundClient<I, O> outbound = new CoreOutboundClient<I, O>(executor);
+ inbound.initialize(outbound.getContextClient());
+ outbound.initialize(inbound.getClientResponder());
+ return outbound.getUserContext();
+ }
+
+ public <I, O> ClientSource<I, O> createService(RequestListener<I, O> requestListener) {
+ final CoreInboundService<I, O> inbound = new CoreInboundService<I, O>(requestListener, executor);
+ final CoreOutboundService<I, O> outbound = new CoreOutboundService<I, O>(executor);
+ inbound.initialize(outbound.getServiceClient());
+ outbound.initialize(inbound.getServiceResponder());
+ return outbound.getUserContextSource();
+ }
+
+ public void addSessionListener(final SessionListener sessionListener) {
+ // TODO security check
+ sessionListeners.add(sessionListener);
+ }
+
+ public void removeSessionListener(final SessionListener sessionListener) {
+ // TODO security check
+ sessionListeners.remove(sessionListener);
+ }
+
+ void removeSession(CoreSession coreSession) {
+ synchronized (sessions) {
+ if (!sessions.remove(coreSession)) {
+ return;
+ }
+ sessions.notifyAll();
+ }
+ }
+
public final class CoreProtocolRegistration implements Registration {
private final ProtocolHandlerFactory protocolHandlerFactory;
@@ -166,153 +289,4 @@
}
}
}
-
- public final class UserEndpoint implements Endpoint {
-
- public ConcurrentMap<Object, Object> getAttributes() {
- return endpointMap;
- }
-
- public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
- if (uri == null) {
- throw new NullPointerException("uri is null");
- }
- if (attributeMap == null) {
- throw new NullPointerException("attributeMap is null");
- }
- final String scheme = uri.getScheme();
- if (scheme == null) {
- throw new RemotingException("No scheme on remote endpoint URI");
- }
- state.requireHold(State.UP);
- try {
- final CoreProtocolRegistration registration = protocolMap.get(scheme);
- if (registration == null) {
- throw new RemotingException("No handler available for URI scheme \"" + scheme + "\"");
- }
- final ProtocolHandlerFactory factory = registration.getProtocolHandlerFactory();
- try {
- final CoreSession session = new CoreSession(CoreEndpoint.this);
- session.initializeClient(factory, uri, attributeMap, createClient(rootListener));
- sessions.add(session);
- final Session userSession = session.getUserSession();
- for (final SessionListener listener : sessionListeners) {
- executor.execute(new Runnable() {
- public void run() {
- listener.handleSessionOpened(userSession);
- }
- });
- }
- return userSession;
- } catch (IOException e) {
- RemotingException rex = new RemotingException("Failed to create protocol handler: " + e.getMessage());
- rex.setStackTrace(e.getStackTrace());
- throw rex;
- }
- } finally {
- state.release();
- }
- }
-
- public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
- state.requireHold(State.UP);
- try {
- final CoreSession session = new CoreSession(CoreEndpoint.this);
- session.initializeServer(handler, createClient(rootListener));
- sessions.add(session);
- return session.getProtocolContext();
- } finally {
- state.release();
- }
- }
-
- public String getName() {
- return name;
- }
-
- public Registration registerProtocol(final String scheme, final ProtocolHandlerFactory protocolHandlerFactory) throws RemotingException, IllegalArgumentException {
- if (scheme == null) {
- throw new NullPointerException("scheme is null");
- }
- if (protocolHandlerFactory == null) {
- throw new NullPointerException("protocolHandlerFactory is null");
- }
- state.requireHold(State.UP);
- try {
- final CoreProtocolRegistration registration = new CoreProtocolRegistration(protocolHandlerFactory);
- protocolMap.put(scheme, registration);
- return registration;
- } finally {
- state.release();
- }
- }
-
- public <I, O> Client<I, O> createClient(RequestListener<I, O> requestListener) {
- final CoreInboundClient<I, O> inbound = new CoreInboundClient<I, O>(requestListener, executor);
- final CoreOutboundClient<I, O> outbound = new CoreOutboundClient<I, O>(executor);
- inbound.initialize(outbound.getContextClient());
- outbound.initialize(inbound.getClientResponder());
- return outbound.getUserContext();
- }
-
- public <I, O> ClientSource<I, O> createService(RequestListener<I, O> requestListener) {
- final CoreInboundService<I, O> inbound = new CoreInboundService<I, O>(requestListener, executor);
- final CoreOutboundService<I, O> outbound = new CoreOutboundService<I, O>(executor);
- inbound.initialize(outbound.getServiceClient());
- outbound.initialize(inbound.getServiceResponder());
- return outbound.getUserContextSource();
- }
-
- public void addSessionListener(final SessionListener sessionListener) {
- // TODO security check
- sessionListeners.add(sessionListener);
- }
-
- public void removeSessionListener(final SessionListener sessionListener) {
- // TODO security check
- sessionListeners.remove(sessionListener);
- }
-
- public void close() throws RemotingException {
- if (state.transitionHold(State.UP, State.DOWN)) try {
- Iterator<CloseHandler<Endpoint>> it = closeHandlers.iterator();
- while (it.hasNext()) {
- CloseHandler<Endpoint> handler = it.next();
- handler.handleClose(this);
- it.remove();
- }
- } finally {
- state.release();
- }
- }
-
- public void closeImmediate() throws RemotingException {
- if (state.transitionHold(State.UP, State.DOWN)) try {
- Iterator<CloseHandler<Endpoint>> it = closeHandlers.iterator();
- while (it.hasNext()) {
- CloseHandler<Endpoint> handler = it.next();
- handler.handleClose(this);
- it.remove();
- }
- } finally {
- state.release();
- }
- }
-
- public void addCloseHandler(final CloseHandler<Endpoint> closeHandler) {
- if (closeHandler == null) {
- throw new NullPointerException("closeHandler is null");
- }
- final State current = state.getStateHold();
- try {
- if (current != State.DOWN) {
- closeHandlers.add(closeHandler);
- return;
- }
- } finally {
- state.release();
- }
- closeHandler.handleClose(this);
- }
- }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -235,7 +235,7 @@
}
public String getLocalEndpointName() {
- return endpoint.getUserEndpoint().getName();
+ return endpoint.getName();
}
public String getRemoteEndpointName() {
@@ -312,7 +312,7 @@
}
public String getLocalEndpointName() {
- return endpoint.getUserEndpoint().getName();
+ return endpoint.getName();
}
public void receiveClientClose(ClientIdentifier remoteClientIdentifier, final boolean immediate, final boolean cancel, final boolean interrupt) {
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolSupport.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolSupport.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolSupport.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -5,6 +5,8 @@
import java.net.SocketAddress;
import java.net.URI;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
@@ -73,7 +75,12 @@
// Lifecycle
+ private ExecutorService executorService;
+
public void create() throws RemotingException {
+ if (executor == null) {
+ executor = executorService = Executors.newCachedThreadPool();
+ }
ExceptionMonitor.setInstance(new ExceptionMonitor() {
public void exceptionCaught(final Throwable cause) {
// do nothing!
@@ -101,6 +108,10 @@
registration.unregister();
registration = null;
}
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ executor = executorService = null;
protocolHandlerFactory = null;
}
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -10,6 +10,7 @@
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.Remoting;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.jrpp.JrppServer;
import org.jboss.cx.remoting.core.security.sasl.Provider;
import org.jboss.cx.remoting.util.AttributeMap;
@@ -23,23 +24,27 @@
final StringRot13RequestListener listener = new StringRot13RequestListener();
final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
try {
- Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
- Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+ final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
try {
- final Client<String,String> client = session.getRootClient();
+ Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
try {
- final String original = "The Secret Message\n";
- final String result = client.invoke(original);
- System.out.printf("The secret message \"%s\" became \"%s\"!\n", original.trim(), result.trim());
+ final Client<String,String> client = session.getRootClient();
+ try {
+ final String original = "The Secret Message\n";
+ final String result = client.invoke(original);
+ System.out.printf("The secret message \"%s\" became \"%s\"!\n", original.trim(), result.trim());
+ } finally {
+ client.close();
+ }
} finally {
- client.close();
+ session.close();
}
} finally {
- session.close();
+ jrppServer.stop();
+ jrppServer.destroy();
}
} finally {
- endpoint.close();
+ Remoting.closeEndpoint(endpoint);
}
-
}
}
\ No newline at end of file
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -13,6 +13,7 @@
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.Remoting;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.jrpp.JrppServer;
import org.jboss.cx.remoting.core.security.sasl.Provider;
import org.jboss.cx.remoting.util.AttributeMap;
@@ -26,37 +27,42 @@
final StreamingRot13RequestListener listener = new StreamingRot13RequestListener();
final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
try {
- Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
- Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+ final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
try {
- final Client<Reader,Reader> client = session.getRootClient();
+ Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
try {
- final String original = "The Secret Message\n";
- final StringReader originalReader = new StringReader(original);
+ final Client<Reader,Reader> client = session.getRootClient();
try {
- final Reader reader = client.send(originalReader).get();
+ final String original = "The Secret Message\n";
+ final StringReader originalReader = new StringReader(original);
try {
- final BufferedReader bufferedReader = new BufferedReader(reader);
+ final Reader reader = client.send(originalReader).get();
try {
- final String secretLine = bufferedReader.readLine();
- System.out.printf("The secret message \"%s\" became \"%s\"!\n", original.trim(), secretLine);
+ final BufferedReader bufferedReader = new BufferedReader(reader);
+ try {
+ final String secretLine = bufferedReader.readLine();
+ System.out.printf("The secret message \"%s\" became \"%s\"!\n", original.trim(), secretLine);
+ } finally {
+ bufferedReader.close();
+ }
} finally {
- bufferedReader.close();
+ reader.close();
}
} finally {
- reader.close();
+ originalReader.close();
}
} finally {
- originalReader.close();
+ client.close();
}
} finally {
- client.close();
+ session.close();
}
} finally {
- session.close();
+ jrppServer.stop();
+ jrppServer.destroy();
}
} finally {
- endpoint.close();
+ Remoting.closeEndpoint(endpoint);
}
}
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -27,7 +27,7 @@
client.close();
}
} finally {
- endpoint.close();
+ Remoting.closeEndpoint(endpoint);
}
}
}
\ No newline at end of file
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -45,7 +45,7 @@
client.close();
}
} finally {
- endpoint.close();
+ Remoting.closeEndpoint(endpoint);
}
}
}
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-04-16 18:08:27 UTC (rev 3998)
@@ -1,10 +1,8 @@
package org.jboss.cx.remoting;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ConcurrentMap;
import org.jboss.cx.remoting.core.CoreEndpoint;
import org.jboss.cx.remoting.core.protocol.LocalProtocolHandlerFactory;
import org.jboss.cx.remoting.jrpp.JrppProtocolSupport;
@@ -21,51 +19,78 @@
private static final String JRPP_SUPPORT_KEY = "org.jboss.cx.remoting.standalone.jrpp.support";
public static <I, O> Endpoint createEndpoint(String name, RequestListener<I, O> listener) throws IOException {
- final CoreEndpoint coreEndpoint = new CoreEndpoint(name, listener);
- final ExecutorService executorService = Executors.newCachedThreadPool();
- coreEndpoint.setExecutor(executorService);
- coreEndpoint.start();
boolean ok = false;
+ final CoreEndpoint coreEndpoint = new CoreEndpoint();
+ coreEndpoint.setName(name);
+ coreEndpoint.setRootListener(listener);
+ coreEndpoint.create();
try {
- final Endpoint userEndpoint = coreEndpoint.getUserEndpoint();
- LocalProtocolHandlerFactory.addTo(userEndpoint);
- final JrppProtocolSupport jrppProtocolSupport = new JrppProtocolSupport();
- jrppProtocolSupport.setEndpoint(userEndpoint);
- jrppProtocolSupport.setExecutor(executorService);
- jrppProtocolSupport.create();
- jrppProtocolSupport.start();
- userEndpoint.getAttributes().put(JRPP_SUPPORT_KEY, jrppProtocolSupport);
- userEndpoint.addCloseHandler(new CloseHandler<Endpoint>() {
- public void handleClose(final Endpoint closed) {
- executorService.shutdown();
+ coreEndpoint.start();
+ try {
+ LocalProtocolHandlerFactory.addTo(coreEndpoint);
+ final JrppProtocolSupport jrppProtocolSupport = new JrppProtocolSupport();
+ jrppProtocolSupport.setEndpoint(coreEndpoint);
+ jrppProtocolSupport.create();
+ try {
+ jrppProtocolSupport.start();
+ try {
+ final ConcurrentMap<Object, Object> attributes = coreEndpoint.getAttributes();
+ attributes.put(JRPP_SUPPORT_KEY, jrppProtocolSupport);
+ ok = true;
+ return coreEndpoint;
+ } finally {
+ if (! ok) {
+ jrppProtocolSupport.stop();
+ }
+ }
+ } finally {
+ if (! ok) {
+ jrppProtocolSupport.destroy();
+ }
}
- });
- return userEndpoint;
+ } finally {
+ if (! ok) {
+ coreEndpoint.stop();
+ }
+ }
} finally {
if (! ok) {
- coreEndpoint.stop();
+ coreEndpoint.destroy();
}
}
}
+ public static void closeEndpoint(Endpoint endpoint) {
+ if (endpoint instanceof CoreEndpoint) {
+ final CoreEndpoint coreEndpoint = (CoreEndpoint) endpoint;
+ final ConcurrentMap<Object, Object> attributes = coreEndpoint.getAttributes();
+ final JrppProtocolSupport jrppProtocolSupport = (JrppProtocolSupport) attributes.get(JRPP_SUPPORT_KEY);
+ coreEndpoint.stop();
+ coreEndpoint.destroy();
+ if (jrppProtocolSupport != null) {
+ jrppProtocolSupport.stop();
+ jrppProtocolSupport.destroy();
+ }
+ }
+ }
+
public static JrppServer addJrppServer(Endpoint endpoint, SocketAddress address, AttributeMap attributeMap) throws IOException {
+ boolean ok = false;
final JrppServer jrppServer = new JrppServer();
jrppServer.setProtocolSupport((JrppProtocolSupport) endpoint.getAttributes().get(JRPP_SUPPORT_KEY));
- jrppServer.setSocketAddress(new InetSocketAddress(12345));
- jrppServer.setAttributeMap(AttributeMap.EMPTY);
+ jrppServer.setSocketAddress(address);
+ jrppServer.setAttributeMap(attributeMap);
jrppServer.setEndpoint(endpoint);
jrppServer.create();
- jrppServer.start();
- endpoint.addCloseHandler(new CloseHandler<Endpoint>() {
- public void handleClose(final Endpoint closed) {
- try {
- jrppServer.stop();
- } finally {
- jrppServer.destroy();
- }
+ try {
+ jrppServer.start();
+ ok = true;
+ return jrppServer;
+ } finally {
+ if (! ok) {
+ jrppServer.destroy();
}
- });
- return jrppServer;
+ }
}
// privates
More information about the jboss-remoting-commits mailing list