[jboss-remoting-commits] JBoss Remoting SVN: r3885 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/wrapper and 1 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Thu Apr 3 18:10:41 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-04-03 18:10:40 -0400 (Thu, 03 Apr 2008)
New Revision: 3885

Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ContextSource.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SessionListener.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
Log:
JBREM-893 - notifications for when things are closed

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -62,8 +62,6 @@
 
     void close() throws RemotingException;
 
-    void closeCancelling(boolean mayInterrupt) throws RemotingException;
-
     void closeImmediate() throws RemotingException;
 
     void addCloseHandler(final CloseHandler<Context<I, O>> closeHandler);

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ContextSource.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ContextSource.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ContextSource.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -8,7 +8,7 @@
      * Close the context source.  New contexts may no longer be created after this
      * method is called.  Subsequent calls to this method have no additional effect.
      */
-    void close();
+    void close() throws RemotingException;
 
     /**
      * Create a new communications context.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SessionListener.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SessionListener.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SessionListener.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -5,6 +5,4 @@
  */
 public interface SessionListener {
     void handleSessionOpened(Session session);
-
-    void handleSessionClosed(Session session);
 }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -15,7 +15,7 @@
         this.delegate = delegate;
     }
 
-    public void close() {
+    public void close() throws RemotingException {
         delegate.close();
     }
 

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -21,10 +21,6 @@
         delegate.close();
     }
 
-    public void closeCancelling(final boolean mayInterrupt) throws RemotingException {
-        delegate.closeCancelling(mayInterrupt);
-    }
-
     public void closeImmediate() throws RemotingException {
         delegate.closeImmediate();
     }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -8,5 +8,5 @@
 public interface ContextServer<I, O> {
     RequestServer<I> createNewRequest(RequestClient<O> requestClient) throws RemotingException;
 
-    void handleClose(boolean immediate, boolean cancel, boolean interrupt) throws RemotingException;
+    void handleClose(boolean immediate, boolean cancel) throws RemotingException;
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -102,14 +102,6 @@
             }
             sessions.notifyAll();
         }
-        final Session userSession = coreSession.getUserSession();
-        for (final SessionListener listener : sessionListeners) {
-            executor.execute(new Runnable() {
-                public void run() {
-                    listener.handleSessionClosed(userSession);
-                }
-            });
-        }
     }
 
     public void start() {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -108,13 +108,13 @@
             }
         }
 
-        public void handleClose(final boolean immediate, final boolean cancel, final boolean interrupt) throws RemotingException {
+        public void handleClose(final boolean immediate, final boolean cancel) throws RemotingException {
             if (state.transition(State.UP, State.STOPPING)) {
                 contextClient.handleClosing(false);
                 if (immediate || cancel) {
                     for (CoreInboundRequest<I, O> inboundRequest : requests) {
                         try {
-                            inboundRequest.getRequester().handleCancelRequest(immediate || interrupt);
+                            inboundRequest.getRequester().handleCancelRequest(immediate );
                         } catch (Exception e) {
                             log.trace("Failed to notify inbound request of cancellation upon context close: %s", e);
                         }
@@ -137,7 +137,7 @@
         }
 
         public void close() throws RemotingException {
-            // todo
+            contextClient.handleClosing(false);
         }
 
         public void closeImmediate() throws RemotingException {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -1,5 +1,7 @@
 package org.jboss.cx.remoting.core;
 
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import org.jboss.cx.remoting.CloseHandler;
@@ -23,10 +25,12 @@
     private final ConcurrentMap<Object, Object> attributes = CollectionUtil.concurrentMap();
 
     private ServiceClient serviceClient;
+    private final Set<CloseHandler<ServiceContext>> closeHandlers = CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<ServiceContext>>());
 
     private enum State implements org.jboss.cx.remoting.util.State<State> {
         INITIAL,
-        UP;
+        UP,
+        DOWN;
 
         public boolean isReachable(final State dest) {
             return compareTo(dest) < 0;
@@ -49,10 +53,25 @@
         }
     }
 
+    private void doClose() {
+        if (state.transition(State.DOWN)) {
+            synchronized (closeHandlers) {
+                for (final CloseHandler<ServiceContext> closeHandler : closeHandlers) {
+                    executor.execute(new Runnable() {
+                        public void run() {
+                            closeHandler.handleClose(serviceContext);
+                        }
+                    });
+                }
+                closeHandlers.clear();
+            }
+        }
+    }
+
     public ServiceServer<I, O> getServiceServer() {
         return new ServiceServer<I, O>() {
             public void handleClose() throws RemotingException {
-                // todo - prevent new context creation?
+                doClose();
             }
 
             public ContextServer<I, O> createNewContext(final ContextClient client) {
@@ -72,13 +91,29 @@
         }
 
         public void close() throws RemotingException {
-            // todo
+            doClose();
+            serviceClient.handleClosing();
         }
 
         public void closeImmediate() throws RemotingException {
+            doClose();
+            serviceClient.handleClosing();
         }
 
-        public void addCloseHandler(final CloseHandler<ServiceContext> serviceContextCloseHandler) {
+        public void addCloseHandler(final CloseHandler<ServiceContext> closeHandler) {
+            final State current = state.getStateHold();
+            try {
+                switch (current) {
+                    case DOWN:
+                        closeHandler.handleClose(this);
+                        break;
+                    default:
+                        closeHandlers.add(closeHandler);
+                        break;
+                }
+            } finally {
+                state.release();
+            }
         }
     }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -1,5 +1,7 @@
 package org.jboss.cx.remoting.core;
 
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import org.jboss.cx.remoting.CloseHandler;
@@ -22,6 +24,7 @@
     private final ConcurrentMap<Object, Object> contextMap = CollectionUtil.concurrentMap();
     private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
     private final ContextClient contextClient = new ContextClientImpl();
+    private final Set<CloseHandler<Context<I, O>>> closeHandlers = CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<Context<I, O>>>());
     private final Executor executor;
 
     private Context<I, O> userContext;
@@ -76,24 +79,48 @@
             super(contextServer);
         }
 
-        private Object writeReplace() {
-            return contextServer;
+        private void doClose(final boolean immediate, final boolean cancel) throws RemotingException {
+            state.waitForNot(State.INITIAL);
+            if (state.transitionHold(State.UP, State.STOPPING)) try {
+                synchronized (closeHandlers) {
+                    for (final CloseHandler<Context<I, O>> handler : closeHandlers) {
+                        executor.execute(new Runnable() {
+                            public void run() {
+                                handler.handleClose(UserContext.this);
+                            }
+                        });
+                    }
+                    closeHandlers.clear();
+                }
+                contextServer.handleClose(immediate, cancel);
+            } finally {
+                state.release();
+            }
         }
 
         public void close() throws RemotingException {
-            contextServer.handleClose(false, false, false);
+            doClose(false, false);
         }
 
-        public void closeCancelling(final boolean mayInterrupt) throws RemotingException {
-            contextServer.handleClose(false, true, mayInterrupt);
-        }
-
         public void closeImmediate() throws RemotingException {
-            contextServer.handleClose(true, true, true);
+            doClose(true, true);
         }
 
         public void addCloseHandler(final CloseHandler<Context<I, O>> closeHandler) {
-            // todo ...
+            final State current = state.getStateHold();
+            try {
+                switch (current) {
+                    case STOPPING:
+                    case DOWN:
+                        closeHandler.handleClose(this);
+                        break;
+                    default:
+                        closeHandlers.add(closeHandler);
+                        break;
+                }
+            } finally {
+                state.release();
+            }
         }
 
         public O invoke(final I request) throws RemotingException, RemoteExecutionException {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -1,5 +1,7 @@
 package org.jboss.cx.remoting.core;
 
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import org.jboss.cx.remoting.CloseHandler;
 import org.jboss.cx.remoting.Context;
@@ -7,6 +9,7 @@
 import org.jboss.cx.remoting.RemotingException;
 import org.jboss.cx.remoting.log.Logger;
 import org.jboss.cx.remoting.util.AtomicStateMachine;
+import org.jboss.cx.remoting.util.CollectionUtil;
 
 /**
  *
@@ -20,6 +23,7 @@
     private final Executor executor;
 
     private ServiceServer<I,O> serviceServer;
+    private Set<CloseHandler<ContextSource<I,O>>> closeHandlers = CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<ContextSource<I, O>>>());
 
     public CoreOutboundService(final Executor executor) {
         this.executor = executor;
@@ -58,16 +62,47 @@
             super(serviceServer);
         }
 
-        public void close() {
-            // todo ...
+        private void doClose() throws RemotingException {
+            state.waitForNot(State.INITIAL);
+            if (state.transitionHold(State.UP, State.DOWN)) try {
+                synchronized (closeHandlers) {
+                    for (final CloseHandler<ContextSource<I, O>> handler : closeHandlers) {
+                        executor.execute(new Runnable() {
+                            public void run() {
+                                handler.handleClose(UserContextSource.this);
+                            }
+                        });
+                    }
+                    closeHandlers.clear();
+                }
+                serviceServer.handleClose();
+            } finally {
+                state.release();
+            }
         }
 
+        public void close() throws RemotingException {
+            doClose();
+        }
+
         public void closeImmediate() throws RemotingException {
-            // todo ...
+            doClose();
         }
 
         public void addCloseHandler(final CloseHandler<ContextSource<I, O>> closeHandler) {
-            // todo ...
+            final State current = state.getStateHold();
+            try {
+                switch (current) {
+                    case DOWN:
+                        closeHandler.handleClose(this);
+                        break;
+                    default:
+                        closeHandlers.add(closeHandler);
+                        break;
+                }
+            } finally {
+                state.release();
+            }
         }
 
         public Context<I, O> createContext() throws RemotingException {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java	2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java	2008-04-03 22:10:40 UTC (rev 3885)
@@ -8,8 +8,10 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import org.jboss.cx.remoting.CloseHandler;
@@ -73,6 +75,7 @@
     // don't GC the endpoint while a session lives
     private final CoreEndpoint endpoint;
     private final Executor executor;
+    private final Set<CloseHandler<Session>> closeHandlers = CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<Session>>());
 
     /** The protocol handler.  Set on NEW -> CONNECTING */
     private ProtocolHandler protocolHandler;
@@ -201,17 +204,30 @@
         private final ConcurrentMap<Object, Object> sessionMap = CollectionUtil.concurrentMap();
 
         public void close() throws RemotingException {
+            // todo - maybe drain the session first?
             shutdown();
-            // todo - should this be non-blocking?
             state.waitFor(State.DOWN);
         }
 
         public void closeImmediate() throws RemotingException {
-            // todo ...
+            shutdown();
+            state.waitFor(State.DOWN);
         }
 
         public void addCloseHandler(final CloseHandler<Session> closeHandler) {
-            // todo ...
+            final State current = state.getStateHold();
+            try {
+                switch (current) {
+                    case DOWN:
+                    case STOPPING:
+                        closeHandler.handleClose(this);
+                        break;
+                    default:
+                        closeHandlers.add(closeHandler);
+                }
+            } finally {
+                state.release();
+            }
         }
 
         public ConcurrentMap<Object, Object> getAttributes() {
@@ -243,6 +259,14 @@
 
     private void shutdown() {
         if (state.transition(State.UP, State.STOPPING)) {
+            for (final CloseHandler<Session> closeHandler : closeHandlers) {
+                executor.execute(new Runnable() {
+                    public void run() {
+                        closeHandler.handleClose(userSession);
+                    }
+                });
+            }
+            closeHandlers.clear();
             try {
                 log.trace("Initiating session shutdown");
                 protocolHandler.closeSession();
@@ -298,7 +322,7 @@
             final ServerContextPair contextPair = serverContexts.remove(remoteContextIdentifier);
             // todo - do the whole close operation
             try {
-                contextPair.contextServer.handleClose(immediate, cancel, interrupt);
+                contextPair.contextServer.handleClose(immediate, cancel);
             } catch (RemotingException e) {
                 log.trace(e, "Failed to forward a context close");
             }
@@ -964,9 +988,9 @@
             }
         }
 
-        public void handleClose(final boolean immediate, final boolean cancel, final boolean interrupt) throws RemotingException {
+        public void handleClose(final boolean immediate, final boolean cancel) throws RemotingException {
             try {
-                protocolHandler.sendContextClose(contextIdentifier, immediate, cancel, interrupt);
+                protocolHandler.sendContextClose(contextIdentifier, immediate, cancel, false);
             } catch (RemotingException e) {
                 throw e;
             } catch (IOException e) {




More information about the jboss-remoting-commits mailing list