[jboss-cvs] JBoss Messaging SVN: r4230 - in trunk/src: main/org/jboss/messaging/core/client/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun May 18 09:34:40 EDT 2008


Author: timfox
Date: 2008-05-18 09:34:39 -0400 (Sun, 18 May 2008)
New Revision: 4230

Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
   trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
Log:
Various tweaks and reverted OrderedExecutorFactory change


Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/config/jbm-configuration.xml	2008-05-18 13:34:39 UTC (rev 4230)
@@ -44,9 +44,9 @@
       
       <remoting-writequeue-block-timeout>10000</remoting-writequeue-block-timeout>
       
-      <remoting-writequeue-minbytes>32768</remoting-writequeue-minbytes>
+      <remoting-writequeue-minbytes>4096</remoting-writequeue-minbytes>
       
-      <remoting-writequeue-maxbytes>65536</remoting-writequeue-maxbytes>
+      <remoting-writequeue-maxbytes>8192</remoting-writequeue-maxbytes>
       
       <!--  if ssl is enabled, all remoting-ssl-* properties must be set -->
       <remoting-enable-ssl>false</remoting-enable-ssl>
@@ -69,7 +69,7 @@
       
       <create-journal-dir>true</create-journal-dir>
       
-      <journal-type>asyncio</journal-type>
+      <journal-type>nio</journal-type>
       
       <journal-sync>true</journal-sync>
       
@@ -84,4 +84,4 @@
       
    </configuration>
 
-</deployment>
\ No newline at end of file
+</deployment>

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -835,7 +835,7 @@
          remotingConnection.sendBlocking(serverTargetID, serverTargetID, message);
       }
       else
-      {
+      {         
          remotingConnection.sendOneWay(serverTargetID, serverTargetID, message);
       }
             

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -123,9 +123,11 @@
       
       if (config.getJournalType() == JournalType.ASYNCIO)
       {
+         log.info("AIO journal selected");
          if (!AIOSequentialFileFactory.isSupported())
          {
-            log.warn("AIO wasn't located on this platform, using just standard Java NIO. If you are on Linux, install LibAIO and the required wrapper and you will get a lot of performance benefit");
+            log.warn("AIO wasn't located on this platform, will fall back to  Java NIO." +
+                     "If you are on Linux, install LibAIO to enable the AIO journal");
             journalFF = new NIOSequentialFileFactory(journalDir);
          }
          else
@@ -136,10 +138,12 @@
       }
       else if (config.getJournalType() == JournalType.NIO)
       {
+         log.info("NIO Journal selected");
          journalFF = new NIOSequentialFileFactory(bindingsDir);
       }
       else if (config.getJournalType() == JournalType.JDBC)
       {
+         log.info("JDBC Journal selected");
          // Sanity check only... this is previously tested
          throw new IllegalArgumentException("JDBC Journal is not supported yet");
       }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -25,12 +25,15 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionAttributeMap;
 import org.apache.mina.common.IoSessionDataStructureFactory;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.WriteRequestQueue;
+import org.apache.mina.util.CircularQueue;
 
 /**
  * 
@@ -134,34 +137,34 @@
   }
    
    
-//   private static class DefaultWriteRequestQueue implements WriteRequestQueue
-//   {
-//      private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
-//      
-//      public void dispose(IoSession session) {
-//      }
-//      
-//      public void clear(IoSession session) {
-//          q.clear();
-//      }
-//
-//      public synchronized boolean isEmpty(IoSession session) {
-//          return q.isEmpty();
-//      }
-//
-//      public synchronized void offer(IoSession session, WriteRequest writeRequest) {
-//          q.offer(writeRequest);
-//      }
-//
-//      public synchronized WriteRequest poll(IoSession session) {
-//          return q.poll();
-//      }
-//      
-//      @Override
-//      public String toString() {
-//          return q.toString();
-//      }
-//  }
+   private static class DefaultWriteRequestQueue implements WriteRequestQueue
+   {
+      private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
+      
+      public void dispose(IoSession session) {
+      }
+      
+      public void clear(IoSession session) {
+          q.clear();
+      }
+
+      public synchronized boolean isEmpty(IoSession session) {
+          return q.isEmpty();
+      }
+
+      public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+          q.offer(writeRequest);
+      }
+
+      public synchronized WriteRequest poll(IoSession session) {
+          return q.poll();
+      }
+      
+      @Override
+      public String toString() {
+          return q.toString();
+      }
+  }
    
    private static class ConcurrentWriteRequestQueue implements WriteRequestQueue
    {
@@ -191,5 +194,55 @@
           return q.toString();
       }
   }
+   
+   private static class SynchronousWriteRequestQueue implements WriteRequestQueue
+   {
+      private final LinkedBlockingQueue<WriteRequest> q = new LinkedBlockingQueue<WriteRequest>(1);
+      
+      public void dispose(IoSession session) {
+      }
+      
+      public void clear(IoSession session) {
+          q.clear();
+      }
 
+      public synchronized boolean isEmpty(IoSession session) {
+          return q.isEmpty();
+      }
+
+      public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+         try
+         {
+          boolean ok = q.offer(writeRequest, 5000L, TimeUnit.MILLISECONDS);
+          
+          if (!ok)
+          {
+             throw new IllegalStateException("Timed out trying to offer to queue");
+          }
+         }
+         catch (InterruptedException e)
+         {
+            throw new IllegalStateException("Failed to offer");
+         }
+      }
+
+      public synchronized WriteRequest poll(IoSession session) {
+         try
+         {
+          WriteRequest request = q.poll(5000L, TimeUnit.MILLISECONDS);
+          
+          return request;
+         }
+         catch (InterruptedException e)
+         {
+            throw new IllegalStateException("Failed to offer");
+         }
+      }
+      
+      @Override
+      public String toString() {
+          return q.toString();
+      }
+  }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -51,7 +51,7 @@
    // Note! must use ConcurrentMap here to avoid race condition
    private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
    
-   private volatile boolean blocked;
+   private boolean blocked;
    
    private final long blockTimeout;
    
@@ -133,10 +133,9 @@
    throws Exception
    {
       final Packet packet = (Packet) message;
-      
+                 
       if (executorFactory != null)
-      {
-         
+      {         
          long executorID = packet.getExecutorID();
    
          Executor executor = executors.get(executorID);
@@ -152,7 +151,7 @@
                executor = oldExecutor;
             }
          }
-   
+         
          executor.execute(new Runnable()
          {
             public void run()
@@ -160,7 +159,8 @@
                try
                {
                   messageReceivedInternal(session, packet);
-               } catch (Exception e)
+               }
+               catch (Exception e)
                {
                   log.error("unexpected error", e);
                }
@@ -173,42 +173,51 @@
       }
    }
 
+   private final int maxSize = 2;
+   
+   private int size;
+    
    @Override
-   public void messageSent(final IoSession session, final Object message) throws Exception
+   public synchronized void messageSent(final IoSession session, final Object message) throws Exception
    {      
-      if (blocked)
+//      if (blocked)
+//      {
+//         long bytes = session.getScheduledWriteBytes();
+//                      
+//         if (bytes <= bytesLow)
+//         {
+//            blocked = false;
+//   
+//            //Note that we need to notify all since there may be more than one thread waiting on this
+//            //E.g. the response from a blocking acknowledge and a delivery
+//            notifyAll();            
+//         }
+//      }
+      
+      size--;
+      
+      if (blocked && size == 0)
       {
-         long bytes = session.getScheduledWriteBytes();
-                      
-         if (bytes <= bytesLow)
-         {
-            blocked = false;
-   
-            synchronized (this)
-            {
-               notify();
-            }
-         }
+         notifyAll();
       }
    }
    
-   public void checkWrite(final IoSession session) throws Exception
+   public synchronized void checkWrite(final IoSession session) throws Exception
    {
-      if (session.getScheduledWriteBytes() >= bytesHigh)
+//      if (session.getScheduledWriteBytes() >= bytesHigh)
+//      {
+//         blocked = true;
+//
+//         wait();                  
+//      }      
+      if (size == maxSize)
       {
          blocked = true;
-
-         synchronized (this)
-         {
-            wait(blockTimeout);
-         }
          
-         if (session.getScheduledWriteBytes() >= bytesHigh)
-         {
-            //TODO should really cope with spurious wakeups
-            throw new IllegalStateException("Timed out waiting for MINA write queue to free up");
-         }
-      }      
+         wait();
+      }
+            
+      size++;
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -21,7 +21,7 @@
    // Attributes ----------------------------------------------------
 
    private int tokens;
-
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -208,7 +208,7 @@
             {
                // We delivered all the messages - go into direct delivery
                direct = true;
-
+               
                promptDelivery = false;
             }
             return;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -273,12 +273,12 @@
    }
    
    public void receiveTokens(final int tokens) throws Exception
-   {
+   {      
       if (availableTokens != null)
       {
          int previous = availableTokens.getAndAdd(tokens);
-         
-         if (previous <= 0)
+
+         if (previous <= 0 && (previous + tokens) > 0)
          {
             promptDelivery();
          }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -22,6 +22,7 @@
 package org.jboss.messaging.core.server.impl;
 
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
@@ -38,6 +39,8 @@
  */
 public class ServerConsumerPacketHandler extends ServerPacketHandlerSupport
 {
+   private static final Logger log = Logger.getLogger(ServerConsumerPacketHandler.class);
+
 	private final ServerConsumer consumer;
 	
 	public ServerConsumerPacketHandler(final ServerConsumer consumer)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -29,6 +29,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.xa.XAException;
@@ -228,6 +229,7 @@
    public void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
    {
       Delivery delivery;
+      
       synchronized (rollbackCancelLock)
       {
          long nextID = deliveryIDSequence.getAndIncrement();

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -399,7 +399,7 @@
             
             ackBatchSize = 1;
             
-            blockOnAcknowledge = true;
+            blockOnAcknowledge = false;
          }
          else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
          {

Modified: trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	2008-05-18 13:34:39 UTC (rev 4230)
@@ -8,10 +8,9 @@
 
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Queue;
+import java.util.LinkedList;
 import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * This factory creates a hierarchy of Executor which shares the threads of the
@@ -40,14 +39,17 @@
 
    private final class ChildExecutor implements Executor, Runnable
    {
-      private final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
-      
+      private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
+
       public void execute(Runnable command)
       {
-         tasks.offer(command);
-         if (tasks.size() == 1 && runningChildren.add(this))
+         synchronized (tasks)
          {
-            parent.execute(this);
+            tasks.add(command);
+            if (tasks.size() == 1 && runningChildren.add(this))
+            {
+               parent.execute(this);
+            }
          }
       }
 
@@ -55,11 +57,15 @@
       {
          for (;;)
          {
-            final Runnable task = tasks.poll();
-            if (task == null)
+            final Runnable task;
+            synchronized (tasks)
             {
-               runningChildren.remove(this);
-               return;
+               task = tasks.poll();
+               if (task == null)
+               {
+                  runningChildren.remove(this);
+                  return;
+               }
             }
             task.run();
          }




More information about the jboss-cvs-commits mailing list