[jboss-cvs] JBoss Messaging SVN: r4210 - in trunk: src/main/org/jboss/messaging/core/client and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri May 16 02:20:24 EDT 2008


Author: timfox
Date: 2008-05-16 02:20:24 -0400 (Fri, 16 May 2008)
New Revision: 4210

Modified:
   trunk/src/etc/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
Log:
Implemented MINA write queue blocking properly


Modified: trunk/src/etc/jbm-configuration.xml
===================================================================
--- trunk/src/etc/jbm-configuration.xml	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/etc/jbm-configuration.xml	2008-05-16 06:20:24 UTC (rev 4210)
@@ -41,7 +41,13 @@
       <!-- Set it to -1 if you want to use the value hinted by the Operating System      --> 
       <!-- This setting is taken into account only when remoting-transport is set to TCP -->
       <remoting-tcp-send-buffer-size>32768</remoting-tcp-send-buffer-size>
-
+      
+      <remoting-writequeue-block-timeout>10000</remoting-writequeue-block-timeout>
+      
+      <remoting-writequeue-minbytes>65536</remoting-writequeue-minbytes>
+      
+      <remoting-writequeue-maxbytes>1048576</remoting-writequeue-maxbytes>
+      
       <!--  if ssl is enabled, all remoting-ssl-* properties must be set -->
       <remoting-enable-ssl>false</remoting-enable-ssl>
       

Modified: trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -43,6 +43,18 @@
    boolean isTcpNoDelay();
 
    void setTcpNoDelay(boolean tcpNoDelay);
+   
+   long getWriteQueueMaxBytes();
+   
+   void setWriteQueueMaxBytes(long maxBytes);
+   
+   long getWriteQueueMinBytes();
+   
+   void setWriteQueueMinBytes(long minBytes);
+   
+   long getWriteQueueBlockTimeout();
+   
+   void setWriteQueueBlockTimeout(long timeout);
 
    int getTcpReceiveBufferSize();
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -45,6 +45,9 @@
    protected String keyStorePassword;
    protected String trustStorePath;
    protected String trustStorePassword;
+   protected long writeQueueBlockTimeout = 5000;
+   protected long writeQueueMinBytes = 65536;
+   protected long writeQueueMaxBytes = 1048576;
 
    public int getTimeout()
    {
@@ -125,7 +128,37 @@
    {
       this.tcpSendBufferSize = tcpSendBufferSize;
    }
+   
+   public long getWriteQueueBlockTimeout()
+   {
+      return writeQueueBlockTimeout;
+   }
 
+   public long getWriteQueueMaxBytes()
+   {
+      return writeQueueMaxBytes;
+   }
+
+   public long getWriteQueueMinBytes()
+   {
+      return writeQueueMinBytes;
+   }
+   
+   public void setWriteQueueBlockTimeout(final long timeout)
+   {
+      this.writeQueueBlockTimeout = timeout;
+   }
+
+   public void setWriteQueueMaxBytes(final long bytes)
+   {
+      this.writeQueueMaxBytes = bytes;
+   }
+
+   public void setWriteQueueMinBytes(final long bytes)
+   {
+      this.writeQueueMinBytes = bytes;
+   }
+
    public boolean isSSLEnabled()
    {
       String sslEnabledProperty = System.getProperty(REMOTING_ENABLE_SSL);

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -73,6 +73,12 @@
 
    int getTimeout();
    
+   long getWriteQueueMaxBytes();
+
+   long getWriteQueueMinBytes();
+   
+   long getWriteQueueBlockTimeout();
+      
    boolean isSecurityEnabled();
 
    String getKeyStorePath();

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -99,6 +99,10 @@
    protected boolean invmDisabled = DEFAULT_INVM_DISABLED;
    protected boolean invmDisabledModified = false;
    protected boolean tcpNoDelay;
+   protected long writeQueueBlockTimeout = 5000;
+   protected long writeQueueMinBytes = 65536;
+   protected long writeQueueMaxBytes = 1048576;
+   
    protected int tcpReceiveBufferSize = -1;
    protected int tcpSendBufferSize = -1;
    protected boolean sslEnabled = DEFAULT_SSL_ENABLED;
@@ -323,7 +327,37 @@
    {
       this.tcpSendBufferSize = size;
    }
+     
+   public long getWriteQueueBlockTimeout()
+   {
+      return writeQueueBlockTimeout;
+   }
 
+   public long getWriteQueueMaxBytes()
+   {
+      return writeQueueMaxBytes;
+   }
+
+   public long getWriteQueueMinBytes()
+   {
+      return writeQueueMinBytes;
+   }
+   
+   public void setWriteQueueBlockTimeout(final long timeout)
+   {
+      this.writeQueueBlockTimeout = timeout;
+   }
+
+   public void setWriteQueueMaxBytes(final long bytes)
+   {
+      this.writeQueueMaxBytes = bytes;
+   }
+
+   public void setWriteQueueMinBytes(final long bytes)
+   {
+      this.writeQueueMinBytes = bytes;
+   }
+
    public String getURI()
    {
       StringBuffer buff = new StringBuffer();
@@ -414,9 +448,11 @@
       connectionParams.setTcpReceiveBufferSize(tcpReceiveBufferSize);
       connectionParams.setTcpSendBufferSize(tcpSendBufferSize);
       connectionParams.setTimeout(timeout);
+      connectionParams.setWriteQueueBlockTimeout(writeQueueBlockTimeout);
+      connectionParams.setWriteQueueMinBytes(writeQueueMinBytes);
+      connectionParams.setWriteQueueMaxBytes(writeQueueMaxBytes);
       return connectionParams;
    }
-
   
 }
  

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -79,6 +79,12 @@
       tcpReceiveBufferSize = getInteger(e, "remoting-tcp-receive-buffer-size", -1);
 
       tcpSendBufferSize = getInteger(e, "remoting-tcp-send-buffer-size", -1);
+      
+      writeQueueBlockTimeout = getLong(e, "remoting-writequeue-block-timeout", 10000L);
+      
+      writeQueueMinBytes = getLong(e, "remoting-writequeue-minbytes", 65536L);
+      
+      writeQueueMaxBytes = getLong(e, "remoting-writequeue-maxbytes", 1048576L);
 
       sslEnabled = getBoolean(e, "remoting-enable-ssl", false);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -153,7 +153,10 @@
       threadPool = Executors.newCachedThreadPool();
       //We don't order executions in the handler for messages received - this is done in the ClientConsumeImpl
       //since they are put on the queue in order
-      handler = new MinaHandler(dispatcher, threadPool, this, false, false);
+      handler = new MinaHandler(dispatcher, threadPool, this, false, false,
+                                connectionParams.getWriteQueueBlockTimeout(),
+                                connectionParams.getWriteQueueMinBytes(),
+                                connectionParams.getWriteQueueMaxBytes());
       connector.setHandler(handler);
       InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
       ConnectFuture future = connector.connect(address);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -10,7 +10,6 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
@@ -38,7 +37,7 @@
    private static final Logger log = Logger.getLogger(MinaHandler.class);
 
    private static boolean trace = log.isTraceEnabled();
-
+      
    // Attributes ----------------------------------------------------
 
    private final PacketDispatcher dispatcher;
@@ -52,6 +51,14 @@
    // Note! must use ConcurrentMap here to avoid race condition
    private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
    
+   private volatile boolean blocked;
+   
+   private final long blockTimeout;
+   
+   private final long bytesLow;
+   
+   private final long bytesHigh;
+     
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -59,11 +66,18 @@
                       final ExecutorService executorService,
                       final CleanUpNotifier failureNotifier,
                       final boolean closeSessionOnExceptionCaught,
-                      final boolean useExecutor)
+                      final boolean useExecutor,
+                      final long blockTimeout,
+                      final long bytesLow,
+                      final long bytesHigh)
    {
       assert dispatcher != null;
       assert executorService != null;
 
+      this.blockTimeout = blockTimeout;
+      this.bytesLow = bytesLow;
+      this.bytesHigh = bytesHigh;
+
       this.dispatcher = dispatcher;
       this.failureNotifier = failureNotifier;
       this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
@@ -159,53 +173,42 @@
       }
    }
 
-   private final int high = 2000;
-
-   private final int low = 500;
-
-   private AtomicInteger count = new AtomicInteger(0);
-
-   private volatile boolean blocked = true;
-
    @Override
-   public void messageSent(final IoSession session, final Object message)
-         throws Exception
-   {
-      int newcount = count.decrementAndGet();
-
+   public void messageSent(final IoSession session, final Object message) throws Exception
+   {      
       if (blocked)
       {
-         if (newcount == low)
+         long bytes = session.getScheduledWriteBytes();
+                      
+         if (bytes <= bytesLow)
          {
             blocked = false;
-
-            // log.info("unblocking");
-
+   
             synchronized (this)
             {
-               this.notify();
+               notify();
             }
          }
       }
-
    }
-
-   public void acquireSemaphore() throws Exception
+   
+   public void checkWrite(final IoSession session) throws Exception
    {
-      int newcount = count.incrementAndGet();
-
-      if (newcount == high)
+      if (session.getScheduledWriteBytes() >= bytesHigh)
       {
          blocked = true;
 
-         // log.info("blocking");
-
          synchronized (this)
          {
-            this.wait(5000);
+            wait(blockTimeout);
          }
-      }
-
+         
+         if (session.getScheduledWriteBytes() >= bytesHigh)
+         {
+            //TODO should really cope with spurious wakeups
+            throw new IllegalStateException("Timed out waiting for MINA write queue to free up");
+         }
+      }      
    }
 
    // Package protected ---------------------------------------------
@@ -227,7 +230,7 @@
             {
                try
                {
-                  acquireSemaphore();
+                  checkWrite(session);
                }
                catch (Exception e)
                {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -161,7 +161,11 @@
          acceptor.setCloseOnDeactivation(false);
 
          threadPool = Executors.newCachedThreadPool();
-         acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true, true));
+         acceptor.setHandler(new MinaHandler(dispatcher, threadPool,
+                                             this, true, true,
+                                             config.getWriteQueueBlockTimeout(),
+                                             config.getWriteQueueMinBytes(),
+                                             config.getWriteQueueMaxBytes()));
          acceptor.bind();
          acceptorListener = new MinaSessionListener();
          acceptor.addListener(acceptorListener);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -49,12 +49,13 @@
    {
       return session.getId();
    }
-
+   
+   
    public void write(Packet packet)
-   {      
+   {     
       try
       {
-         handler.acquireSemaphore();
+         handler.checkWrite(session);
       }
       catch (Exception e)
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -102,7 +102,7 @@
    {
       clientDispatcher = new PacketDispatcherImpl(null);
       threadPool = Executors.newCachedThreadPool();
-      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
+      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true, 5000, 1 * 204 * 1024, 5 * 1024 * 1024);
 
       handler_1 = new TestPacketHandler(23);
       clientDispatcher.register(handler_1);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java	2008-05-15 17:12:48 UTC (rev 4209)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java	2008-05-16 06:20:24 UTC (rev 4210)
@@ -78,7 +78,7 @@
    {
       clientDispatcher = new PacketDispatcherImpl(null);
       threadPool = Executors.newCachedThreadPool();
-      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
+      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true, 5000, 1 * 204 * 1024, 5 * 1024 * 1024);
 
       packetHandler = new TestPacketHandler(23);
       clientDispatcher.register(packetHandler);




More information about the jboss-cvs-commits mailing list