[jboss-remoting-commits] JBoss Remoting SVN: r3634 - in remoting3/trunk: standalone/src/main/java/org/jboss/cx/remoting and 1 other directory.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Thu Mar 13 17:05:07 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-03-13 17:05:07 -0400 (Thu, 13 Mar 2008)
New Revision: 3634

Modified:
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
   remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
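Start of basic lifecycle support for endpoints: adds an INITIAL state, a start() transition from INITIAL to UP, and close-handler processing when an endpoint is closed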

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-03-13 21:04:31 UTC (rev 3633)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-03-13 21:05:07 UTC (rev 3634)
@@ -4,6 +4,7 @@
 import java.net.URI;
 import java.util.List;
 import java.util.Set;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -40,47 +41,41 @@
 
     private final String name;
     private final Endpoint userEndpoint = new UserEndpoint();
-    private final OrderedExecutorFactory orderedExecutorFactory;
-    private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.UP);
-    private final Executor executor;
-    private final RequestListener<?, ?> rootRequestListener;
+    private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
 
+    private OrderedExecutorFactory orderedExecutorFactory;
+    private Executor executor;
+
     static {
         Logger.getLogger("org.jboss.cx.remoting").info("JBoss Remoting version %s", Version.VERSION);
     }
 
     private enum State {
+        INITIAL,
         UP,
         DOWN,
     }
 
-    protected CoreEndpoint(final String name, final RequestListener<?, ?> rootRequestListener) {
+    public CoreEndpoint(final String name) {
         this.name = name;
-        // todo - make this configurable
-        executor = Executors.newCachedThreadPool();
-        orderedExecutorFactory = new OrderedExecutorFactory(executor);
-        this.rootRequestListener = rootRequestListener;
     }
 
     private final ConcurrentMap<Object, Object> endpointMap = CollectionUtil.concurrentMap();
     private final ConcurrentMap<String, CoreProtocolRegistration> protocolMap = CollectionUtil.concurrentMap();
     private final Set<CoreSession> sessions = CollectionUtil.synchronizedSet(CollectionUtil.<CoreSession>hashSet());
     // accesses protected by {@code shutdownListeners} - always lock AFTER {@code state}
-    private final List<CloseHandler<Endpoint>> closeHandlers = CollectionUtil.arrayList();
+    private final List<CloseHandler<Endpoint>> closeHandlers = CollectionUtil.synchronizedArrayList();
 
-    ConcurrentMap<Object, Object> getAttributes() {
-        return endpointMap;
-    }
-
-    Executor getExecutor() {
+    public Executor getExecutor() {
         return executor;
     }
 
-    Executor getOrderedExecutor() {
-        return orderedExecutorFactory.getOrderedExecutor();
+    public void setExecutor(final Executor executor) {
+        this.executor = executor;
+        orderedExecutorFactory = new OrderedExecutorFactory(executor);
     }
 
-    Endpoint getUserEndpoint() {
+    public Endpoint getUserEndpoint() {
         return userEndpoint;
     }
 
@@ -89,6 +84,14 @@
         sessions.notifyAll();
     }
 
+    public void start() {
+        state.requireTransition(State.INITIAL, State.UP);
+    }
+
+    Executor getOrderedExecutor() {
+        return orderedExecutorFactory.getOrderedExecutor();
+    }
+
     public final class CoreProtocolServerContext implements ProtocolServerContext {
         private CoreProtocolServerContext() {
         }
@@ -219,15 +222,45 @@
         }
 
         public void close() throws RemotingException {
-            // todo ...
+            if (state.transitionHold(State.UP, State.DOWN)) try {
+                Iterator<CloseHandler<Endpoint>> it = closeHandlers.iterator();
+                while (it.hasNext()) {
+                    CloseHandler<Endpoint> handler = it.next();
+                    handler.handleClose(this);
+                    it.remove();
+                }
+            } finally {
+                state.release();
+            }
         }
 
         public void closeImmediate() throws RemotingException {
-            // todo ...
+            if (state.transitionHold(State.UP, State.DOWN)) try {
+                Iterator<CloseHandler<Endpoint>> it = closeHandlers.iterator();
+                while (it.hasNext()) {
+                    CloseHandler<Endpoint> handler = it.next();
+                    handler.handleClose(this);
+                    it.remove();
+                }
+            } finally {
+                state.release();
+            }
         }
 
         public void addCloseHandler(final CloseHandler<Endpoint> closeHandler) {
-            // todo ...
+            if (closeHandler == null) {
+                throw new NullPointerException("closeHandler is null");
+            }
+            final State current = state.getStateHold();
+            try {
+                if (current != State.DOWN) {
+                    closeHandlers.add(closeHandler);
+                    return;
+                }
+            } finally {
+                state.release();
+            }
+            closeHandler.handleClose(this);
         }
     }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java	2008-03-13 21:04:31 UTC (rev 3633)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java	2008-03-13 21:05:07 UTC (rev 3634)
@@ -124,7 +124,6 @@
         }
 
         public void handleRequest(final I request, final Executor streamExecutor) {
-            state.requireTransition(State.INITIAL, State.UNSENT);
             executeTagged(new Runnable() {
                 public void run() {
                     try {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java	2008-03-13 21:04:31 UTC (rev 3633)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java	2008-03-13 21:05:07 UTC (rev 3634)
@@ -21,7 +21,7 @@
     private static final Logger log = Logger.getLogger(CoreOutboundContext.class);
 
     private final ConcurrentMap<Object, Object> contextMap = CollectionUtil.concurrentMap();
-    private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.UP);
+    private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
     private final Context<I, O> userContext = new UserContext();
     private final ContextClient contextClient = new ContextClientImpl();
     private final Executor executor;

Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-03-13 21:04:31 UTC (rev 3633)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-03-13 21:05:07 UTC (rev 3634)
@@ -2,11 +2,14 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import org.jboss.cx.remoting.util.AttributeHashMap;
 import org.jboss.cx.remoting.util.AttributeMap;
 import org.jboss.cx.remoting.log.Logger;
 import org.jboss.cx.remoting.spi.wrapper.ContextSourceWrapper;
 import org.jboss.cx.remoting.spi.wrapper.SessionWrapper;
+import org.jboss.cx.remoting.spi.wrapper.EndpointWrapper;
 import org.jboss.cx.remoting.core.CoreEndpoint;
 
 import javax.security.auth.callback.Callback;
@@ -22,7 +25,17 @@
     private static final Logger log = Logger.getLogger(Remoting.class);
 
     public static Endpoint createEndpoint(String name) {
-        return null;
+        final CoreEndpoint coreEndpoint = new CoreEndpoint(name);
+        final ExecutorService executorService = Executors.newCachedThreadPool();
+        coreEndpoint.setExecutor(executorService);
+        coreEndpoint.start();
+        final Endpoint userEndpoint = coreEndpoint.getUserEndpoint();
+        userEndpoint.addCloseHandler(new CloseHandler<Endpoint>() {
+            public void handleClose(final Endpoint closed) {
+                executorService.shutdown();
+            }
+        });
+        return userEndpoint;
     }
 
     public static Session createEndpointAndSession(String endpointName, URI remoteUri, final String userName, final char[] password) throws RemotingException {




More information about the jboss-remoting-commits mailing list