[jboss-cvs] JBoss Messaging SVN: r4250 - trunk/src/main/org/jboss/messaging/core/remoting/impl/mina.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 20 03:31:45 EDT 2008
Author: trustin
Date: 2008-05-20 03:31:45 -0400 (Tue, 20 May 2008)
New Revision: 4250
Modified:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
Log:
Turned on TCP no delay for all MINA connections - this fixes the weird 40ms latency issue that Clebert reported.
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-20 06:49:34 UTC (rev 4249)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-20 07:31:45 UTC (rev 4250)
@@ -6,9 +6,7 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.*;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -35,41 +33,39 @@
import org.jboss.messaging.core.remoting.KeepAliveFactory;
import org.jboss.messaging.core.remoting.NIOConnector;
import org.jboss.messaging.core.remoting.NIOSession;
-import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
+ *
* @version <tt>$Revision$</tt>
- *
+ *
*/
public class MinaConnector implements NIOConnector, CleanUpNotifier
{
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(MinaConnector.class);
-
+
private static boolean trace = log.isTraceEnabled();
-
+
// Attributes ----------------------------------------------------
- private Location location;
+ private final Location location;
- private ConnectionParams connectionParams;
+ private final ConnectionParams connectionParams;
private transient NioSocketConnector connector;
- private PacketDispatcher dispatcher;
+ private final PacketDispatcher dispatcher;
private ExecutorService threadPool;
-
+
private IoSession session;
- private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+ private final List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
private IoServiceListenerAdapter ioListener;
-
+
private MinaHandler handler;
// Static --------------------------------------------------------
@@ -105,9 +101,9 @@
this.connectionParams = connectionParams;
this.dispatcher = dispatcher;
- this.connector = new NioSocketConnector();
+ connector = new NioSocketConnector();
DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
-
+
connector.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
// addMDCFilter(filterChain);
@@ -139,6 +135,7 @@
}
connector.getSessionConfig().setKeepAlive(true);
connector.getSessionConfig().setReuseAddress(true);
+ connector.getSessionConfig().setTcpNoDelay(true);
}
// NIOConnector implementation -----------------------------------
@@ -149,7 +146,7 @@
{
return new MinaSession(session, handler);
}
-
+
threadPool = Executors.newCachedThreadPool();
//We don't order executions in the handler for messages received - this is done in the ClientConsumeImpl
//since they are put on the queue in order
@@ -163,16 +160,16 @@
connector.setDefaultRemoteAddress(address);
ioListener = new IoServiceListenerAdapter();
connector.addListener(ioListener);
-
+
future.awaitUninterruptibly();
if (!future.isConnected())
{
throw new IOException("Cannot connect to " + address.toString());
}
- this.session = future.getSession();
+ session = future.getSession();
// Packet packet = new Ping(session.getId());
// session.write(packet);
-
+
return new MinaSession(session, handler);
}
@@ -185,11 +182,11 @@
CloseFuture closeFuture = session.close().awaitUninterruptibly();
boolean closed = closeFuture.isClosed();
-
+
connector.removeListener(ioListener);
connector.dispose();
threadPool.shutdown();
-
+
SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
// FIXME without this hack, exceptions are thrown:
// "Unexpected exception from SSLEngine.closeInbound()." -> because the ssl session is not stopped
@@ -205,7 +202,7 @@
// ignore
}
}
-
+
connector = null;
session = null;
@@ -220,7 +217,9 @@
listeners.add(listener);
if (trace)
+ {
log.trace("added listener " + listener + " to " + this);
+ }
}
public synchronized void removeSessionListener(RemotingSessionListener listener)
@@ -231,11 +230,13 @@
listeners.remove(listener);
if (trace)
+ {
log.trace("removed listener " + listener + " from " + this);
+ }
}
public String getServerURI()
- {
+ {
return location.getLocation() + connectionParams.getURI();
}
@@ -245,7 +246,7 @@
}
// FailureNotifier implementation -------------------------------
-
+
public synchronized void fireCleanup(long sessionID, MessagingException me)
{
for (RemotingSessionListener listener: listeners)
@@ -253,18 +254,18 @@
listener.sessionDestroyed(sessionID, me);
}
}
-
+
public void fireCleanup(long sessionID)
{
fireCleanup(sessionID, null);
}
// Public --------------------------------------------------------
-
+
@Override
public String toString()
{
- return "MinaConnector@" + System.identityHashCode(this) + "[configuration=" + location + "]";
+ return "MinaConnector@" + System.identityHashCode(this) + "[configuration=" + location + "]";
}
// Package protected ---------------------------------------------
@@ -286,30 +287,38 @@
public void serviceActivated(IoService service)
{
if (trace)
+ {
log.trace("activated " + service);
+ }
}
public void serviceDeactivated(IoService service)
{
if (trace)
+ {
log.trace("deactivated " + service);
+ }
}
public void serviceIdle(IoService service, IdleStatus idleStatus)
{
if (trace)
+ {
log.trace("idle " + service + ", status=" + idleStatus);
+ }
}
public void sessionCreated(IoSession session)
{
if (trace)
+ {
log.trace("created session " + session);
+ }
}
public void sessionDestroyed(IoSession session)
{
- fireCleanup(session.getId(),
+ fireCleanup(session.getId(),
new MessagingException(MessagingException.INTERNAL_ERROR, "MINA session has been destroyed"));
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-20 06:49:34 UTC (rev 4249)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-20 07:31:45 UTC (rev 4250)
@@ -6,11 +6,10 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
-import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
-import static org.jboss.messaging.core.remoting.TransportType.INVM;
-import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.*;
+import static org.jboss.messaging.core.remoting.TransportType.*;
+import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.*;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.*;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -38,9 +37,9 @@
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
+ *
* @version <tt>$Revision$</tt>
- *
+ *
*/
public class MinaService implements RemotingService, CleanUpNotifier
{
@@ -51,27 +50,27 @@
// Attributes ----------------------------------------------------
private boolean started = false;
-
+
private Configuration config;
-
+
private NioSocketAcceptor acceptor;
private IoServiceListener acceptorListener;
- private PacketDispatcher dispatcher;
+ private final PacketDispatcher dispatcher;
- private ExecutorService threadPool;
-
- private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+ private ExecutorService threadPool;
+ private final List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+
private ServerKeepAliveFactory factory;
-
- private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
+ private final List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
+
public MinaService(Configuration config)
{
this(config, new ServerKeepAliveFactory());
@@ -83,22 +82,22 @@
assert factory != null;
validate(config);
-
+
this.config = config;
this.factory = factory;
- this.dispatcher = new PacketDispatcherImpl(this.filters);
+ dispatcher = new PacketDispatcherImpl(filters);
}
-
+
@Install
public void addInterceptor(Interceptor filter)
{
- this.filters.add(filter);
+ filters.add(filter);
}
@Uninstall
public void removeInterceptor(Interceptor filter)
{
- this.filters.remove(filter);
+ filters.remove(filter);
}
public void addRemotingSessionListener(RemotingSessionListener listener)
@@ -120,16 +119,18 @@
public void start() throws Exception
{
if (log.isDebugEnabled())
+ {
log.debug("Start MinaService with configuration:" + config);
-
+ }
+
// if INVM transport is set, we bypass MINA setup
- if (config.getTransport() != INVM
+ if (config.getTransport() != INVM
&& acceptor == null)
{
acceptor = new NioSocketAcceptor();
-
+
acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
-
+
DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
// addMDCFilter(filterChain);
@@ -158,6 +159,7 @@
acceptor.setReuseAddress(true);
acceptor.getSessionConfig().setReuseAddress(true);
acceptor.getSessionConfig().setKeepAlive(true);
+ acceptor.getSessionConfig().setTcpNoDelay(true);
acceptor.setCloseOnDeactivation(false);
threadPool = Executors.newCachedThreadPool();
@@ -170,13 +172,13 @@
acceptorListener = new MinaSessionListener();
acceptor.addListener(acceptorListener);
}
-
+
// TODO reenable invm transport
// boolean disableInvm = config.isInvmDisabled();
// if (log.isDebugEnabled())
// log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
// if (!disableInvm)
-
+
log.info("Registering:" + config.getLocation());
REGISTRY.register(config.getLocation(), dispatcher);
@@ -195,9 +197,9 @@
acceptor = null;
threadPool.shutdown();
}
-
+
REGISTRY.unregister(config.getLocation());
-
+
started = false;
}
@@ -205,21 +207,21 @@
{
return dispatcher;
}
-
+
public Configuration getConfiguration()
{
return config;
}
-
+
/**
- * This method must only be called by tests which requires
+ * This method must only be called by tests which requires
* to insert Filters (e.g. to simulate network failures)
*/
- public DefaultIoFilterChainBuilder getFilterChain()
+ public DefaultIoFilterChainBuilder getFilterChain()
{
assert started == true;
assert acceptor != null;
-
+
return acceptor.getFilterChain();
}
@@ -234,24 +236,24 @@
{
listener.sessionDestroyed(clientSessionID, me);
}
- factory.getSessions().remove(sessionID);
+ factory.getSessions().remove(sessionID);
}
}
-
+
// Public --------------------------------------------------------
public void setKeepAliveFactory(ServerKeepAliveFactory factory)
{
assert factory != null;
-
+
this.factory = factory;
}
public void setRemotingConfiguration(Configuration remotingConfig)
{
assert started == false;
-
- this.config = remotingConfig;
+
+ config = remotingConfig;
}
// Package protected ---------------------------------------------
@@ -285,5 +287,5 @@
{
fireCleanup(session.getId(), null);
}
- }
+ }
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list