[jboss-cvs] JBoss Messaging SVN: r4250 - trunk/src/main/org/jboss/messaging/core/remoting/impl/mina.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 20 03:31:45 EDT 2008


Author: trustin
Date: 2008-05-20 03:31:45 -0400 (Tue, 20 May 2008)
New Revision: 4250

Modified:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
Log:
Turned on TCP no delay for all MINA connections - this fixes weird 40ms latency issue that Clerbert reported.

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-20 06:49:34 UTC (rev 4249)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-20 07:31:45 UTC (rev 4250)
@@ -6,9 +6,7 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina;
 
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.*;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -35,41 +33,39 @@
 import org.jboss.messaging.core.remoting.KeepAliveFactory;
 import org.jboss.messaging.core.remoting.NIOConnector;
 import org.jboss.messaging.core.remoting.NIOSession;
-import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
+ *
  * @version <tt>$Revision$</tt>
- * 
+ *
  */
 public class MinaConnector implements NIOConnector, CleanUpNotifier
 {
    // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(MinaConnector.class);
-   
+
    private static boolean trace = log.isTraceEnabled();
-   
+
    // Attributes ----------------------------------------------------
 
-   private Location location;
+   private final Location location;
 
-   private ConnectionParams connectionParams;
+   private final ConnectionParams connectionParams;
 
    private transient NioSocketConnector connector;
 
-   private PacketDispatcher dispatcher;
+   private final PacketDispatcher dispatcher;
 
    private ExecutorService threadPool;
-   
+
    private IoSession session;
 
-   private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+   private final List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
    private IoServiceListenerAdapter ioListener;
-   
+
    private MinaHandler handler;
 
    // Static --------------------------------------------------------
@@ -105,9 +101,9 @@
       this.connectionParams = connectionParams;
       this.dispatcher = dispatcher;
 
-      this.connector = new NioSocketConnector();
+      connector = new NioSocketConnector();
       DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
-      
+
       connector.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
 
       // addMDCFilter(filterChain);
@@ -139,6 +135,7 @@
       }
       connector.getSessionConfig().setKeepAlive(true);
       connector.getSessionConfig().setReuseAddress(true);
+      connector.getSessionConfig().setTcpNoDelay(true);
    }
 
    // NIOConnector implementation -----------------------------------
@@ -149,7 +146,7 @@
       {
          return new MinaSession(session, handler);
       }
-      
+
       threadPool = Executors.newCachedThreadPool();
       //We don't order executions in the handler for messages received - this is done in the ClientConsumeImpl
       //since they are put on the queue in order
@@ -163,16 +160,16 @@
       connector.setDefaultRemoteAddress(address);
       ioListener = new IoServiceListenerAdapter();
       connector.addListener(ioListener);
-      
+
       future.awaitUninterruptibly();
       if (!future.isConnected())
       {
          throw new IOException("Cannot connect to " + address.toString());
       }
-      this.session = future.getSession();
+      session = future.getSession();
 //      Packet packet = new Ping(session.getId());
 //      session.write(packet);
-      
+
       return new MinaSession(session, handler);
    }
 
@@ -185,11 +182,11 @@
 
       CloseFuture closeFuture = session.close().awaitUninterruptibly();
       boolean closed = closeFuture.isClosed();
-      
+
       connector.removeListener(ioListener);
       connector.dispose();
       threadPool.shutdown();
-      
+
       SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
       // FIXME without this hack, exceptions are thrown:
       // "Unexpected exception from SSLEngine.closeInbound()." -> because the ssl session is not stopped
@@ -205,7 +202,7 @@
             // ignore
          }
       }
-      
+
       connector = null;
       session = null;
 
@@ -220,7 +217,9 @@
       listeners.add(listener);
 
       if (trace)
+      {
          log.trace("added listener " + listener + " to " + this);
+      }
    }
 
    public synchronized void removeSessionListener(RemotingSessionListener listener)
@@ -231,11 +230,13 @@
       listeners.remove(listener);
 
       if (trace)
+      {
          log.trace("removed listener " + listener + " from " + this);
+      }
    }
 
    public String getServerURI()
-   { 
+   {
       return location.getLocation() + connectionParams.getURI();
    }
 
@@ -245,7 +246,7 @@
    }
 
    // FailureNotifier implementation -------------------------------
-   
+
    public synchronized void fireCleanup(long sessionID, MessagingException me)
    {
       for (RemotingSessionListener listener: listeners)
@@ -253,18 +254,18 @@
          listener.sessionDestroyed(sessionID, me);
       }
    }
-   
+
    public void fireCleanup(long sessionID)
    {
       fireCleanup(sessionID, null);
    }
 
    // Public --------------------------------------------------------
-   
+
    @Override
    public String toString()
    {
-      return "MinaConnector@" + System.identityHashCode(this) + "[configuration=" + location + "]"; 
+      return "MinaConnector@" + System.identityHashCode(this) + "[configuration=" + location + "]";
    }
    // Package protected ---------------------------------------------
 
@@ -286,30 +287,38 @@
       public void serviceActivated(IoService service)
       {
          if (trace)
+         {
             log.trace("activated " + service);
+         }
       }
 
       public void serviceDeactivated(IoService service)
       {
          if (trace)
+         {
             log.trace("deactivated " + service);
+         }
       }
 
       public void serviceIdle(IoService service, IdleStatus idleStatus)
       {
          if (trace)
+         {
             log.trace("idle " + service + ", status=" + idleStatus);
+         }
       }
 
       public void sessionCreated(IoSession session)
       {
          if (trace)
+         {
             log.trace("created session " + session);
+         }
       }
 
       public void sessionDestroyed(IoSession session)
       {
-         fireCleanup(session.getId(), 
+         fireCleanup(session.getId(),
                new MessagingException(MessagingException.INTERNAL_ERROR, "MINA session has been destroyed"));
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-20 06:49:34 UTC (rev 4249)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-20 07:31:45 UTC (rev 4250)
@@ -6,11 +6,10 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina;
 
-import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
-import static org.jboss.messaging.core.remoting.TransportType.INVM;
-import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.*;
+import static org.jboss.messaging.core.remoting.TransportType.*;
+import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.*;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.*;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -38,9 +37,9 @@
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
+ *
  * @version <tt>$Revision$</tt>
- * 
+ *
  */
 public class MinaService implements RemotingService, CleanUpNotifier
 {
@@ -51,27 +50,27 @@
    // Attributes ----------------------------------------------------
 
    private boolean started = false;
-   
+
    private Configuration config;
-   
+
    private NioSocketAcceptor acceptor;
 
    private IoServiceListener acceptorListener;
 
-   private PacketDispatcher dispatcher;
+   private final PacketDispatcher dispatcher;
 
-   private ExecutorService threadPool; 
-   
-   private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+   private ExecutorService threadPool;
 
+   private final List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+
    private ServerKeepAliveFactory factory;
-   
-   private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
 
+   private final List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
-   
+
    public MinaService(Configuration config)
    {
       this(config, new ServerKeepAliveFactory());
@@ -83,22 +82,22 @@
       assert factory != null;
 
       validate(config);
-      
+
       this.config = config;
       this.factory = factory;
-      this.dispatcher = new PacketDispatcherImpl(this.filters);
+      dispatcher = new PacketDispatcherImpl(filters);
    }
-   
+
    @Install
    public void addInterceptor(Interceptor filter)
    {
-      this.filters.add(filter);
+      filters.add(filter);
    }
 
    @Uninstall
    public void removeInterceptor(Interceptor filter)
    {
-      this.filters.remove(filter);
+      filters.remove(filter);
    }
 
    public void addRemotingSessionListener(RemotingSessionListener listener)
@@ -120,16 +119,18 @@
    public void start() throws Exception
    {
       if (log.isDebugEnabled())
+      {
          log.debug("Start MinaService with configuration:" + config);
-      
+      }
+
       // if INVM transport is set, we bypass MINA setup
-      if (config.getTransport() != INVM 
+      if (config.getTransport() != INVM
             && acceptor == null)
       {
          acceptor = new NioSocketAcceptor();
-         
+
          acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
-         
+
          DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
 
          // addMDCFilter(filterChain);
@@ -158,6 +159,7 @@
          acceptor.setReuseAddress(true);
          acceptor.getSessionConfig().setReuseAddress(true);
          acceptor.getSessionConfig().setKeepAlive(true);
+         acceptor.getSessionConfig().setTcpNoDelay(true);
          acceptor.setCloseOnDeactivation(false);
 
          threadPool = Executors.newCachedThreadPool();
@@ -170,13 +172,13 @@
          acceptorListener = new MinaSessionListener();
          acceptor.addListener(acceptorListener);
       }
-      
+
       // TODO reenable invm transport
 //      boolean disableInvm = config.isInvmDisabled();
 //      if (log.isDebugEnabled())
 //         log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
      // if (!disableInvm)
-      
+
       log.info("Registering:" + config.getLocation());
          REGISTRY.register(config.getLocation(), dispatcher);
 
@@ -195,9 +197,9 @@
          acceptor = null;
          threadPool.shutdown();
       }
-      
+
       REGISTRY.unregister(config.getLocation());
-      
+
       started = false;
    }
 
@@ -205,21 +207,21 @@
    {
       return dispatcher;
    }
-   
+
    public Configuration getConfiguration()
    {
       return config;
    }
-   
+
    /**
-    * This method must only be called by tests which requires 
+    * This method must only be called by tests which requires
     * to insert Filters (e.g. to simulate network failures)
     */
-   public DefaultIoFilterChainBuilder getFilterChain() 
+   public DefaultIoFilterChainBuilder getFilterChain()
    {
       assert started == true;
       assert acceptor != null;
-      
+
       return acceptor.getFilterChain();
    }
 
@@ -234,24 +236,24 @@
          {
             listener.sessionDestroyed(clientSessionID, me);
          }
-         factory.getSessions().remove(sessionID);  
+         factory.getSessions().remove(sessionID);
       }
    }
-   
+
    // Public --------------------------------------------------------
 
    public void setKeepAliveFactory(ServerKeepAliveFactory factory)
    {
       assert factory != null;
-      
+
       this.factory = factory;
    }
 
    public void setRemotingConfiguration(Configuration remotingConfig)
    {
       assert started == false;
-      
-      this.config = remotingConfig;
+
+      config = remotingConfig;
    }
 
    // Package protected ---------------------------------------------
@@ -285,5 +287,5 @@
       {
          fireCleanup(session.getId(), null);
       }
-   }  
+   }
 }
\ No newline at end of file




More information about the jboss-cvs-commits mailing list