[Jboss-cvs] JBoss Messaging SVN: r1277 - in branches/Branch_1_0/src/main/org/jboss/jms/client: container remoting

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Sep 11 20:32:37 EDT 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-09-11 20:32:35 -0400 (Mon, 11 Sep 2006)
New Revision: 1277

Modified:
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/AsfAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
Log:
cleaned some method interfaces and added tentative fix (currently commented out) for http://jira.jboss.org/jira/browse/JBMESSAGING-542

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/AsfAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/AsfAspect.java	2006-09-12 00:30:28 UTC (rev 1276)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/AsfAspect.java	2006-09-12 00:32:35 UTC (rev 1277)
@@ -162,8 +162,7 @@
 
          if (trace) { log.trace("sending " + holder.msg + " to the message listener" ); }
          
-         MessageCallbackHandler.callOnMessage(holder.consumerDelegate, del,
-                                              sessionListener, holder.consumerID, false,
+         MessageCallbackHandler.callOnMessage(del, sessionListener, holder.consumerID, false,
                                               holder.msg, ackMode);                          
       }
       

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-09-12 00:30:28 UTC (rev 1276)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-09-12 00:32:35 UTC (rev 1277)
@@ -137,7 +137,7 @@
       {
          SessionDelegate del = (SessionDelegate)mi.getTargetObject();
          
-         //We acknowledge immediately
+         // We acknowledge immediately
          
          if (!state.isRecoverCalled())
          {

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-09-12 00:30:28 UTC (rev 1276)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-09-12 00:32:35 UTC (rev 1277)
@@ -64,11 +64,10 @@
       trace = log.isTraceEnabled();
    }
      
-   //Hardcoded for now
+   // Hardcoded for now
    private static final int MAX_REDELIVERIES = 10;
       
-   public static void callOnMessage(ConsumerDelegate cons,
-                                    SessionDelegate sess,
+   public static void callOnMessage(SessionDelegate sess,
                                     MessageListener listener,
                                     int consumerID,
                                     boolean isConnectionConsumer,
@@ -76,15 +75,19 @@
                                     int ackMode)
       throws JMSException
    {
-      preDeliver(sess, consumerID, m, isConnectionConsumer);  
+      preDeliver(sess, consumerID, m, isConnectionConsumer);
                   
       int tries = 0;
       
       while (true)
       {
          try
-         {      
-            listener.onMessage(m); 
+         {
+            if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
+
+            listener.onMessage(m);
+
+            if (trace) { log.trace("listener's onMessage() finished"); }
             
             break;
          }
@@ -103,9 +106,9 @@
                {
                   m.setJMSRedelivered(true);
                   
-                  //TODO delivery count although optional should be global
-                  //so we need to send it back to the server
-                  //but this has performance hit so perhaps we just don't support it?
+                  // TODO: delivery count, although optional, should be global, so we need to send
+                  // it back to the server. But this has a performance hit, so perhaps we just
+                  // don't support it?
                   m.incDeliveryCount();
                   
                   tries++;
@@ -130,7 +133,7 @@
          }
       }
             
-      postDeliver(sess, consumerID, m, isConnectionConsumer);          
+      postDeliver(sess, isConnectionConsumer);
    }
    
    protected static void preDeliver(SessionDelegate sess,
@@ -147,10 +150,7 @@
       }         
    }
    
-   protected static void postDeliver(SessionDelegate sess,
-                                     int consumerID,
-                                     MessageProxy m,
-                                     boolean isConnectionConsumer)
+   protected static void postDeliver(SessionDelegate sess, boolean isConnectionConsumer)
       throws JMSException
    {
       // If this is the callback-handler for a connection consumer we don't want to acknowledge or
@@ -203,15 +203,15 @@
       }
               
       this.bufferSize = bufferSize;
-      
+
       buffer = new LinkedList();
-      
+
       isConnectionConsumer = isCC;
       
       this.ackMode = ackMode;
-      
+
       this.sessionDelegate = sess;
-      
+
       this.consumerDelegate = cons;
       
       this.consumerID = consumerID;
@@ -302,13 +302,13 @@
          
          if (receiverThread != null)
          {            
-            //Wake up any receive() thread that might be waiting
+            // Wake up any receive() thread that might be waiting
             mainLock.notify();
          }   
          
          this.listener = null;
       }
-         
+
       waitForOnMessageToComplete();
       
       // Now we cancel anything left in the buffer. The reason we do this now is that otherwise the
@@ -342,14 +342,22 @@
    
    private void waitForOnMessageToComplete()
    {
-      // Wait for any on message executions to complete
-      
+      // Wait for any onMessage() executions to complete
+
+//      if (Thread.currentThread().equals(sessionExecutor.getThread()))
+//      {
+//         // the current thread is already closing this MessageCallbackHandler, so no need to register
+//         // another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+//         return;
+//      }
+
       Future result = new Future();
       
       try
       {
-         this.sessionExecutor.execute(new Closer(result));
-         
+         sessionExecutor.execute(new Closer(result));
+
+         if (trace) { log.trace("blocking wait for Closer execution"); }
          result.getResult();
       }
       catch (InterruptedException e)
@@ -439,7 +447,7 @@
                // message is acknowledged so it gets removed from the queue/subscription.
                preDeliver(sessionDelegate, consumerID, m, isConnectionConsumer);
                
-               postDeliver(sessionDelegate, consumerID, m, isConnectionConsumer);
+               postDeliver(sessionDelegate, isConnectionConsumer);
                
                if (!m.getMessage().isExpired())
                {
@@ -590,9 +598,9 @@
       {         
          MessageProxy msg = (MessageProxy)iter.next();
       
-         //if this is the handler for a connection consumer we don't want to set the session delegate
-         //since this is only used for client acknowledgement which is illegal for a session
-         //used for an MDB
+         // If this is the handler for a connection consumer we don't want to set the session
+         // delegate since this is only used for client acknowledgement which is illegal for a
+         // session used for an MDB
          msg.setSessionDelegate(sessionDelegate, isConnectionConsumer);
                   
          msg.setReceived();
@@ -628,7 +636,7 @@
          {
             listenerRunning = true;
 
-            if (trace) { log.trace(this + ": new ListenerRunner scheduled"); }
+            if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
             this.queueRunner(new ListenerRunner());
          }     
          
@@ -654,6 +662,7 @@
       public void run()
       {
          result.setResult(null);
+         if (trace) { log.trace("Closer finished run"); }
       }
    }
    
@@ -673,15 +682,16 @@
             if (listener == null)
             {
                listenerRunning = false;
-               
+               if (trace) { log.trace("no listener, returning"); }
                return;
             }
             
-            //remove a message from the buffer
+            // remove a message from the buffer
 
             if (buffer.isEmpty())
             {
-               listenerRunning = false;               
+               listenerRunning = false;
+               if (trace) { log.trace("no messages in buffer, marking listener as not running"); }
             }
             else
             {               
@@ -697,6 +707,7 @@
                if (!again)
                {
                   listenerRunning  = false;
+                  if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
                }  
             }
          }
@@ -705,7 +716,7 @@
          {
             try
             {
-               callOnMessage(consumerDelegate, sessionDelegate, listener, consumerID, false, mp, ackMode);
+               callOnMessage(sessionDelegate, listener, consumerID, false, mp, ackMode);
             }
             catch (JMSException e)
             {
@@ -715,14 +726,14 @@
          
          if (again)
          {
-            //Queue it up again
+            // Queue it up again
             queueRunner(this);
          }
          else
          {
             if (!serverSending)
             {
-               //Ask server for more messages
+               // Ask server for more messages
                try
                {
                   consumerDelegate.more();




More information about the jboss-cvs-commits mailing list