[jboss-cvs] JBoss Messaging SVN: r1669 - branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 30 19:45:28 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-11-30 19:45:26 -0500 (Thu, 30 Nov 2006)
New Revision: 1669

Modified:
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-660 - Fix provided by Tim

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-01 00:16:45 UTC (rev 1668)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-01 00:45:26 UTC (rev 1669)
@@ -21,17 +21,16 @@
  */
 package org.jboss.jms.server.endpoint;
 
-
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-
 import org.jboss.jms.client.remoting.HandleMessageResponse;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.message.JBossMessage;
@@ -59,9 +58,6 @@
 import org.jboss.messaging.core.tx.TxCallback;
 import org.jboss.messaging.util.Future;
 
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * Concrete implementation of ConsumerEndpoint. Lives on the boundary between Messaging Core and the
  * JMS Facade.
@@ -80,7 +76,7 @@
 
    private static final Logger log = Logger.getLogger(ServerConsumerEndpoint.class);
 
-   // Static --------------------------------------------------------  
+   // Static --------------------------------------------------------
 
    // Attributes ----------------------------------------------------
 
@@ -122,9 +118,9 @@
    private Object lock;
 
    private Map deliveries;
-   
+
    private CoreDestination dlq;
-   
+
    // Constructors --------------------------------------------------
 
    protected ServerConsumerEndpoint(int id, Channel channel,
@@ -152,7 +148,7 @@
          sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
 
       this.executor = (QueuedExecutor)pool.get("consumer" + id);
-             
+
       // Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
       // deliveries for the same consumer happen serially, since even if they are queued serially
       // the actual deliveries can happen in parallel, resulting in a later one "overtaking" an
@@ -166,9 +162,9 @@
 
       this.noLocal = noLocal;
       this.destination = dest;
-      
+
       this.toDeliver = new ArrayList();
-      
+
       this.lock = new Object();
 
       if (selector != null)
@@ -178,7 +174,7 @@
          if (trace) log.trace("created selector");
       }
 
-      //FIXME - 
+      //FIXME -
       //We really need to get rid of this delivery list - it's only purpose in life is to solve
       //the race condition where acks or cancels can come in before handle has returned - and
       //that can be solved in a simpler way anyway.
@@ -187,15 +183,15 @@
       //and when we do clustering we will have to replicate it too!!
       //Let's GET RID OF IT!!!!!!!!!!!
       this.deliveries = new LinkedHashMap();
-            
+
       this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
 
       // adding the consumer to the channel
       this.channel.add(this);
-      
+
       // prompt delivery
       channel.deliver(false);
-      
+
       log.debug(this + " constructed");
    }
 
@@ -207,21 +203,21 @@
    public Delivery handle(DeliveryObserver observer, Routable reference, Transaction tx)
    {
       if (trace) { log.trace(this + " receives " + reference + " for delivery"); }
-      
+
       // This is ok to have outside lock - is volatile
       if (bufferFull)
       {
          // We buffer a maximum of PREFETCH_LIMIT messages at once
-         
+
          if (trace) { log.trace(this + " has reached prefetch size will not accept any more references"); }
-         
+
          return null;
       }
-       
+
       // Need to synchronized around the whole block to prevent setting started = false
       // but handle is already running and a message is deposited during the stop procedure.
       synchronized (lock)
-      {  
+      {
          // If the consumer is stopped then we don't accept the message, it should go back into the
          // channel for delivery later.
          if (!started)
@@ -241,40 +237,40 @@
          if (trace) { log.trace(this + " has the main lock, preparing the message for delivery"); }
 
          MessageReference ref = (MessageReference)reference;
-                     
+
          JBossMessage message = (JBossMessage)ref.getMessage();
-         
+
          boolean selectorRejected = !this.accept(message);
-   
+
          SimpleDelivery delivery = new SimpleDelivery(observer, ref, false, !selectorRejected);
-         
+
          if (selectorRejected)
          {
             return delivery;
          }
-                 
+
          if (delivery.isDone())
          {
             return delivery;
          }
-   
-         deliveries.put(new Long(ref.getMessageID()), delivery);                 
-   
+
+         deliveries.put(new Long(ref.getMessageID()), delivery);
+
          // We don't send the message as-is, instead we create a MessageProxy instance. This allows
          // local fields such as deliveryCount to be handled by the proxy but global data to be
          // fielded by the same underlying Message instance. This allows us to avoid expensive
          // copying of messages
-   
+
          MessageProxy mp = JBossMessage.createThinDelegate(message, ref.getDeliveryCount());
-    
+
          // Add the proxy to the list to deliver
-                           
-         toDeliver.add(mp);     
-          
+
+         toDeliver.add(mp);
+
          bufferFull = toDeliver.size() >= prefetchSize;
-             
+
          if (!clientConsumerFull)
-         {            
+         {
             try
             {
                if (trace) { log.trace(this + " scheduling a new Deliverer"); }
@@ -285,12 +281,12 @@
                log.warn("Thread interrupted", e);
             }
          }
-                             
-         return delivery;              
+
+         return delivery;
       }
-   }      
-   
+   }
 
+
    // Filter implementation -----------------------------------------
 
    public boolean accept(Routable r)
@@ -303,7 +299,7 @@
          if (messageSelector != null)
          {
             accept = messageSelector.accept(r);
-   
+
             if (trace) { log.trace("message selector " + (accept ? "accepts " :  "DOES NOT accept ") + "the message"); }
          }
       }
@@ -332,21 +328,21 @@
       try
       {
          if (trace) { log.trace(this + " closing"); }
-         
-         stop(); 
+
+         stop();
       }
       catch (Throwable t)
       {
          throw ExceptionUtil.handleJMSInvocation(t, this + " closing");
-      }     
+      }
    }
-   
+
    public void close() throws JMSException
-   {      
+   {
       try
       {
          synchronized (lock)
-         { 
+         {
             // On close we only disconnect the consumer from the Channel we don't actually remove
             // it. This is because it may still contain deliveries that may well be acknowledged
             // after the consumer has closed. This is perfectly valid.
@@ -356,11 +352,11 @@
             // keeping deliveries after this is closed.
 
             if (trace) { log.trace(this + " grabbed the main lock in close()"); }
-                  
-            disconnect(); 
-            
+
+            disconnect();
+
             JMSDispatcher.instance.unregisterTarget(new Integer(id));
-            
+
             // If it's a subscription, remove it
             if (channel instanceof Subscription)
             {
@@ -368,10 +364,10 @@
                if (!sub.isRecoverable())
                {
                   //We don't disconnect durable subs
-                  sub.disconnect();                  
-               }            
-            } 
-            
+                  sub.disconnect();
+               }
+            }
+
             // If it's non recoverable, i.e. it's a non durable sub or a temporary queue then remove
             // all its references
 
@@ -379,7 +375,7 @@
             {
                channel.removeAllReferences();
             }
-            
+
             closed = true;
          }
       }
@@ -393,15 +389,15 @@
    {
       return closed;
    }
-               
+
    // ConsumerEndpoint implementation -------------------------------
-   
+
    /*
     * This is called by the client consumer to tell the server to wake up and start sending more
     * messages if available
     */
    public void more() throws JMSException
-   {           
+   {
       try
       {
          /*
@@ -416,83 +412,90 @@
          5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
          though the client needs messages
          */
-         this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });         
-                           
+         this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });
+
          //Run a deliverer to deliver any existing ones
          this.executor.execute(new Deliverer());
-         
+
          //TODO Why do we need to wait for it to execute??
          //Why not just return immediately?
-         
+
          //Now wait for it to execute
          Future result = new Future();
-         
+
          this.executor.execute(new Waiter(result));
-         
+
          result.getResult();
-                  
+
          //Now we know the deliverer has delivered any outstanding messages to the client buffer
-         
+
          channel.deliver(false);
       }
       catch (InterruptedException e)
       {
          log.warn("Thread interrupted", e);
-      }       
+      }
       catch (Throwable t)
       {
          throw ExceptionUtil.handleJMSInvocation(t, this + " more");
       }
    }
-   
-   
+
+
    // Public --------------------------------------------------------
-   
+
    public String toString()
    {
       return "ConsumerEndpoint[" + id + "]";
    }
-   
+
    public JBossDestination getDestination()
    {
       return destination;
    }
-   
+
    public ServerSessionEndpoint getSessionEndpoint()
    {
       return sessionEndpoint;
    }
-   
+
    public int getId()
    {
       return id;
    }
-    
+
    // Package protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------   
-   
+
+   // Protected -----------------------------------------------------
+
    protected void acknowledgeTransactionally(long messageID, Transaction tx) throws Throwable
    {
       if (trace) { log.trace("acknowledging transactionally " + messageID); }
-      
+
       SingleReceiverDelivery d = null;
-                 
+
       // The actual removal of the deliveries from the delivery list is deferred until tx commit
       synchronized (lock)
       {
+         if (closed)
+         {
+            //We must throw an exception to the client otherwise they may think the ack has succeeded
+            //This can happen if the connection is closed by another thread - e.g. the leasePinger
+            throw new IllegalStateException("Failed to acknowledge message, session has been closed");
+         }
+
          d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
       }
-      
+
       DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getKeyedCallback(this);
-            
+
       if (deliveryCallback == null)
       {
          deliveryCallback = new DeliveryCallback();
          tx.addKeyedCallback(deliveryCallback, this);
       }
       deliveryCallback.addMessageID(messageID);
-         
+
       if (d != null)
       {
          d.acknowledge(tx);
@@ -500,29 +503,36 @@
       else
       {
          throw new IllegalStateException("Failed to acknowledge delivery " + d);
-      }             
-   }      
-   
+      }
+   }
+
    protected void acknowledge(long messageID) throws Throwable
-   {  
-      // acknowledge a delivery   
+   {
+      // acknowledge a delivery
       SingleReceiverDelivery d;
-        
+
       synchronized (lock)
       {
+         if (closed)
+         {
+            //We must throw an exception to the client otherwise they may think the ack has succeeded
+            //This can happen if the connection is closed by another thread - e.g. the leasePinger
+            throw new IllegalStateException("Failed to acknowledge message, session has been closed");
+         }
+
          d = (SingleReceiverDelivery)deliveries.remove(new Long(messageID));
       }
-      
+
       if (d != null)
       {
          d.acknowledge(null);
       }
       else
-      {     
+      {
          throw new IllegalStateException("Cannot find delivery to acknowledge:" + messageID);
-      }      
+      }
    }
-   
+
    /**
     * Actually remove the consumer and clear up any deliveries it may have
     * This is called by the session on session.close()
@@ -530,32 +540,33 @@
     *
     **/
    protected void remove() throws Throwable
-   {         
+   {
       if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
-      
+
       boolean wereDeliveries = false;
-      for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
-      {
-         SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
 
-         d.cancel();
-         wereDeliveries = true;
-      }
-      deliveries.clear();           
-      
-      if (!disconnected)
+      synchronized (lock)
       {
-         if (!closed)
+         for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
          {
-            close();
+            SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
+
+            d.cancel();
+            wereDeliveries = true;
          }
+         deliveries.clear();
       }
-      
+
+      if (!disconnected && !closed)
+      {
+         close();
+      }
+
       sessionEndpoint.getConnectionEndpoint().
-         getServerPeer().removeConsumerEndpoint(new Integer(id));                  
-            
+         getServerPeer().removeConsumerEndpoint(new Integer(id));
+
       sessionEndpoint.removeConsumerEndpoint(id);
-      
+
       if (wereDeliveries)
       {
          //If we cancelled any deliveries we need to force a deliver on the channel
@@ -563,34 +574,34 @@
          //any of the cancelled messages
          channel.deliver(false);
       }
-   }  
-   
+   }
+
    protected void promptDelivery()
    {
       channel.deliver(false);
    }
-   
+
    protected void sendToDLQ(Long messageID, Transaction tx) throws Throwable
    {
       SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
-      
+
       if (del != null)
-      { 
+      {
          log.warn(del.getReference() + " has exceed maximum delivery attempts and will be sent to the DLQ");
-         
+
          if (dlq != null)
-         {         
+         {
             //reset delivery count to zero
             del.getReference().setDeliveryCount(0);
-            
+
             dlq.handle(null, del.getReference(), tx);
-            
-            del.acknowledge(tx);           
+
+            del.acknowledge(tx);
          }
          else
          {
             log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
-            
+
             del.acknowledge(tx);
          }
       }
@@ -598,31 +609,44 @@
       {
          throw new IllegalStateException("Cannot find delivery to send to DLQ:" + id);
       }
-      
+
    }
-   
+
    protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
    {
-      SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
-      
+      SingleReceiverDelivery del;
+
+      synchronized (lock)
+      {
+         if (closed)
+         {
+            //This can happen if the connection is closed by another thread - e.g. the leasePinger
+            //We should then ignore the cancel since the delivery will have already been cancelled
+            //back to the queue
+            return;
+         }
+
+         del = (SingleReceiverDelivery)deliveries.remove(messageID);
+      }
+
       if (del != null)
-      {                               
+      {
          //Cancel back to the queue
-         
+
          //Update the delivery count
-           
+
          del.getReference().setDeliveryCount(deliveryCount);
-              
-         del.cancel();         
+
+         del.cancel();
       }
       else
       {
          throw new IllegalStateException("Cannot find delivery to cancel:" + id);
       }
    }
-               
+
    protected void start()
-   {             
+   {
       synchronized (lock)
       {
          // Can't start or stop it if it is closed.
@@ -630,21 +654,21 @@
          {
             return;
          }
-         
+
          if (started)
          {
             return;
          }
-         
+
          started = true;
       }
-      
+
       // Prompt delivery
       channel.deliver(false);
    }
-   
+
    protected void stop() throws Throwable
-   {     
+   {
       //We need to:
       //Stop accepting any new messages in the SCE
       //Flush any messages from the SCE to the buffer
@@ -659,64 +683,58 @@
          {
             return;
          }
-         
+
          started = false;
       }
-      
+
       //Now we know no more messages will be accepted in the SCE
-            
+
       try
       {
          //Flush any messages waiting to be sent to the client
          this.executor.execute(new Deliverer());
-         
+
          //Now wait for it to execute
          Future result = new Future();
-         
+
          this.executor.execute(new Waiter(result));
-         
+
          result.getResult();
-             
+
          //Now we know any deliverer has delivered any outstanding messages to the client buffer
       }
       catch (InterruptedException e)
       {
          log.warn("Thread interrupted", e);
       }
-            
+
       //Now we know that there are no in flight messages on the way to the client consumer.
-      
+
       //But there may be messages still in the toDeliver list since the client consumer might be full
       //So we need to cancel these
-            
+
       if (!toDeliver.isEmpty())
       {
-         // this is to avoid the deadlock described in http://jira.jboss.com/jira/browse/JBMESSAGING-660
-         List toDeliverCopy = new ArrayList(toDeliver.size());
          synchronized (lock)
          {
             for (int i = toDeliver.size() - 1; i >= 0; i--)
             {
-               toDeliverCopy.add(toDeliver.get(i));
-            }
+               MessageProxy proxy = (MessageProxy)toDeliver.get(i);
 
-            toDeliver.clear();
-         }
+               long id = proxy.getMessage().getMessageID();
 
-         for(Iterator i = toDeliverCopy.iterator(); i.hasNext();)
-         {
-            MessageProxy proxy = (MessageProxy)i.next();
-            long id = proxy.getMessage().getMessageID();
-            cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
+               cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
+            }
          }
 
+         toDeliver.clear();
+
          bufferFull = false;
+      }
+   }
 
-      }      
-   }
-      
    // Private -------------------------------------------------------
-   
+
    /**
     * Disconnect this consumer from the Channel that feeds it. This method does not clear up
     * deliveries
@@ -724,18 +742,18 @@
    private void disconnect()
    {
       boolean removed = channel.remove(this);
-      
+
       if (removed)
       {
          disconnected = true;
          if (trace) { log.trace(this + " removed from the channel"); }
       }
    }
-        
-   // Inner classes -------------------------------------------------   
-   
+
+   // Inner classes -------------------------------------------------
+
    /*
-    * Delivers messages to the client 
+    * Delivers messages to the client
     * TODO - We can make this a bit more intelligent by letting it measure the rate
     * the client is consuming messages and send messages at that rate.
     * This would mean the client consumer wouldn't be full so often and more wouldn't have to be called
@@ -752,9 +770,9 @@
             if (trace) { log.trace(this + " client consumer full, do nothing"); }
             return;
          }
-         
+
          List list = null;
-             
+
          synchronized (lock)
          {
             if (trace) { log.trace(this + " has the main lock, attempting delivery"); }
@@ -766,7 +784,7 @@
                bufferFull = false;
             }
          }
-                                                           
+
          if (list == null)
          {
             if (trace) { log.trace(this + " has a null list, returning"); }
@@ -830,7 +848,7 @@
          return "Deliverer[" + Integer.toHexString(hashCode()) + "]";
       }
    }
-   
+
    /*
     * The purpose of this class is to put it on the QueuedExecutor and wait for it to run
     * We can then ensure that all the Runnables in front of it on the queue have also executed
@@ -841,20 +859,20 @@
    private class Waiter implements Runnable
    {
       Future result;
-      
+
       Waiter(Future result)
       {
          this.result = result;
       }
-      
+
       public void run()
       {
          result.setResult(null);
       }
    }
-   
+
    /**
-    * 
+    *
     * The purpose of this class is to remove deliveries from the delivery list on commit
     * Each transaction has once instance of this per SCE
     *
@@ -862,27 +880,27 @@
    private class DeliveryCallback implements TxCallback
    {
       List delList = new ArrayList();
-         
+
       public void beforePrepare()
-      {         
+      {
          //NOOP
       }
-      
+
       public void beforeCommit(boolean onePhase)
-      {         
+      {
          //NOOP
       }
-      
+
       public void beforeRollback(boolean onePhase)
-      {         
+      {
          //NOOP
       }
-      
+
       public void afterPrepare()
-      {         
+      {
          //NOOP
       }
-      
+
       public synchronized void afterCommit(boolean onePhase) throws TransactionException
       {
          // Remove the deliveries from the delivery map.
@@ -890,23 +908,23 @@
          while (iter.hasNext())
          {
             Long messageID = (Long)iter.next();
-            
+
             if (deliveries.remove(messageID) == null)
             {
                throw new TransactionException("Failed to remove delivery " + messageID);
             }
          }
       }
-      
+
       public void afterRollback(boolean onePhase) throws TransactionException
-      {                            
+      {
          //NOOP
       }
-      
+
       synchronized void addMessageID(long messageID)
       {
          delList.add(new Long(messageID));
       }
    }
-   
+
 }




More information about the jboss-cvs-commits mailing list