[jboss-cvs] JBoss Messaging SVN: r4244 - trunk/src/main/org/jboss/messaging/core/remoting/impl/mina.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon May 19 20:17:54 EDT 2008


Author: trustin
Date: 2008-05-19 20:17:53 -0400 (Mon, 19 May 2008)
New Revision: 4244

Modified:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
Log:
MinaHandler.blocked should be managed on a per-session basis rather than a per-handler basis.

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-19 21:11:54 UTC (rev 4243)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-20 00:17:53 UTC (rev 4244)
@@ -11,24 +11,23 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.mina.common.AttributeKey;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
 import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 import org.jboss.messaging.util.OrderedExecutorFactory;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
+ *
  * @version <tt>$Revision$</tt>
- * 
+ *
  */
 public class MinaHandler extends IoHandlerAdapter implements
       PacketHandlerRegistrationListener
@@ -37,13 +36,15 @@
 
    private static final Logger log = Logger.getLogger(MinaHandler.class);
 
+   private static final AttributeKey BLOCKED = new AttributeKey(MinaHandler.class, "blocked");
+
    private static boolean trace = log.isTraceEnabled();
-      
+
    // Attributes ----------------------------------------------------
 
    private final PacketDispatcher dispatcher;
 
-   private CleanUpNotifier failureNotifier;
+   private final CleanUpNotifier failureNotifier;
 
    private final boolean closeSessionOnExceptionCaught;
 
@@ -51,17 +52,15 @@
 
    // Note! must use ConcurrentMap here to avoid race condition
    private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
-   
-   private boolean blocked;
-   
+
    private final long blockTimeout;
-  
+
    //TODO - this is screwed - I want this to be zero, but unfortunately in messageSent, the current
-   //messages byts haven't been substracted so this won't work!!
+   //messages bytes haven't been subtracted so this won't work!!
    private final long bytesLow;
-   
+
    private final long bytesHigh;
-     
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -86,11 +85,11 @@
       this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
       if (useExecutor)
       {
-         this.executorFactory = new OrderedExecutorFactory(executorService);
+         executorFactory = new OrderedExecutorFactory(executorService);
       }
       else
       {
-         this.executorFactory = null;
+         executorFactory = null;
       }
       this.dispatcher.setListener(this);
    }
@@ -112,6 +111,12 @@
    // IoHandlerAdapter overrides ------------------------------------
 
    @Override
+   public void sessionCreated(IoSession session) throws Exception {
+      // Initialize the default attributes.
+      session.setAttribute(BLOCKED, Boolean.FALSE);
+   }
+
+   @Override
    public void exceptionCaught(final IoSession session, final Throwable cause)
          throws Exception
    {
@@ -136,25 +141,25 @@
    throws Exception
    {
       final Packet packet = (Packet) message;
-                 
+
       if (executorFactory != null)
-      {         
+      {
          long executorID = packet.getExecutorID();
-   
+
          Executor executor = executors.get(executorID);
          if (executor == null)
          {
             executor = executorFactory.getOrderedExecutor();
-   
+
             Executor oldExecutor = executors.putIfAbsent(executorID, executor);
-   
+
             if (oldExecutor != null)
             {
                //Avoid race
                executor = oldExecutor;
             }
          }
-         
+
          executor.execute(new Runnable()
          {
             public void run()
@@ -178,54 +183,55 @@
 
    @Override
    public synchronized void messageSent(final IoSession session, final Object message) throws Exception
-   {      
+   {
+      boolean blocked = (Boolean) session.getAttribute(BLOCKED);
       if (blocked)
       {
          long bytes = session.getScheduledWriteBytes();
-                      
+
          if (bytes <= bytesLow)
          {
-            blocked = false;
-   
+            session.setAttribute(BLOCKED, Boolean.FALSE);
+
             //Note that we need to notify all since there may be more than one thread waiting on this
             //E.g. the response from a blocking acknowledge and a delivery
-            notifyAll();            
+            notifyAll();
          }
       }
    }
-   
+
    public synchronized void checkWrite(final IoSession session) throws Exception
    {
       while (session.getScheduledWriteBytes() >= bytesHigh)
       {
-         blocked = true;
-                  
+         session.setAttribute(BLOCKED, Boolean.TRUE);
+
          long start = System.currentTimeMillis();
-         
+
          long toWait = blockTimeout;
-         
+
          do
          {
             wait(toWait);
-            
+
             if (session.getScheduledWriteBytes() < bytesHigh)
             {
                break;
             }
-            
+
             long now = System.currentTimeMillis();
-            
+
             toWait -= now - start;
-            
+
             start = now;
          }
          while (toWait > 0);
-         
+
          if (toWait <= 0)
          {
             throw new IllegalStateException("Timed out waiting for MINA queue to free");
          }
-      }      
+      }
    }
 
    // Package protected ---------------------------------------------
@@ -239,7 +245,7 @@
    {
       PacketReturner returner;
 
-      if (packet.getResponseTargetID() != EmptyPacket.NO_ID_SET)
+      if (packet.getResponseTargetID() != Packet.NO_ID_SET)
       {
          returner = new PacketReturner()
          {
@@ -253,7 +259,7 @@
                {
                   log.error("Failed to acquire sem", e);
                }
-               
+
                dispatcher.callFilters(p);
 
                session.write(p);
@@ -275,7 +281,9 @@
          returner = null;
       }
 
-      if (trace) log.trace("received packet " + packet);
+      if (trace) {
+        log.trace("received packet " + packet);
+    }
 
       dispatcher.dispatch(packet, returner);
    }




More information about the jboss-cvs-commits mailing list