Author: david.lloyd(a)jboss.com
Date: 2008-03-13 17:05:07 -0400 (Thu, 13 Mar 2008)
New Revision: 3634
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
Start of basic lifecycle stuff for endpoints
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-03-13
21:04:31 UTC (rev 3633)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-03-13
21:05:07 UTC (rev 3634)
@@ -4,6 +4,7 @@
import java.net.URI;
import java.util.List;
import java.util.Set;
+import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -40,47 +41,41 @@
private final String name;
private final Endpoint userEndpoint = new UserEndpoint();
- private final OrderedExecutorFactory orderedExecutorFactory;
- private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.UP);
- private final Executor executor;
- private final RequestListener<?, ?> rootRequestListener;
+ private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.INITIAL);
+ private OrderedExecutorFactory orderedExecutorFactory;
+ private Executor executor;
+
static {
Logger.getLogger("org.jboss.cx.remoting").info("JBoss Remoting
version %s", Version.VERSION);
}
private enum State {
+ INITIAL,
UP,
DOWN,
}
- protected CoreEndpoint(final String name, final RequestListener<?, ?>
rootRequestListener) {
+ public CoreEndpoint(final String name) {
this.name = name;
- // todo - make this configurable
- executor = Executors.newCachedThreadPool();
- orderedExecutorFactory = new OrderedExecutorFactory(executor);
- this.rootRequestListener = rootRequestListener;
}
private final ConcurrentMap<Object, Object> endpointMap =
CollectionUtil.concurrentMap();
private final ConcurrentMap<String, CoreProtocolRegistration> protocolMap =
CollectionUtil.concurrentMap();
private final Set<CoreSession> sessions =
CollectionUtil.synchronizedSet(CollectionUtil.<CoreSession>hashSet());
// accesses protected by {@code shutdownListeners} - always lock AFTER {@code state}
- private final List<CloseHandler<Endpoint>> closeHandlers =
CollectionUtil.arrayList();
+ private final List<CloseHandler<Endpoint>> closeHandlers =
CollectionUtil.synchronizedArrayList();
- ConcurrentMap<Object, Object> getAttributes() {
- return endpointMap;
- }
-
- Executor getExecutor() {
+ public Executor getExecutor() {
return executor;
}
- Executor getOrderedExecutor() {
- return orderedExecutorFactory.getOrderedExecutor();
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ orderedExecutorFactory = new OrderedExecutorFactory(executor);
}
- Endpoint getUserEndpoint() {
+ public Endpoint getUserEndpoint() {
return userEndpoint;
}
@@ -89,6 +84,14 @@
sessions.notifyAll();
}
+ public void start() {
+ state.requireTransition(State.INITIAL, State.UP);
+ }
+
+ Executor getOrderedExecutor() {
+ return orderedExecutorFactory.getOrderedExecutor();
+ }
+
public final class CoreProtocolServerContext implements ProtocolServerContext {
private CoreProtocolServerContext() {
}
@@ -219,15 +222,45 @@
}
public void close() throws RemotingException {
- // todo ...
+ if (state.transitionHold(State.UP, State.DOWN)) try {
+ Iterator<CloseHandler<Endpoint>> it =
closeHandlers.iterator();
+ while (it.hasNext()) {
+ CloseHandler<Endpoint> handler = it.next();
+ handler.handleClose(this);
+ it.remove();
+ }
+ } finally {
+ state.release();
+ }
}
public void closeImmediate() throws RemotingException {
- // todo ...
+ if (state.transitionHold(State.UP, State.DOWN)) try {
+ Iterator<CloseHandler<Endpoint>> it =
closeHandlers.iterator();
+ while (it.hasNext()) {
+ CloseHandler<Endpoint> handler = it.next();
+ handler.handleClose(this);
+ it.remove();
+ }
+ } finally {
+ state.release();
+ }
}
public void addCloseHandler(final CloseHandler<Endpoint> closeHandler) {
- // todo ...
+ if (closeHandler == null) {
+ throw new NullPointerException("closeHandler is null");
+ }
+ final State current = state.getStateHold();
+ try {
+ if (current != State.DOWN) {
+ closeHandlers.add(closeHandler);
+ return;
+ }
+ } finally {
+ state.release();
+ }
+ closeHandler.handleClose(this);
}
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-03-13
21:04:31 UTC (rev 3633)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-03-13
21:05:07 UTC (rev 3634)
@@ -124,7 +124,6 @@
}
public void handleRequest(final I request, final Executor streamExecutor) {
- state.requireTransition(State.INITIAL, State.UNSENT);
executeTagged(new Runnable() {
public void run() {
try {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-03-13
21:04:31 UTC (rev 3633)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-03-13
21:05:07 UTC (rev 3634)
@@ -21,7 +21,7 @@
private static final Logger log = Logger.getLogger(CoreOutboundContext.class);
private final ConcurrentMap<Object, Object> contextMap =
CollectionUtil.concurrentMap();
- private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.UP);
+ private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.INITIAL);
private final Context<I, O> userContext = new UserContext();
private final ContextClient contextClient = new ContextClientImpl();
private final Executor executor;
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
---
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-03-13
21:04:31 UTC (rev 3633)
+++
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-03-13
21:05:07 UTC (rev 3634)
@@ -2,11 +2,14 @@
import java.io.IOException;
import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import org.jboss.cx.remoting.util.AttributeHashMap;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.wrapper.ContextSourceWrapper;
import org.jboss.cx.remoting.spi.wrapper.SessionWrapper;
+import org.jboss.cx.remoting.spi.wrapper.EndpointWrapper;
import org.jboss.cx.remoting.core.CoreEndpoint;
import javax.security.auth.callback.Callback;
@@ -22,7 +25,17 @@
private static final Logger log = Logger.getLogger(Remoting.class);
public static Endpoint createEndpoint(String name) {
- return null;
+ final CoreEndpoint coreEndpoint = new CoreEndpoint(name);
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ coreEndpoint.setExecutor(executorService);
+ coreEndpoint.start();
+ final Endpoint userEndpoint = coreEndpoint.getUserEndpoint();
+ userEndpoint.addCloseHandler(new CloseHandler<Endpoint>() {
+ public void handleClose(final Endpoint closed) {
+ executorService.shutdown();
+ }
+ });
+ return userEndpoint;
}
public static Session createEndpointAndSession(String endpointName, URI remoteUri,
final String userName, final char[] password) throws RemotingException {
Show replies by date