[jboss-cvs] JBoss Messaging SVN: r1674 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/server/endpoint tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Dec 1 15:55:33 EST 2006


Author: timfox
Date: 2006-12-01 15:55:27 -0500 (Fri, 01 Dec 2006)
New Revision: 1674

Modified:
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
More on fix



Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-01 20:09:48 UTC (rev 1673)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-01 20:55:27 UTC (rev 1674)
@@ -222,15 +222,8 @@
          // channel for delivery later.
          if (!started)
          {
-            // this is a common programming error, make this visible in the debug logs. However,
-            // make also possible to cut out the performance overhead for systems that raise the
-            // threshold to INFO or higher.
+            if (trace) { log.debug(this + " NOT started yet!"); }
 
-            if (log.isDebugEnabled())
-            {
-               log.debug(this + " NOT started yet!");
-            }
-
             return null;
          }
 
@@ -529,22 +522,27 @@
    {         
       if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
       
-      boolean wereDeliveries = false;
-      for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
-      {
-         SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
-
-         d.cancel();
-         wereDeliveries = true;
-      }
-      deliveries.clear();           
+      boolean wereDeliveries = false;       
       
-      if (!disconnected)
-      {
-         if (!closed)
+      synchronized (lock)
+      {          
+         for (Iterator i = deliveries.values().iterator(); i.hasNext(); )
          {
-            close();
+            SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
+   
+            d.cancel();
+            wereDeliveries = true;
          }
+               
+         deliveries.clear();           
+         
+         if (!disconnected)
+         {
+            if (!closed)
+            {
+               close();
+            }
+         }
       }
       
       sessionEndpoint.getConnectionEndpoint().
@@ -599,8 +597,13 @@
    
    protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
    {
-      SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
+      SingleReceiverDelivery del = null;
       
+      synchronized (lock)
+      {         
+         del = (SingleReceiverDelivery)deliveries.remove(messageID);
+      }
+      
       if (del != null)
       {                               
          //Cancel back to the queue
@@ -655,10 +658,10 @@
          {
             return;
          }
+      }
          
-         started = false;
-      }
-      
+      started = false;      
+   
       //Now we know no more messages will be accepted in the SCE
             
       try
@@ -687,22 +690,19 @@
             
       if (!toDeliver.isEmpty())
       { 
-         synchronized (lock)
+         for (int i = toDeliver.size() - 1; i >= 0; i--)
          {
-            for (int i = toDeliver.size() - 1; i >= 0; i--)
-            {
-               MessageProxy proxy = (MessageProxy)toDeliver.get(i);
-               
-               long id = proxy.getMessage().getMessageID();
-               
-               cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
-            }
+            MessageProxy proxy = (MessageProxy)toDeliver.get(i);
+            
+            long id = proxy.getMessage().getMessageID();
+            
+            cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
          }
-                 
+             
          toDeliver.clear();
          
          bufferFull = false;
-      }      
+      }            
    }
       
    // Private -------------------------------------------------------
@@ -862,15 +862,17 @@
       
       public synchronized void afterCommit(boolean onePhase) throws TransactionException
       {
-         // Remove the deliveries from the delivery map.
-         Iterator iter = delList.iterator();
-         while (iter.hasNext())
+         synchronized (lock)
          {
-            Long messageID = (Long)iter.next();
-            
-            if (deliveries.remove(messageID) == null)
+            // Remove the deliveries from the delivery map.
+            Iterator iter = delList.iterator();
+            while (iter.hasNext())
             {
-               throw new TransactionException("Failed to remove delivery " + messageID);
+               Long messageID = (Long)iter.next();
+               
+               //The message might have been removed already if the consumer was closed for instance
+               //This is ok
+               deliveries.remove(messageID);               
             }
          }
       }

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-01 20:09:48 UTC (rev 1673)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-01 20:55:27 UTC (rev 1674)
@@ -384,7 +384,7 @@
       }
    }
 
-   public void close() throws JMSException
+   public synchronized void close() throws JMSException
    {
       try
       {
@@ -438,10 +438,15 @@
       }
    }
    
-   public void acknowledgeBatch(List ackInfos) throws JMSException
+   public synchronized void acknowledgeBatch(List ackInfos) throws JMSException
    {      
       try
       {
+         if (closed)
+         {
+            throw new javax.jms.IllegalStateException("Cannot acknowledge batch since session is closed");
+         }
+                  
          Iterator iter = ackInfos.iterator();
          
          while (iter.hasNext())
@@ -457,10 +462,15 @@
       }
    }
    
-   public void acknowledge(AckInfo ackInfo) throws JMSException
+   public synchronized void acknowledge(AckInfo ackInfo) throws JMSException
    {
       try
       {
+         if (closed)
+         {
+            throw new javax.jms.IllegalStateException("Cannot acknowledge " + ackInfo + " since session is closed");
+         }
+         
          acknowledgeInternal(ackInfo);      
       }
       catch (Throwable t)
@@ -469,10 +479,19 @@
       }
    }      
         
-   public void cancelDeliveries(List ackInfos) throws JMSException
+   public synchronized void cancelDeliveries(List ackInfos) throws JMSException
    {
       try
       {
+         if (closed)
+         {
+            //We can safely ignore any cancels since closing the session will have cancelled anything
+            //remaining
+            log.warn("Call to cancelDeliveries came in after session was closed");
+            
+            return;
+         }
+         
          // deliveries must be cancelled in reverse order
           
          Set consumers = new HashSet();

Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-12-01 20:09:48 UTC (rev 1673)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-12-01 20:55:27 UTC (rev 1674)
@@ -1012,6 +1012,9 @@
           assertEquals("hello1", rm1.getText());
 
           cons1.close();
+          
+          //Give time for asynch cancel to happen
+          Thread.sleep(500);
          
           log.debug("sess.recover()");
 




More information about the jboss-cvs-commits mailing list