[jboss-cvs] JBoss Messaging SVN: r4450 - trunk/src/main/org/jboss/messaging/core/remoting/impl/mina.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 12 11:58:58 EDT 2008
Author: ataylor
Date: 2008-06-12 11:58:58 -0400 (Thu, 12 Jun 2008)
New Revision: 4450
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java
Log:
abstracted out the transport implementation from remotingserviceimpl. also added tests
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java 2008-06-12 15:50:45 UTC (rev 4449)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java 2008-06-12 15:58:58 UTC (rev 4450)
@@ -1,314 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import org.apache.mina.common.*;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.jboss.beans.metadata.api.annotations.Install;
-import org.jboss.beans.metadata.api.annotations.Uninstall;
-import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.ping.Pinger;
-import org.jboss.messaging.core.ping.impl.PingerImpl;
-import org.jboss.messaging.core.remoting.ConnectorRegistryFactory;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.RemotingService;
-import org.jboss.messaging.core.remoting.ResponseHandler;
-
-import static org.jboss.messaging.core.remoting.TransportType.INVM;
-import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
-import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @version <tt>$Revision$</tt>
- */
-public class RemotingServiceImpl implements RemotingService, CleanUpNotifier
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
-
- // Attributes ----------------------------------------------------
-
- private boolean started = false;
-
- private Configuration config;
-
- private NioSocketAcceptor acceptor;
-
- private IoServiceListener acceptorListener;
-
- private final PacketDispatcher dispatcher;
-
- private ExecutorService threadPool;
-
- private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
-
- private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
-
- private ServerKeepAliveFactory factory;
-
- private ScheduledExecutorService scheduledExecutor;
- private Map<IoSession, ScheduledFuture<?>> currentScheduledPingers;
- private Map<IoSession, Pinger> currentPingers;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public RemotingServiceImpl(Configuration config)
- {
- this(config, new ServerKeepAliveFactory());
- }
-
- public RemotingServiceImpl(Configuration config, ServerKeepAliveFactory factory)
- {
- assert config != null;
- assert factory != null;
-
- validate(config);
-
- this.config = config;
- this.factory = factory;
- dispatcher = new PacketDispatcherImpl(filters);
-
- scheduledExecutor = new ScheduledThreadPoolExecutor(config.getScheduledThreadPoolMaxSize());
- currentScheduledPingers = new ConcurrentHashMap<IoSession, ScheduledFuture<?>>();
- currentPingers = new ConcurrentHashMap<IoSession, Pinger>();
- }
-
- @Install
- public void addInterceptor(Interceptor filter)
- {
- filters.add(filter);
- }
-
- @Uninstall
- public void removeInterceptor(Interceptor filter)
- {
- filters.remove(filter);
- }
-
- public void addRemotingSessionListener(RemotingSessionListener listener)
- {
- assert listener != null;
-
- listeners.add(listener);
- }
-
- public void removeRemotingSessionListener(RemotingSessionListener listener)
- {
- assert listener != null;
-
- listeners.remove(listener);
- }
-
- // TransportService implementation -------------------------------
-
- public void start() throws Exception
- {
- if (log.isDebugEnabled())
- {
- log.debug("Start RemotingServiceImpl with configuration:" + config);
- }
-
- // if INVM transport is set, we bypass MINA setup
- if (config.getTransport() != INVM
- && acceptor == null)
- {
- acceptor = new NioSocketAcceptor();
-
- acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
-
- DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
-
- // addMDCFilter(filterChain);
- if (config.isSSLEnabled())
- {
- addSSLFilter(filterChain, false, config.getKeyStorePath(),
- config.getKeyStorePassword(), config
- .getTrustStorePath(), config
- .getTrustStorePassword());
- }
- addCodecFilter(filterChain);
-
- // Bind
- acceptor.setDefaultLocalAddress(new InetSocketAddress(config.getHost(), config.getPort()));
- acceptor.getSessionConfig().setTcpNoDelay(config.isTcpNoDelay());
- int receiveBufferSize = config.getTcpReceiveBufferSize();
- if (receiveBufferSize != -1)
- {
- acceptor.getSessionConfig().setReceiveBufferSize(receiveBufferSize);
- }
- int sendBufferSize = config.getTcpSendBufferSize();
- if (sendBufferSize != -1)
- {
- acceptor.getSessionConfig().setSendBufferSize(sendBufferSize);
- }
- acceptor.setReuseAddress(true);
- acceptor.getSessionConfig().setReuseAddress(true);
- acceptor.getSessionConfig().setKeepAlive(true);
- acceptor.setCloseOnDeactivation(false);
-
- threadPool = Executors.newCachedThreadPool();
- acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true, true));
- acceptor.bind();
- acceptorListener = new MinaSessionListener();
- acceptor.addListener(acceptorListener);
- }
-
- // TODO reenable invm transport
-// boolean disableInvm = config.isInvmDisabled();
-// if (log.isDebugEnabled())
-// log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
- // if (!disableInvm)
-
- log.info("Registering:" + config.getLocation());
- ConnectorRegistryFactory.getRegistry().register(config.getLocation(), dispatcher);
-
- started = true;
- }
-
- public void stop()
- {
- if (acceptor != null)
- {
- // remove the listener before disposing the acceptor
- // so that we're not notified when the sessions are destroyed
- acceptor.removeListener(acceptorListener);
- acceptor.unbind();
- acceptor.dispose();
- acceptor = null;
- threadPool.shutdown();
- }
-
- ConnectorRegistryFactory.getRegistry().unregister(config.getLocation());
-
- started = false;
- }
-
- public PacketDispatcher getDispatcher()
- {
- return dispatcher;
- }
-
- public Configuration getConfiguration()
- {
- return config;
- }
-
- public ServerKeepAliveFactory getKeepAliveFactory()
- {
- return factory;
- }
-
- /**
- * This method must only be called by tests which requires
- * to insert Filters (e.g. to simulate network failures)
- */
- public DefaultIoFilterChainBuilder getFilterChain()
- {
- assert started == true;
- assert acceptor != null;
-
- return acceptor.getFilterChain();
- }
-
- // FailureNotifier implementation -------------------------------
-
- public void fireCleanup(long sessionID, MessagingException me)
- {
- if (factory.getSessions().contains(sessionID))
- {
- for (RemotingSessionListener listener : listeners)
- {
- listener.sessionDestroyed(sessionID, me);
- }
- factory.getSessions().remove(sessionID);
- }
- }
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- private final class MinaSessionListener implements IoServiceListener
- {
-
- public void serviceActivated(IoService service)
- {
- }
-
- public void serviceDeactivated(IoService service)
- {
- }
-
- public void serviceIdle(IoService service, IdleStatus idleStatus)
- {
- }
-
- /**
- * register a pinger for the new client
- *
- * @param session
- */
- public void sessionCreated(IoSession session)
- {
- //register pinger
- if (config.getKeepAliveInterval() > 0)
- {
- ResponseHandler pongHandler = new ResponseHandlerImpl(dispatcher.generateID());
- Pinger pinger = new PingerImpl(getDispatcher(), new MinaSession(session, null), config.getKeepAliveTimeout(), pongHandler, RemotingServiceImpl.this);
- ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate(pinger, config.getKeepAliveInterval(), config.getKeepAliveInterval(), TimeUnit.MILLISECONDS);
- currentScheduledPingers.put(session, future);
- currentPingers.put(session, pinger);
- factory.getSessions().add(session.getId());
- }
- }
-
- /**
- * destry th epinger and stop
- *
- * @param session
- */
- public void sessionDestroyed(IoSession session)
- {
- ScheduledFuture<?> future = currentScheduledPingers.remove(session);
- if (future != null)
- {
- future.cancel(true);
- }
- Pinger pinger = currentPingers.remove(session);
- if (pinger != null)
- {
- pinger.close();
- }
- fireCleanup(session.getId(), null);
- }
- }
-}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list