[jboss-cvs] JBoss Messaging SVN: r4210 - in trunk: src/main/org/jboss/messaging/core/client and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri May 16 02:20:24 EDT 2008
Author: timfox
Date: 2008-05-16 02:20:24 -0400 (Fri, 16 May 2008)
New Revision: 4210
Modified:
trunk/src/etc/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
Log:
Implemented MINA write queue blockin properly
Modified: trunk/src/etc/jbm-configuration.xml
===================================================================
--- trunk/src/etc/jbm-configuration.xml 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/etc/jbm-configuration.xml 2008-05-16 06:20:24 UTC (rev 4210)
@@ -41,7 +41,13 @@
<!-- Set it to -1 if you want to use the value hinted by the Operating System -->
<!-- This setting is taken into account only when remoting-transport is set to TCP -->
<remoting-tcp-send-buffer-size>32768</remoting-tcp-send-buffer-size>
-
+
+ <remoting-writequeue-block-timeout>10000</remoting-writequeue-block-timeout>
+
+ <remoting-writequeue-minbytes>65536</remoting-writequeue-minbytes>
+
+ <remoting-writequeue-maxbytes>1048576</remoting-writequeue-maxbytes>
+
<!-- if ssl is enabled, all remoting-ssl-* properties must be set -->
<remoting-enable-ssl>false</remoting-enable-ssl>
Modified: trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -43,6 +43,18 @@
boolean isTcpNoDelay();
void setTcpNoDelay(boolean tcpNoDelay);
+
+ long getWriteQueueMaxBytes();
+
+ void setWriteQueueMaxBytes(long maxBytes);
+
+ long getWriteQueueMinBytes();
+
+ void setWriteQueueMinBytes(long minBytes);
+
+ long getWriteQueueBlockTimeout();
+
+ void setWriteQueueBlockTimeout(long timeout);
int getTcpReceiveBufferSize();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -45,6 +45,9 @@
protected String keyStorePassword;
protected String trustStorePath;
protected String trustStorePassword;
+ protected long writeQueueBlockTimeout = 5000;
+ protected long writeQueueMinBytes = 65536;
+ protected long writeQueueMaxBytes = 1048576;
public int getTimeout()
{
@@ -125,7 +128,37 @@
{
this.tcpSendBufferSize = tcpSendBufferSize;
}
+
+ public long getWriteQueueBlockTimeout()
+ {
+ return writeQueueBlockTimeout;
+ }
+ public long getWriteQueueMaxBytes()
+ {
+ return writeQueueMaxBytes;
+ }
+
+ public long getWriteQueueMinBytes()
+ {
+ return writeQueueMinBytes;
+ }
+
+ public void setWriteQueueBlockTimeout(final long timeout)
+ {
+ this.writeQueueBlockTimeout = timeout;
+ }
+
+ public void setWriteQueueMaxBytes(final long bytes)
+ {
+ this.writeQueueMaxBytes = bytes;
+ }
+
+ public void setWriteQueueMinBytes(final long bytes)
+ {
+ this.writeQueueMinBytes = bytes;
+ }
+
public boolean isSSLEnabled()
{
String sslEnabledProperty = System.getProperty(REMOTING_ENABLE_SSL);
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -73,6 +73,12 @@
int getTimeout();
+ long getWriteQueueMaxBytes();
+
+ long getWriteQueueMinBytes();
+
+ long getWriteQueueBlockTimeout();
+
boolean isSecurityEnabled();
String getKeyStorePath();
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -99,6 +99,10 @@
protected boolean invmDisabled = DEFAULT_INVM_DISABLED;
protected boolean invmDisabledModified = false;
protected boolean tcpNoDelay;
+ protected long writeQueueBlockTimeout = 5000;
+ protected long writeQueueMinBytes = 65536;
+ protected long writeQueueMaxBytes = 1048576;
+
protected int tcpReceiveBufferSize = -1;
protected int tcpSendBufferSize = -1;
protected boolean sslEnabled = DEFAULT_SSL_ENABLED;
@@ -323,7 +327,37 @@
{
this.tcpSendBufferSize = size;
}
+
+ public long getWriteQueueBlockTimeout()
+ {
+ return writeQueueBlockTimeout;
+ }
+ public long getWriteQueueMaxBytes()
+ {
+ return writeQueueMaxBytes;
+ }
+
+ public long getWriteQueueMinBytes()
+ {
+ return writeQueueMinBytes;
+ }
+
+ public void setWriteQueueBlockTimeout(final long timeout)
+ {
+ this.writeQueueBlockTimeout = timeout;
+ }
+
+ public void setWriteQueueMaxBytes(final long bytes)
+ {
+ this.writeQueueMaxBytes = bytes;
+ }
+
+ public void setWriteQueueMinBytes(final long bytes)
+ {
+ this.writeQueueMinBytes = bytes;
+ }
+
public String getURI()
{
StringBuffer buff = new StringBuffer();
@@ -414,9 +448,11 @@
connectionParams.setTcpReceiveBufferSize(tcpReceiveBufferSize);
connectionParams.setTcpSendBufferSize(tcpSendBufferSize);
connectionParams.setTimeout(timeout);
+ connectionParams.setWriteQueueBlockTimeout(writeQueueBlockTimeout);
+ connectionParams.setWriteQueueMinBytes(writeQueueMinBytes);
+ connectionParams.setWriteQueueMaxBytes(writeQueueMaxBytes);
return connectionParams;
}
-
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -79,6 +79,12 @@
tcpReceiveBufferSize = getInteger(e, "remoting-tcp-receive-buffer-size", -1);
tcpSendBufferSize = getInteger(e, "remoting-tcp-send-buffer-size", -1);
+
+ writeQueueBlockTimeout = getLong(e, "remoting-writequeue-block-timeout", 10000L);
+
+ writeQueueMinBytes = getLong(e, "remoting-writequeue-minbytes", 65536L);
+
+ writeQueueMaxBytes = getLong(e, "remoting-writequeue-maxbytes", 1048576L);
sslEnabled = getBoolean(e, "remoting-enable-ssl", false);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -153,7 +153,10 @@
threadPool = Executors.newCachedThreadPool();
//We don't order executions in the handler for messages received - this is done in the ClientConsumeImpl
//since they are put on the queue in order
- handler = new MinaHandler(dispatcher, threadPool, this, false, false);
+ handler = new MinaHandler(dispatcher, threadPool, this, false, false,
+ connectionParams.getWriteQueueBlockTimeout(),
+ connectionParams.getWriteQueueMinBytes(),
+ connectionParams.getWriteQueueMaxBytes());
connector.setHandler(handler);
InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
ConnectFuture future = connector.connect(address);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -10,7 +10,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
@@ -38,7 +37,7 @@
private static final Logger log = Logger.getLogger(MinaHandler.class);
private static boolean trace = log.isTraceEnabled();
-
+
// Attributes ----------------------------------------------------
private final PacketDispatcher dispatcher;
@@ -52,6 +51,14 @@
// Note! must use ConcurrentMap here to avoid race condition
private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
+ private volatile boolean blocked;
+
+ private final long blockTimeout;
+
+ private final long bytesLow;
+
+ private final long bytesHigh;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -59,11 +66,18 @@
final ExecutorService executorService,
final CleanUpNotifier failureNotifier,
final boolean closeSessionOnExceptionCaught,
- final boolean useExecutor)
+ final boolean useExecutor,
+ final long blockTimeout,
+ final long bytesLow,
+ final long bytesHigh)
{
assert dispatcher != null;
assert executorService != null;
+ this.blockTimeout = blockTimeout;
+ this.bytesLow = bytesLow;
+ this.bytesHigh = bytesHigh;
+
this.dispatcher = dispatcher;
this.failureNotifier = failureNotifier;
this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
@@ -159,53 +173,42 @@
}
}
- private final int high = 2000;
-
- private final int low = 500;
-
- private AtomicInteger count = new AtomicInteger(0);
-
- private volatile boolean blocked = true;
-
@Override
- public void messageSent(final IoSession session, final Object message)
- throws Exception
- {
- int newcount = count.decrementAndGet();
-
+ public void messageSent(final IoSession session, final Object message) throws Exception
+ {
if (blocked)
{
- if (newcount == low)
+ long bytes = session.getScheduledWriteBytes();
+
+ if (bytes <= bytesLow)
{
blocked = false;
-
- // log.info("unblocking");
-
+
synchronized (this)
{
- this.notify();
+ notify();
}
}
}
-
}
-
- public void acquireSemaphore() throws Exception
+
+ public void checkWrite(final IoSession session) throws Exception
{
- int newcount = count.incrementAndGet();
-
- if (newcount == high)
+ if (session.getScheduledWriteBytes() >= bytesHigh)
{
blocked = true;
- // log.info("blocking");
-
synchronized (this)
{
- this.wait(5000);
+ wait(blockTimeout);
}
- }
-
+
+ if (session.getScheduledWriteBytes() >= bytesHigh)
+ {
+ //TODO should really cope with spurious wakeups
+ throw new IllegalStateException("Timed out waiting for MINA write queue to free up");
+ }
+ }
}
// Package protected ---------------------------------------------
@@ -227,7 +230,7 @@
{
try
{
- acquireSemaphore();
+ checkWrite(session);
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -161,7 +161,11 @@
acceptor.setCloseOnDeactivation(false);
threadPool = Executors.newCachedThreadPool();
- acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true, true));
+ acceptor.setHandler(new MinaHandler(dispatcher, threadPool,
+ this, true, true,
+ config.getWriteQueueBlockTimeout(),
+ config.getWriteQueueMinBytes(),
+ config.getWriteQueueMaxBytes()));
acceptor.bind();
acceptorListener = new MinaSessionListener();
acceptor.addListener(acceptorListener);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -49,12 +49,13 @@
{
return session.getId();
}
-
+
+
public void write(Packet packet)
- {
+ {
try
{
- handler.acquireSemaphore();
+ handler.checkWrite(session);
}
catch (Exception e)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -102,7 +102,7 @@
{
clientDispatcher = new PacketDispatcherImpl(null);
threadPool = Executors.newCachedThreadPool();
- handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
+ handler = new MinaHandler(clientDispatcher, threadPool, null, true, true, 5000, 1 * 204 * 1024, 5 * 1024 * 1024);
handler_1 = new TestPacketHandler(23);
clientDispatcher.register(handler_1);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java 2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java 2008-05-16 06:20:24 UTC (rev 4210)
@@ -78,7 +78,7 @@
{
clientDispatcher = new PacketDispatcherImpl(null);
threadPool = Executors.newCachedThreadPool();
- handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
+ handler = new MinaHandler(clientDispatcher, threadPool, null, true, true, 5000, 1 * 204 * 1024, 5 * 1024 * 1024);
packetHandler = new TestPacketHandler(23);
clientDispatcher.register(packetHandler);
More information about the jboss-cvs-commits
mailing list