[jboss-remoting-commits] JBoss Remoting SVN: r3998 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/wrapper and 4 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed Apr 16 14:08:28 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-04-16 14:08:27 -0400 (Wed, 16 Apr 2008)
New Revision: 3998

Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
   remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolSupport.java
   remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
   remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
   remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
   remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
   remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
"bean"-ify Endpoint - now an endpoint must be maintained by a container or by the Remoting utility class

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -11,7 +11,7 @@
 /**
  * A potential participant in a JBoss Remoting communications relationship.
  */
-public interface Endpoint extends Closeable<Endpoint> {
+public interface Endpoint {
     /**
      * Get the endpoint attribute map.  This is a storage area for any data associated with this endpoint, including
      * (but not limited to) connection and protocol information, and application information.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -2,7 +2,6 @@
 
 import java.net.URI;
 import java.util.concurrent.ConcurrentMap;
-import org.jboss.cx.remoting.CloseHandler;
 import org.jboss.cx.remoting.Client;
 import org.jboss.cx.remoting.ClientSource;
 import org.jboss.cx.remoting.Endpoint;
@@ -61,20 +60,4 @@
     public void removeSessionListener(final SessionListener sessionListener) {
         delegate.removeSessionListener(sessionListener);
     }
-
-    public void close() throws RemotingException {
-        delegate.close();
-    }
-
-    public void closeImmediate() throws RemotingException {
-        delegate.closeImmediate();
-    }
-
-    public void addCloseHandler(final CloseHandler<Endpoint> closeHandler) {
-        delegate.addCloseHandler(new CloseHandler<Endpoint>() {
-            public void handleClose(final Endpoint closed) {
-                closeHandler.handleClose(EndpointWrapper.this);
-            }
-        });
-    }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -2,15 +2,12 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Iterator;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import org.jboss.cx.remoting.CloseHandler;
 import org.jboss.cx.remoting.Client;
 import org.jboss.cx.remoting.ClientSource;
 import org.jboss.cx.remoting.Endpoint;
@@ -38,19 +35,10 @@
 /**
  *
  */
-public final class CoreEndpoint {
+public class CoreEndpoint implements Endpoint {
 
-    private final String name;
-    private final RequestListener<?, ?> rootListener;
-    private final Endpoint userEndpoint = new UserEndpoint();
-    private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
-    private final Set<SessionListener> sessionListeners = CollectionUtil.synchronizedSet(new LinkedHashSet<SessionListener>());
-
-    private OrderedExecutorFactory orderedExecutorFactory;
-    private Executor executor;
-    private ExecutorService executorService;
-
     static {
+        // Print Remoting "greeting" message
         Logger.getLogger("org.jboss.cx.remoting").info("JBoss Remoting version %s", Version.VERSION);
     }
 
@@ -60,51 +48,72 @@
         DOWN;
 
         public boolean isReachable(final State dest) {
-            switch (this) {
-                case INITIAL:
-                    return dest != INITIAL;
-                case UP:
-                    return dest == DOWN;
-                default:
-                    return false;
-            }
+            return compareTo(dest) < 0;
         }
     }
 
-    public CoreEndpoint(final String name, final RequestListener<?, ?> rootListener) {
-        this.name = name;
-        this.rootListener = rootListener;
-    }
+    private String name;
+    private RequestListener<?, ?> rootListener;
 
+    private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
+    private final Set<SessionListener> sessionListeners = CollectionUtil.synchronizedSet(new LinkedHashSet<SessionListener>());
+
+    private OrderedExecutorFactory orderedExecutorFactory;
+    private ExecutorService executorService;
+
     private final ConcurrentMap<Object, Object> endpointMap = CollectionUtil.concurrentMap();
     private final ConcurrentMap<String, CoreProtocolRegistration> protocolMap = CollectionUtil.concurrentMap();
     private final Set<CoreSession> sessions = CollectionUtil.synchronizedSet(CollectionUtil.<CoreSession>hashSet());
-    // accesses protected by {@code shutdownListeners} - always lock AFTER {@code state}
-    private final List<CloseHandler<Endpoint>> closeHandlers = CollectionUtil.synchronizedArrayList();
 
+    public CoreEndpoint() {
+    }
+
+    // Dependencies
+
+    private Executor executor;
+
     public Executor getExecutor() {
         return executor;
     }
 
+    public Executor getOrderedExecutor() {
+        return orderedExecutorFactory.getOrderedExecutor();
+    }
+
     public void setExecutor(final Executor executor) {
         this.executor = executor;
         orderedExecutorFactory = new OrderedExecutorFactory(executor);
     }
 
-    public Endpoint getUserEndpoint() {
-        return userEndpoint;
+    // Configuration
+
+    public void setName(final String name) {
+        this.name = name;
     }
 
-    void removeSession(CoreSession coreSession) {
-        synchronized (sessions) {
-            if (!sessions.remove(coreSession)) {
-                return;
-            }
-            sessions.notifyAll();
+    public String getName() {
+        return name;
+    }
+
+    public void setRootListener(final RequestListener<?, ?> rootListener) {
+        this.rootListener = rootListener;
+    }
+
+    public RequestListener<?, ?> getRootListener() {
+        return rootListener;
+    }
+
+    // Lifecycle
+
+    public void create() {
+        // todo security check
+        if (rootListener == null) {
+            throw new NullPointerException("rootListener is null");
         }
     }
 
     public void start() {
+        // todo security check
         if (executor == null) {
             executorService = Executors.newCachedThreadPool();
             setExecutor(executorService);
@@ -113,6 +122,7 @@
     }
 
     public void stop() {
+        // todo security check
         if (executorService != null) {
             executorService.shutdown();
             executorService = null;
@@ -120,10 +130,123 @@
         // todo
     }
 
-    Executor getOrderedExecutor() {
-        return orderedExecutorFactory.getOrderedExecutor();
+    public void destroy() {
+        rootListener = null;
+        executor = null;
     }
 
+
+    // Endpoint implementation
+
+    public ConcurrentMap<Object, Object> getAttributes() {
+        return endpointMap;
+    }
+
+    public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
+        if (uri == null) {
+            throw new NullPointerException("uri is null");
+        }
+        if (attributeMap == null) {
+            throw new NullPointerException("attributeMap is null");
+        }
+        final String scheme = uri.getScheme();
+        if (scheme == null) {
+            throw new RemotingException("No scheme on remote endpoint URI");
+        }
+        state.requireHold(State.UP);
+        try {
+            final CoreProtocolRegistration registration = protocolMap.get(scheme);
+            if (registration == null) {
+                throw new RemotingException("No handler available for URI scheme \"" + scheme + "\"");
+            }
+            final ProtocolHandlerFactory factory = registration.getProtocolHandlerFactory();
+            try {
+                final CoreSession session = new CoreSession(CoreEndpoint.this);
+                session.initializeClient(factory, uri, attributeMap, createClient(rootListener));
+                sessions.add(session);
+                final Session userSession = session.getUserSession();
+                for (final SessionListener listener : sessionListeners) {
+                    executor.execute(new Runnable() {
+                        public void run() {
+                            listener.handleSessionOpened(userSession);
+                        }
+                    });
+                }
+                return userSession;
+            } catch (IOException e) {
+                RemotingException rex = new RemotingException("Failed to create protocol handler: " + e.getMessage());
+                rex.setStackTrace(e.getStackTrace());
+                throw rex;
+            }
+        } finally {
+            state.release();
+        }
+    }
+
+    public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
+        state.requireHold(State.UP);
+        try {
+            final CoreSession session = new CoreSession(CoreEndpoint.this);
+            session.initializeServer(handler, createClient(rootListener));
+            sessions.add(session);
+            return session.getProtocolContext();
+        } finally {
+            state.release();
+        }
+    }
+
+    public Registration registerProtocol(final String scheme, final ProtocolHandlerFactory protocolHandlerFactory) throws RemotingException, IllegalArgumentException {
+        if (scheme == null) {
+            throw new NullPointerException("scheme is null");
+        }
+        if (protocolHandlerFactory == null) {
+            throw new NullPointerException("protocolHandlerFactory is null");
+        }
+        state.requireHold(State.UP);
+        try {
+            final CoreProtocolRegistration registration = new CoreProtocolRegistration(protocolHandlerFactory);
+            protocolMap.put(scheme, registration);
+            return registration;
+        } finally {
+            state.release();
+        }
+    }
+
+    public <I, O> Client<I, O> createClient(RequestListener<I, O> requestListener) {
+        final CoreInboundClient<I, O> inbound = new CoreInboundClient<I, O>(requestListener, executor);
+        final CoreOutboundClient<I, O> outbound = new CoreOutboundClient<I, O>(executor);
+        inbound.initialize(outbound.getContextClient());
+        outbound.initialize(inbound.getClientResponder());
+        return outbound.getUserContext();
+    }
+
+    public <I, O> ClientSource<I, O> createService(RequestListener<I, O> requestListener) {
+        final CoreInboundService<I, O> inbound = new CoreInboundService<I, O>(requestListener, executor);
+        final CoreOutboundService<I, O> outbound = new CoreOutboundService<I, O>(executor);
+        inbound.initialize(outbound.getServiceClient());
+        outbound.initialize(inbound.getServiceResponder());
+        return outbound.getUserContextSource();
+    }
+
+    public void addSessionListener(final SessionListener sessionListener) {
+        // TODO security check
+        sessionListeners.add(sessionListener);
+    }
+
+    public void removeSessionListener(final SessionListener sessionListener) {
+        // TODO security check
+        sessionListeners.remove(sessionListener);
+    }
+
+    void removeSession(CoreSession coreSession) {
+        synchronized (sessions) {
+            if (!sessions.remove(coreSession)) {
+                return;
+            }
+            sessions.notifyAll();
+        }
+    }
+
     public final class CoreProtocolRegistration implements Registration {
         private final ProtocolHandlerFactory protocolHandlerFactory;
 
@@ -166,153 +289,4 @@
             }
         }
     }
-
-    public final class UserEndpoint implements Endpoint {
-
-        public ConcurrentMap<Object, Object> getAttributes() {
-            return endpointMap;
-        }
-
-        public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
-            if (uri == null) {
-                throw new NullPointerException("uri is null");
-            }
-            if (attributeMap == null) {
-                throw new NullPointerException("attributeMap is null");
-            }
-            final String scheme = uri.getScheme();
-            if (scheme == null) {
-                throw new RemotingException("No scheme on remote endpoint URI");
-            }
-            state.requireHold(State.UP);
-            try {
-                final CoreProtocolRegistration registration = protocolMap.get(scheme);
-                if (registration == null) {
-                    throw new RemotingException("No handler available for URI scheme \"" + scheme + "\"");
-                }
-                final ProtocolHandlerFactory factory = registration.getProtocolHandlerFactory();
-                try {
-                    final CoreSession session = new CoreSession(CoreEndpoint.this);
-                    session.initializeClient(factory, uri, attributeMap, createClient(rootListener));
-                    sessions.add(session);
-                    final Session userSession = session.getUserSession();
-                    for (final SessionListener listener : sessionListeners) {
-                        executor.execute(new Runnable() {
-                            public void run() {
-                                listener.handleSessionOpened(userSession);
-                            }
-                        });
-                    }
-                    return userSession;
-                } catch (IOException e) {
-                    RemotingException rex = new RemotingException("Failed to create protocol handler: " + e.getMessage());
-                    rex.setStackTrace(e.getStackTrace());
-                    throw rex;
-                }
-            } finally {
-                state.release();
-            }
-        }
-
-        public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
-            state.requireHold(State.UP);
-            try {
-                final CoreSession session = new CoreSession(CoreEndpoint.this);
-                session.initializeServer(handler, createClient(rootListener));
-                sessions.add(session);
-                return session.getProtocolContext();
-            } finally {
-                state.release();
-            }
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public Registration registerProtocol(final String scheme, final ProtocolHandlerFactory protocolHandlerFactory) throws RemotingException, IllegalArgumentException {
-            if (scheme == null) {
-                throw new NullPointerException("scheme is null");
-            }
-            if (protocolHandlerFactory == null) {
-                throw new NullPointerException("protocolHandlerFactory is null");
-            }
-            state.requireHold(State.UP);
-            try {
-                final CoreProtocolRegistration registration = new CoreProtocolRegistration(protocolHandlerFactory);
-                protocolMap.put(scheme, registration);
-                return registration;
-            } finally {
-                state.release();
-            }
-        }
-
-        public <I, O> Client<I, O> createClient(RequestListener<I, O> requestListener) {
-            final CoreInboundClient<I, O> inbound = new CoreInboundClient<I, O>(requestListener, executor);
-            final CoreOutboundClient<I, O> outbound = new CoreOutboundClient<I, O>(executor);
-            inbound.initialize(outbound.getContextClient());
-            outbound.initialize(inbound.getClientResponder());
-            return outbound.getUserContext();
-        }
-
-        public <I, O> ClientSource<I, O> createService(RequestListener<I, O> requestListener) {
-            final CoreInboundService<I, O> inbound = new CoreInboundService<I, O>(requestListener, executor);
-            final CoreOutboundService<I, O> outbound = new CoreOutboundService<I, O>(executor);
-            inbound.initialize(outbound.getServiceClient());
-            outbound.initialize(inbound.getServiceResponder());
-            return outbound.getUserContextSource();
-        }
-
-        public void addSessionListener(final SessionListener sessionListener) {
-            // TODO security check
-            sessionListeners.add(sessionListener);
-        }
-
-        public void removeSessionListener(final SessionListener sessionListener) {
-            // TODO security check
-            sessionListeners.remove(sessionListener);
-        }
-
-        public void close() throws RemotingException {
-            if (state.transitionHold(State.UP, State.DOWN)) try {
-                Iterator<CloseHandler<Endpoint>> it = closeHandlers.iterator();
-                while (it.hasNext()) {
-                    CloseHandler<Endpoint> handler = it.next();
-                    handler.handleClose(this);
-                    it.remove();
-                }
-            } finally {
-                state.release();
-            }
-        }
-
-        public void closeImmediate() throws RemotingException {
-            if (state.transitionHold(State.UP, State.DOWN)) try {
-                Iterator<CloseHandler<Endpoint>> it = closeHandlers.iterator();
-                while (it.hasNext()) {
-                    CloseHandler<Endpoint> handler = it.next();
-                    handler.handleClose(this);
-                    it.remove();
-                }
-            } finally {
-                state.release();
-            }
-        }
-
-        public void addCloseHandler(final CloseHandler<Endpoint> closeHandler) {
-            if (closeHandler == null) {
-                throw new NullPointerException("closeHandler is null");
-            }
-            final State current = state.getStateHold();
-            try {
-                if (current != State.DOWN) {
-                    closeHandlers.add(closeHandler);
-                    return;
-                }
-            } finally {
-                state.release();
-            }
-            closeHandler.handleClose(this);
-        }
-    }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -235,7 +235,7 @@
         }
 
         public String getLocalEndpointName() {
-            return endpoint.getUserEndpoint().getName();
+            return endpoint.getName();
         }
 
         public String getRemoteEndpointName() {
@@ -312,7 +312,7 @@
         }
 
         public String getLocalEndpointName() {
-            return endpoint.getUserEndpoint().getName();
+            return endpoint.getName();
         }
 
         public void receiveClientClose(ClientIdentifier remoteClientIdentifier, final boolean immediate, final boolean cancel, final boolean interrupt) {

Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolSupport.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolSupport.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolSupport.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -5,6 +5,8 @@
 import java.net.SocketAddress;
 import java.net.URI;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
@@ -73,7 +75,12 @@
 
     // Lifecycle
 
+    private ExecutorService executorService;
+
     public void create() throws RemotingException {
+        if (executor == null) {
+            executor = executorService = Executors.newCachedThreadPool();
+        }
         ExceptionMonitor.setInstance(new ExceptionMonitor() {
             public void exceptionCaught(final Throwable cause) {
                 // do nothing!
@@ -101,6 +108,10 @@
             registration.unregister();
             registration = null;
         }
+        if (executorService != null) {
+            executorService.shutdown();
+        }
+        executor = executorService = null;
         protocolHandlerFactory = null;
     }
 

Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -10,6 +10,7 @@
 import org.jboss.cx.remoting.RemoteExecutionException;
 import org.jboss.cx.remoting.Remoting;
 import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.jrpp.JrppServer;
 import org.jboss.cx.remoting.core.security.sasl.Provider;
 import org.jboss.cx.remoting.util.AttributeMap;
 
@@ -23,23 +24,27 @@
         final StringRot13RequestListener listener = new StringRot13RequestListener();
         final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
         try {
-            Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
-            Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+            final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
             try {
-                final Client<String,String> client = session.getRootClient();
+                Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
                 try {
-                    final String original = "The Secret Message\n";
-                    final String result = client.invoke(original);
-                    System.out.printf("The secret message \"%s\" became \"%s\"!\n", original.trim(), result.trim());
+                    final Client<String,String> client = session.getRootClient();
+                    try {
+                        final String original = "The Secret Message\n";
+                        final String result = client.invoke(original);
+                        System.out.printf("The secret message \"%s\" became \"%s\"!\n", original.trim(), result.trim());
+                    } finally {
+                        client.close();
+                    }
                 } finally {
-                    client.close();
+                    session.close();
                 }
             } finally {
-                session.close();
+                jrppServer.stop();
+                jrppServer.destroy();
             }
         } finally {
-            endpoint.close();
+            Remoting.closeEndpoint(endpoint);
         }
-
     }
 }
\ No newline at end of file

Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -13,6 +13,7 @@
 import org.jboss.cx.remoting.RemoteExecutionException;
 import org.jboss.cx.remoting.Remoting;
 import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.jrpp.JrppServer;
 import org.jboss.cx.remoting.core.security.sasl.Provider;
 import org.jboss.cx.remoting.util.AttributeMap;
 
@@ -26,37 +27,42 @@
         final StreamingRot13RequestListener listener = new StreamingRot13RequestListener();
         final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
         try {
-            Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
-            Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+            final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
             try {
-                final Client<Reader,Reader> client = session.getRootClient();
+                Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
                 try {
-                    final String original = "The Secret Message\n";
-                    final StringReader originalReader = new StringReader(original);
+                    final Client<Reader,Reader> client = session.getRootClient();
                     try {
-                        final Reader reader = client.send(originalReader).get();
+                        final String original = "The Secret Message\n";
+                        final StringReader originalReader = new StringReader(original);
                         try {
-                            final BufferedReader bufferedReader = new BufferedReader(reader);
+                            final Reader reader = client.send(originalReader).get();
                             try {
-                                final String secretLine = bufferedReader.readLine();
-                                System.out.printf("The secret message \"%s\" became \"%s\"!\n", original.trim(), secretLine);
+                                final BufferedReader bufferedReader = new BufferedReader(reader);
+                                try {
+                                    final String secretLine = bufferedReader.readLine();
+                                    System.out.printf("The secret message \"%s\" became \"%s\"!\n", original.trim(), secretLine);
+                                } finally {
+                                    bufferedReader.close();
+                                }
                             } finally {
-                                bufferedReader.close();
+                                reader.close();
                             }
                         } finally {
-                            reader.close();
+                            originalReader.close();
                         }
                     } finally {
-                        originalReader.close();
+                        client.close();
                     }
                 } finally {
-                    client.close();
+                    session.close();
                 }
             } finally {
-                session.close();
+                jrppServer.stop();
+                jrppServer.destroy();
             }
         } finally {
-            endpoint.close();
+            Remoting.closeEndpoint(endpoint);
         }
 
     }

Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -27,7 +27,7 @@
                 client.close();
             }
         } finally {
-            endpoint.close();
+            Remoting.closeEndpoint(endpoint);
         }
     }
 }
\ No newline at end of file

Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -45,7 +45,7 @@
                 client.close();
             }
         } finally {
-            endpoint.close();
+            Remoting.closeEndpoint(endpoint);
         }
     }
 }

Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-04-16 12:52:42 UTC (rev 3997)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-04-16 18:08:27 UTC (rev 3998)
@@ -1,10 +1,8 @@
 package org.jboss.cx.remoting;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ConcurrentMap;
 import org.jboss.cx.remoting.core.CoreEndpoint;
 import org.jboss.cx.remoting.core.protocol.LocalProtocolHandlerFactory;
 import org.jboss.cx.remoting.jrpp.JrppProtocolSupport;
@@ -21,51 +19,78 @@
     private static final String JRPP_SUPPORT_KEY = "org.jboss.cx.remoting.standalone.jrpp.support";
 
     public static <I, O> Endpoint createEndpoint(String name, RequestListener<I, O> listener) throws IOException {
-        final CoreEndpoint coreEndpoint = new CoreEndpoint(name, listener);
-        final ExecutorService executorService = Executors.newCachedThreadPool();
-        coreEndpoint.setExecutor(executorService);
-        coreEndpoint.start();
         boolean ok = false;
+        final CoreEndpoint coreEndpoint = new CoreEndpoint();
+        coreEndpoint.setName(name);
+        coreEndpoint.setRootListener(listener);
+        coreEndpoint.create();
         try {
-            final Endpoint userEndpoint = coreEndpoint.getUserEndpoint();
-            LocalProtocolHandlerFactory.addTo(userEndpoint);
-            final JrppProtocolSupport jrppProtocolSupport = new JrppProtocolSupport();
-            jrppProtocolSupport.setEndpoint(userEndpoint);
-            jrppProtocolSupport.setExecutor(executorService);
-            jrppProtocolSupport.create();
-            jrppProtocolSupport.start();
-            userEndpoint.getAttributes().put(JRPP_SUPPORT_KEY, jrppProtocolSupport);
-            userEndpoint.addCloseHandler(new CloseHandler<Endpoint>() {
-                public void handleClose(final Endpoint closed) {
-                    executorService.shutdown();
+            coreEndpoint.start();
+            try {
+                LocalProtocolHandlerFactory.addTo(coreEndpoint);
+                final JrppProtocolSupport jrppProtocolSupport = new JrppProtocolSupport();
+                jrppProtocolSupport.setEndpoint(coreEndpoint);
+                jrppProtocolSupport.create();
+                try {
+                    jrppProtocolSupport.start();
+                    try {
+                        final ConcurrentMap<Object, Object> attributes = coreEndpoint.getAttributes();
+                        attributes.put(JRPP_SUPPORT_KEY, jrppProtocolSupport);
+                        ok = true;
+                        return coreEndpoint;
+                    } finally {
+                        if (! ok) {
+                            jrppProtocolSupport.stop();
+                        }
+                    }
+                } finally {
+                    if (! ok) {
+                        jrppProtocolSupport.destroy();
+                    }
                 }
-            });
-            return userEndpoint;
+            } finally {
+                if (! ok) {
+                    coreEndpoint.stop();
+                }
+            }
         } finally {
             if (! ok) {
-                coreEndpoint.stop();
+                coreEndpoint.destroy();
             }
         }
     }
 
+    public static void closeEndpoint(Endpoint endpoint) {
+        if (endpoint instanceof CoreEndpoint) {
+            final CoreEndpoint coreEndpoint = (CoreEndpoint) endpoint;
+            final ConcurrentMap<Object, Object> attributes = coreEndpoint.getAttributes();
+            final JrppProtocolSupport jrppProtocolSupport = (JrppProtocolSupport) attributes.get(JRPP_SUPPORT_KEY);
+            coreEndpoint.stop();
+            coreEndpoint.destroy();
+            if (jrppProtocolSupport != null) {
+                jrppProtocolSupport.stop();
+                jrppProtocolSupport.destroy();
+            }
+        }
+    }
+
     public static JrppServer addJrppServer(Endpoint endpoint, SocketAddress address, AttributeMap attributeMap) throws IOException {
+        boolean ok = false;
         final JrppServer jrppServer = new JrppServer();
         jrppServer.setProtocolSupport((JrppProtocolSupport) endpoint.getAttributes().get(JRPP_SUPPORT_KEY));
-        jrppServer.setSocketAddress(new InetSocketAddress(12345));
-        jrppServer.setAttributeMap(AttributeMap.EMPTY);
+        jrppServer.setSocketAddress(address);
+        jrppServer.setAttributeMap(attributeMap);
         jrppServer.setEndpoint(endpoint);
         jrppServer.create();
-        jrppServer.start();
-        endpoint.addCloseHandler(new CloseHandler<Endpoint>() {
-            public void handleClose(final Endpoint closed) {
-                try {
-                    jrppServer.stop();
-                } finally {
-                    jrppServer.destroy();
-                }
+        try {
+            jrppServer.start();
+            ok = true;
+            return jrppServer;
+        } finally {
+            if (! ok) {
+                jrppServer.destroy();
             }
-        });
-        return jrppServer;
+        }
     }
 
     // privates




More information about the jboss-remoting-commits mailing list