[jboss-remoting-commits] JBoss Remoting SVN: r3885 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/wrapper and 1 other directories.
jboss-remoting-commits at lists.jboss.org
jboss-remoting-commits at lists.jboss.org
Thu Apr 3 18:10:41 EDT 2008
Author: david.lloyd at jboss.com
Date: 2008-04-03 18:10:40 -0400 (Thu, 03 Apr 2008)
New Revision: 3885
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ContextSource.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SessionListener.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
Log:
JBREM-893 - notifications for when things are closed
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -62,8 +62,6 @@
void close() throws RemotingException;
- void closeCancelling(boolean mayInterrupt) throws RemotingException;
-
void closeImmediate() throws RemotingException;
void addCloseHandler(final CloseHandler<Context<I, O>> closeHandler);
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ContextSource.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ContextSource.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ContextSource.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -8,7 +8,7 @@
* Close the context source. New contexts may no longer be created after this
* method is called. Subsequent calls to this method have no additional effect.
*/
- void close();
+ void close() throws RemotingException;
/**
* Create a new communications context.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SessionListener.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SessionListener.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SessionListener.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -5,6 +5,4 @@
*/
public interface SessionListener {
void handleSessionOpened(Session session);
-
- void handleSessionClosed(Session session);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -15,7 +15,7 @@
this.delegate = delegate;
}
- public void close() {
+ public void close() throws RemotingException {
delegate.close();
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -21,10 +21,6 @@
delegate.close();
}
- public void closeCancelling(final boolean mayInterrupt) throws RemotingException {
- delegate.closeCancelling(mayInterrupt);
- }
-
public void closeImmediate() throws RemotingException {
delegate.closeImmediate();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -8,5 +8,5 @@
public interface ContextServer<I, O> {
RequestServer<I> createNewRequest(RequestClient<O> requestClient) throws RemotingException;
- void handleClose(boolean immediate, boolean cancel, boolean interrupt) throws RemotingException;
+ void handleClose(boolean immediate, boolean cancel) throws RemotingException;
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -102,14 +102,6 @@
}
sessions.notifyAll();
}
- final Session userSession = coreSession.getUserSession();
- for (final SessionListener listener : sessionListeners) {
- executor.execute(new Runnable() {
- public void run() {
- listener.handleSessionClosed(userSession);
- }
- });
- }
}
public void start() {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -108,13 +108,13 @@
}
}
- public void handleClose(final boolean immediate, final boolean cancel, final boolean interrupt) throws RemotingException {
+ public void handleClose(final boolean immediate, final boolean cancel) throws RemotingException {
if (state.transition(State.UP, State.STOPPING)) {
contextClient.handleClosing(false);
if (immediate || cancel) {
for (CoreInboundRequest<I, O> inboundRequest : requests) {
try {
- inboundRequest.getRequester().handleCancelRequest(immediate || interrupt);
+ inboundRequest.getRequester().handleCancelRequest(immediate );
} catch (Exception e) {
log.trace("Failed to notify inbound request of cancellation upon context close: %s", e);
}
@@ -137,7 +137,7 @@
}
public void close() throws RemotingException {
- // todo
+ contextClient.handleClosing(false);
}
public void closeImmediate() throws RemotingException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -1,5 +1,7 @@
package org.jboss.cx.remoting.core;
+import java.util.LinkedHashSet;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.CloseHandler;
@@ -23,10 +25,12 @@
private final ConcurrentMap<Object, Object> attributes = CollectionUtil.concurrentMap();
private ServiceClient serviceClient;
+ private final Set<CloseHandler<ServiceContext>> closeHandlers = CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<ServiceContext>>());
private enum State implements org.jboss.cx.remoting.util.State<State> {
INITIAL,
- UP;
+ UP,
+ DOWN;
public boolean isReachable(final State dest) {
return compareTo(dest) < 0;
@@ -49,10 +53,25 @@
}
}
+ private void doClose() {
+ if (state.transition(State.DOWN)) {
+ synchronized (closeHandlers) {
+ for (final CloseHandler<ServiceContext> closeHandler : closeHandlers) {
+ executor.execute(new Runnable() {
+ public void run() {
+ closeHandler.handleClose(serviceContext);
+ }
+ });
+ }
+ closeHandlers.clear();
+ }
+ }
+ }
+
public ServiceServer<I, O> getServiceServer() {
return new ServiceServer<I, O>() {
public void handleClose() throws RemotingException {
- // todo - prevent new context creation?
+ doClose();
}
public ContextServer<I, O> createNewContext(final ContextClient client) {
@@ -72,13 +91,29 @@
}
public void close() throws RemotingException {
- // todo
+ doClose();
+ serviceClient.handleClosing();
}
public void closeImmediate() throws RemotingException {
+ doClose();
+ serviceClient.handleClosing();
}
- public void addCloseHandler(final CloseHandler<ServiceContext> serviceContextCloseHandler) {
+ public void addCloseHandler(final CloseHandler<ServiceContext> closeHandler) {
+ final State current = state.getStateHold();
+ try {
+ switch (current) {
+ case DOWN:
+ closeHandler.handleClose(this);
+ break;
+ default:
+ closeHandlers.add(closeHandler);
+ break;
+ }
+ } finally {
+ state.release();
+ }
}
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -1,5 +1,7 @@
package org.jboss.cx.remoting.core;
+import java.util.LinkedHashSet;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.CloseHandler;
@@ -22,6 +24,7 @@
private final ConcurrentMap<Object, Object> contextMap = CollectionUtil.concurrentMap();
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
private final ContextClient contextClient = new ContextClientImpl();
+ private final Set<CloseHandler<Context<I, O>>> closeHandlers = CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<Context<I, O>>>());
private final Executor executor;
private Context<I, O> userContext;
@@ -76,24 +79,48 @@
super(contextServer);
}
- private Object writeReplace() {
- return contextServer;
+ private void doClose(final boolean immediate, final boolean cancel) throws RemotingException {
+ state.waitForNot(State.INITIAL);
+ if (state.transitionHold(State.UP, State.STOPPING)) try {
+ synchronized (closeHandlers) {
+ for (final CloseHandler<Context<I, O>> handler : closeHandlers) {
+ executor.execute(new Runnable() {
+ public void run() {
+ handler.handleClose(UserContext.this);
+ }
+ });
+ }
+ closeHandlers.clear();
+ }
+ contextServer.handleClose(immediate, cancel);
+ } finally {
+ state.release();
+ }
}
public void close() throws RemotingException {
- contextServer.handleClose(false, false, false);
+ doClose(false, false);
}
- public void closeCancelling(final boolean mayInterrupt) throws RemotingException {
- contextServer.handleClose(false, true, mayInterrupt);
- }
-
public void closeImmediate() throws RemotingException {
- contextServer.handleClose(true, true, true);
+ doClose(true, true);
}
public void addCloseHandler(final CloseHandler<Context<I, O>> closeHandler) {
- // todo ...
+ final State current = state.getStateHold();
+ try {
+ switch (current) {
+ case STOPPING:
+ case DOWN:
+ closeHandler.handleClose(this);
+ break;
+ default:
+ closeHandlers.add(closeHandler);
+ break;
+ }
+ } finally {
+ state.release();
+ }
}
public O invoke(final I request) throws RemotingException, RemoteExecutionException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -1,5 +1,7 @@
package org.jboss.cx.remoting.core;
+import java.util.LinkedHashSet;
+import java.util.Set;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.Context;
@@ -7,6 +9,7 @@
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.util.AtomicStateMachine;
+import org.jboss.cx.remoting.util.CollectionUtil;
/**
*
@@ -20,6 +23,7 @@
private final Executor executor;
private ServiceServer<I,O> serviceServer;
+ private Set<CloseHandler<ContextSource<I,O>>> closeHandlers = CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<ContextSource<I, O>>>());
public CoreOutboundService(final Executor executor) {
this.executor = executor;
@@ -58,16 +62,47 @@
super(serviceServer);
}
- public void close() {
- // todo ...
+ private void doClose() throws RemotingException {
+ state.waitForNot(State.INITIAL);
+ if (state.transitionHold(State.UP, State.DOWN)) try {
+ synchronized (closeHandlers) {
+ for (final CloseHandler<ContextSource<I, O>> handler : closeHandlers) {
+ executor.execute(new Runnable() {
+ public void run() {
+ handler.handleClose(UserContextSource.this);
+ }
+ });
+ }
+ closeHandlers.clear();
+ }
+ serviceServer.handleClose();
+ } finally {
+ state.release();
+ }
}
+ public void close() throws RemotingException {
+ doClose();
+ }
+
public void closeImmediate() throws RemotingException {
- // todo ...
+ doClose();
}
public void addCloseHandler(final CloseHandler<ContextSource<I, O>> closeHandler) {
- // todo ...
+ final State current = state.getStateHold();
+ try {
+ switch (current) {
+ case DOWN:
+ closeHandler.handleClose(this);
+ break;
+ default:
+ closeHandlers.add(closeHandler);
+ break;
+ }
+ } finally {
+ state.release();
+ }
}
public Context<I, O> createContext() throws RemotingException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-04-03 20:19:31 UTC (rev 3884)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-04-03 22:10:40 UTC (rev 3885)
@@ -8,8 +8,10 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.CloseHandler;
@@ -73,6 +75,7 @@
// don't GC the endpoint while a session lives
private final CoreEndpoint endpoint;
private final Executor executor;
+ private final Set<CloseHandler<Session>> closeHandlers = CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<Session>>());
/** The protocol handler. Set on NEW -> CONNECTING */
private ProtocolHandler protocolHandler;
@@ -201,17 +204,30 @@
private final ConcurrentMap<Object, Object> sessionMap = CollectionUtil.concurrentMap();
public void close() throws RemotingException {
+ // todo - maybe drain the session first?
shutdown();
- // todo - should this be non-blocking?
state.waitFor(State.DOWN);
}
public void closeImmediate() throws RemotingException {
- // todo ...
+ shutdown();
+ state.waitFor(State.DOWN);
}
public void addCloseHandler(final CloseHandler<Session> closeHandler) {
- // todo ...
+ final State current = state.getStateHold();
+ try {
+ switch (current) {
+ case DOWN:
+ case STOPPING:
+ closeHandler.handleClose(this);
+ break;
+ default:
+ closeHandlers.add(closeHandler);
+ }
+ } finally {
+ state.release();
+ }
}
public ConcurrentMap<Object, Object> getAttributes() {
@@ -243,6 +259,14 @@
private void shutdown() {
if (state.transition(State.UP, State.STOPPING)) {
+ for (final CloseHandler<Session> closeHandler : closeHandlers) {
+ executor.execute(new Runnable() {
+ public void run() {
+ closeHandler.handleClose(userSession);
+ }
+ });
+ }
+ closeHandlers.clear();
try {
log.trace("Initiating session shutdown");
protocolHandler.closeSession();
@@ -298,7 +322,7 @@
final ServerContextPair contextPair = serverContexts.remove(remoteContextIdentifier);
// todo - do the whole close operation
try {
- contextPair.contextServer.handleClose(immediate, cancel, interrupt);
+ contextPair.contextServer.handleClose(immediate, cancel);
} catch (RemotingException e) {
log.trace(e, "Failed to forward a context close");
}
@@ -964,9 +988,9 @@
}
}
- public void handleClose(final boolean immediate, final boolean cancel, final boolean interrupt) throws RemotingException {
+ public void handleClose(final boolean immediate, final boolean cancel) throws RemotingException {
try {
- protocolHandler.sendContextClose(contextIdentifier, immediate, cancel, interrupt);
+ protocolHandler.sendContextClose(contextIdentifier, immediate, cancel, false);
} catch (RemotingException e) {
throw e;
} catch (IOException e) {
More information about the jboss-remoting-commits mailing list