[jboss-cvs] JBoss Messaging SVN: r1672 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Dec 1 14:37:17 EST 2006


Author: timfox
Date: 2006-12-01 14:37:10 -0500 (Fri, 01 Dec 2006)
New Revision: 1672

Modified:
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
More for patch



Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-01 09:47:27 UTC (rev 1671)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-01 19:37:10 UTC (rev 1672)
@@ -76,7 +76,7 @@
 
    private static final Logger log = Logger.getLogger(ServerConsumerEndpoint.class);
 
-   // Static --------------------------------------------------------
+   // Static --------------------------------------------------------  
 
    // Attributes ----------------------------------------------------
 
@@ -118,9 +118,9 @@
    private Object lock;
 
    private Map deliveries;
-
+   
    private CoreDestination dlq;
-
+   
    // Constructors --------------------------------------------------
 
    protected ServerConsumerEndpoint(int id, Channel channel,
@@ -148,7 +148,7 @@
          sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
 
       this.executor = (QueuedExecutor)pool.get("consumer" + id);
-
+             
       // Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
       // deliveries for the same consumer happen serially, since even if they are queued serially
       // the actual deliveries can happen in parallel, resulting in a later one "overtaking" an
@@ -162,9 +162,9 @@
 
       this.noLocal = noLocal;
       this.destination = dest;
-
+      
       this.toDeliver = new ArrayList();
-
+      
       this.lock = new Object();
 
       if (selector != null)
@@ -174,7 +174,7 @@
          if (trace) log.trace("created selector");
       }
 
-      //FIXME -
+      //FIXME - 
       //We really need to get rid of this delivery list - it's only purpose in life is to solve
       //the race condition where acks or cancels can come in before handle has returned - and
       //that can be solved in a simpler way anyway.
@@ -183,15 +183,15 @@
       //and when we do clustering we will have to replicate it too!!
       //Let's GET RID OF IT!!!!!!!!!!!
       this.deliveries = new LinkedHashMap();
-
+            
       this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
 
       // adding the consumer to the channel
       this.channel.add(this);
-
+      
       // prompt delivery
       channel.deliver(false);
-
+      
       log.debug(this + " constructed");
    }
 
@@ -203,21 +203,21 @@
    public Delivery handle(DeliveryObserver observer, Routable reference, Transaction tx)
    {
       if (trace) { log.trace(this + " receives " + reference + " for delivery"); }
-
+      
       // This is ok to have outside lock - is volatile
       if (bufferFull)
       {
          // We buffer a maximum of PREFETCH_LIMIT messages at once
-
+         
          if (trace) { log.trace(this + " has reached prefetch size will not accept any more references"); }
-
+         
          return null;
       }
-
+       
       // Need to synchronized around the whole block to prevent setting started = false
       // but handle is already running and a message is deposited during the stop procedure.
       synchronized (lock)
-      {
+      {  
          // If the consumer is stopped then we don't accept the message, it should go back into the
          // channel for delivery later.
          if (!started)
@@ -237,40 +237,40 @@
          if (trace) { log.trace(this + " has the main lock, preparing the message for delivery"); }
 
          MessageReference ref = (MessageReference)reference;
-
+                     
          JBossMessage message = (JBossMessage)ref.getMessage();
-
+         
          boolean selectorRejected = !this.accept(message);
-
+   
          SimpleDelivery delivery = new SimpleDelivery(observer, ref, false, !selectorRejected);
-
+         
          if (selectorRejected)
          {
             return delivery;
          }
-
+                 
          if (delivery.isDone())
          {
             return delivery;
          }
-
-         deliveries.put(new Long(ref.getMessageID()), delivery);
-
+   
+         deliveries.put(new Long(ref.getMessageID()), delivery);                 
+   
          // We don't send the message as-is, instead we create a MessageProxy instance. This allows
          // local fields such as deliveryCount to be handled by the proxy but global data to be
          // fielded by the same underlying Message instance. This allows us to avoid expensive
          // copying of messages
-
+   
          MessageProxy mp = JBossMessage.createThinDelegate(message, ref.getDeliveryCount());
-
+    
          // Add the proxy to the list to deliver
-
-         toDeliver.add(mp);
-
+                           
+         toDeliver.add(mp);     
+          
          bufferFull = toDeliver.size() >= prefetchSize;
-
+             
          if (!clientConsumerFull)
-         {
+         {            
             try
             {
                if (trace) { log.trace(this + " scheduling a new Deliverer"); }
@@ -281,12 +281,12 @@
                log.warn("Thread interrupted", e);
             }
          }
-
-         return delivery;
+                             
+         return delivery;              
       }
-   }
+   }      
+   
 
-
    // Filter implementation -----------------------------------------
 
    public boolean accept(Routable r)
@@ -299,7 +299,7 @@
          if (messageSelector != null)
          {
             accept = messageSelector.accept(r);
-
+   
             if (trace) { log.trace("message selector " + (accept ? "accepts " :  "DOES NOT accept ") + "the message"); }
          }
       }
@@ -328,21 +328,21 @@
       try
       {
          if (trace) { log.trace(this + " closing"); }
-
-         stop();
+         
+         stop(); 
       }
       catch (Throwable t)
       {
          throw ExceptionUtil.handleJMSInvocation(t, this + " closing");
-      }
+      }     
    }
-
+   
    public void close() throws JMSException
-   {
+   {      
       try
       {
          synchronized (lock)
-         {
+         { 
             // On close we only disconnect the consumer from the Channel we don't actually remove
             // it. This is because it may still contain deliveries that may well be acknowledged
             // after the consumer has closed. This is perfectly valid.
@@ -352,11 +352,11 @@
             // keeping deliveries after this is closed.
 
             if (trace) { log.trace(this + " grabbed the main lock in close()"); }
-
-            disconnect();
-
+                  
+            disconnect(); 
+            
             JMSDispatcher.instance.unregisterTarget(new Integer(id));
-
+            
             // If it's a subscription, remove it
             if (channel instanceof Subscription)
             {
@@ -364,10 +364,10 @@
                if (!sub.isRecoverable())
                {
                   //We don't disconnect durable subs
-                  sub.disconnect();
-               }
-            }
-
+                  sub.disconnect();                  
+               }            
+            } 
+            
             // If it's non recoverable, i.e. it's a non durable sub or a temporary queue then remove
             // all its references
 
@@ -375,7 +375,7 @@
             {
                channel.removeAllReferences();
             }
-
+            
             closed = true;
          }
       }
@@ -389,15 +389,15 @@
    {
       return closed;
    }
-
+               
    // ConsumerEndpoint implementation -------------------------------
-
+   
    /*
     * This is called by the client consumer to tell the server to wake up and start sending more
     * messages if available
     */
    public void more() throws JMSException
-   {
+   {           
       try
       {
          /*
@@ -412,90 +412,83 @@
          5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
          though the client needs messages
          */
-         this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });
-
+         this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });         
+                           
          //Run a deliverer to deliver any existing ones
          this.executor.execute(new Deliverer());
-
+         
          //TODO Why do we need to wait for it to execute??
          //Why not just return immediately?
-
+         
          //Now wait for it to execute
          Future result = new Future();
-
+         
          this.executor.execute(new Waiter(result));
-
+         
          result.getResult();
-
+                  
          //Now we know the deliverer has delivered any outstanding messages to the client buffer
-
+         
          channel.deliver(false);
       }
       catch (InterruptedException e)
       {
          log.warn("Thread interrupted", e);
-      }
+      }       
       catch (Throwable t)
       {
          throw ExceptionUtil.handleJMSInvocation(t, this + " more");
       }
    }
-
-
+   
+   
    // Public --------------------------------------------------------
-
+   
    public String toString()
    {
       return "ConsumerEndpoint[" + id + "]";
    }
-
+   
    public JBossDestination getDestination()
    {
       return destination;
    }
-
+   
    public ServerSessionEndpoint getSessionEndpoint()
    {
       return sessionEndpoint;
    }
-
+   
    public int getId()
    {
       return id;
    }
-
+    
    // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
+   
+   // Protected -----------------------------------------------------   
+   
    protected void acknowledgeTransactionally(long messageID, Transaction tx) throws Throwable
    {
       if (trace) { log.trace("acknowledging transactionally " + messageID); }
-
+      
       SingleReceiverDelivery d = null;
-
+                 
       // The actual removal of the deliveries from the delivery list is deferred until tx commit
       synchronized (lock)
       {
-         if (closed)
-         {
-            //We must throw an exception to the client otherwise they may think the ack has succeeded
-            //This can happen if the connection is closed by another thread - e.g. the leasePinger
-            throw new IllegalStateException("Failed to acknowledge message, session has been closed");
-         }
-
          d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
       }
-
+      
       DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getKeyedCallback(this);
-
+            
       if (deliveryCallback == null)
       {
          deliveryCallback = new DeliveryCallback();
          tx.addKeyedCallback(deliveryCallback, this);
       }
       deliveryCallback.addMessageID(messageID);
-
+         
       if (d != null)
       {
          d.acknowledge(tx);
@@ -503,36 +496,29 @@
       else
       {
          throw new IllegalStateException("Failed to acknowledge delivery " + d);
-      }
-   }
-
+      }             
+   }      
+   
    protected void acknowledge(long messageID) throws Throwable
-   {
-      // acknowledge a delivery
+   {  
+      // acknowledge a delivery   
       SingleReceiverDelivery d;
-
+        
       synchronized (lock)
       {
-         if (closed)
-         {
-            //We must throw an exception to the client otherwise they may think the ack has succeeded
-            //This can happen if the connection is closed by another thread - e.g. the leasePinger
-            throw new IllegalStateException("Failed to acknowledge message, session has been closed");
-         }
-
          d = (SingleReceiverDelivery)deliveries.remove(new Long(messageID));
       }
-
+      
       if (d != null)
       {
          d.acknowledge(null);
       }
       else
-      {
+      {     
          throw new IllegalStateException("Cannot find delivery to acknowledge:" + messageID);
-      }
+      }      
    }
-
+   
    /**
     * Actually remove the consumer and clear up any deliveries it may have
     * This is called by the session on session.close()
@@ -540,33 +526,32 @@
     *
     **/
    protected void remove() throws Throwable
-   {
+   {         
       if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
-
+      
       boolean wereDeliveries = false;
+      for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
+      {
+         SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
 
-      synchronized (lock)
+         d.cancel();
+         wereDeliveries = true;
+      }
+      deliveries.clear();           
+      
+      if (!disconnected)
       {
-         for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
+         if (!closed)
          {
-            SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
-
-            d.cancel();
-            wereDeliveries = true;
+            close();
          }
-         deliveries.clear();
       }
-
-      if (!disconnected && !closed)
-      {
-         close();
-      }
-
+      
       sessionEndpoint.getConnectionEndpoint().
-         getServerPeer().removeConsumerEndpoint(new Integer(id));
-
+         getServerPeer().removeConsumerEndpoint(new Integer(id));                  
+            
       sessionEndpoint.removeConsumerEndpoint(id);
-
+      
       if (wereDeliveries)
       {
          //If we cancelled any deliveries we need to force a deliver on the channel
@@ -574,34 +559,34 @@
          //any of the cancelled messages
          channel.deliver(false);
       }
-   }
-
+   }  
+   
    protected void promptDelivery()
    {
       channel.deliver(false);
    }
-
+   
    protected void sendToDLQ(Long messageID, Transaction tx) throws Throwable
    {
       SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
-
+      
       if (del != null)
-      {
+      { 
          log.warn(del.getReference() + " has exceed maximum delivery attempts and will be sent to the DLQ");
-
+         
          if (dlq != null)
-         {
+         {         
             //reset delivery count to zero
             del.getReference().setDeliveryCount(0);
-
+            
             dlq.handle(null, del.getReference(), tx);
-
-            del.acknowledge(tx);
+            
+            del.acknowledge(tx);           
          }
          else
          {
             log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
-
+            
             del.acknowledge(tx);
          }
       }
@@ -609,44 +594,31 @@
       {
          throw new IllegalStateException("Cannot find delivery to send to DLQ:" + id);
       }
-
+      
    }
-
+   
    protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
    {
-      SingleReceiverDelivery del;
-
-      synchronized (lock)
-      {
-         if (closed)
-         {
-            //This can happen if the connection is closed by another thread - e.g. the leasePinger
-            //We should then ignore the cancel since the delivery will have already been cancelled
-            //back to the queue
-            return;
-         }
-
-         del = (SingleReceiverDelivery)deliveries.remove(messageID);
-      }
-
+      SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
+      
       if (del != null)
-      {
+      {                               
          //Cancel back to the queue
-
+         
          //Update the delivery count
-
+           
          del.getReference().setDeliveryCount(deliveryCount);
-
-         del.cancel();
+              
+         del.cancel();         
       }
       else
       {
          throw new IllegalStateException("Cannot find delivery to cancel:" + id);
       }
    }
-
+               
    protected void start()
-   {
+   {             
       synchronized (lock)
       {
          // Can't start or stop it if it is closed.
@@ -654,21 +626,21 @@
          {
             return;
          }
-
+         
          if (started)
          {
             return;
          }
-
+         
          started = true;
       }
-
+      
       // Prompt delivery
       channel.deliver(false);
    }
-
+   
    protected void stop() throws Throwable
-   {
+   {     
       //We need to:
       //Stop accepting any new messages in the SCE
       //Flush any messages from the SCE to the buffer
@@ -683,58 +655,58 @@
          {
             return;
          }
-
+         
          started = false;
       }
-
+      
       //Now we know no more messages will be accepted in the SCE
-
+            
       try
       {
          //Flush any messages waiting to be sent to the client
          this.executor.execute(new Deliverer());
-
+         
          //Now wait for it to execute
          Future result = new Future();
-
+         
          this.executor.execute(new Waiter(result));
-
+         
          result.getResult();
-
+             
          //Now we know any deliverer has delivered any outstanding messages to the client buffer
       }
       catch (InterruptedException e)
       {
          log.warn("Thread interrupted", e);
       }
-
+            
       //Now we know that there are no in flight messages on the way to the client consumer.
-
+      
       //But there may be messages still in the toDeliver list since the client consumer might be full
       //So we need to cancel these
-
+            
       if (!toDeliver.isEmpty())
-      {
+      { 
          synchronized (lock)
          {
             for (int i = toDeliver.size() - 1; i >= 0; i--)
             {
                MessageProxy proxy = (MessageProxy)toDeliver.get(i);
-
+               
                long id = proxy.getMessage().getMessageID();
-
+               
                cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
             }
          }
-
+                 
          toDeliver.clear();
-
+         
          bufferFull = false;
-      }
+      }      
    }
-
+      
    // Private -------------------------------------------------------
-
+   
    /**
     * Disconnect this consumer from the Channel that feeds it. This method does not clear up
     * deliveries
@@ -742,18 +714,18 @@
    private void disconnect()
    {
       boolean removed = channel.remove(this);
-
+      
       if (removed)
       {
          disconnected = true;
          if (trace) { log.trace(this + " removed from the channel"); }
       }
    }
-
-   // Inner classes -------------------------------------------------
-
+        
+   // Inner classes -------------------------------------------------   
+   
    /*
-    * Delivers messages to the client
+    * Delivers messages to the client 
     * TODO - We can make this a bit more intelligent by letting it measure the rate
     * the client is consuming messages and send messages at that rate.
     * This would mean the client consumer wouldn't be full so often and more wouldn't have to be called
@@ -770,9 +742,9 @@
             if (trace) { log.trace(this + " client consumer full, do nothing"); }
             return;
          }
-
+         
          List list = null;
-
+             
          synchronized (lock)
          {
             if (trace) { log.trace(this + " has the main lock, attempting delivery"); }
@@ -784,7 +756,7 @@
                bufferFull = false;
             }
          }
-
+                                                           
          if (list == null)
          {
             if (trace) { log.trace(this + " has a null list, returning"); }
@@ -796,20 +768,7 @@
 
          try
          {
-            if (trace)
-            {
-               StringBuffer sb = new StringBuffer(ServerConsumerEndpoint.this + " handing [");
-               for(int i = 0; i < list.size(); i++)
-               {
-                  sb.append(((MessageProxy)list.get(i)).getMessage().getMessageID());
-                  if (i < list.size() - 1)
-                  {
-                     sb.append(",");
-                  }
-               }
-               sb.append("] over to the remoting layer");
-               log.trace(sb.toString());
-            }
+            if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
 
             ClientDelivery del = new ClientDelivery(list, id);
 
@@ -848,7 +807,7 @@
          return "Deliverer[" + Integer.toHexString(hashCode()) + "]";
       }
    }
-
+   
    /*
     * The purpose of this class is to put it on the QueuedExecutor and wait for it to run
     * We can then ensure that all the Runnables in front of it on the queue have also executed
@@ -859,20 +818,20 @@
    private class Waiter implements Runnable
    {
       Future result;
-
+      
       Waiter(Future result)
       {
          this.result = result;
       }
-
+      
       public void run()
       {
          result.setResult(null);
       }
    }
-
+   
    /**
-    *
+    * 
     * The purpose of this class is to remove deliveries from the delivery list on commit
     * Each transaction has once instance of this per SCE
     *
@@ -880,62 +839,51 @@
    private class DeliveryCallback implements TxCallback
    {
       List delList = new ArrayList();
-
+         
       public void beforePrepare()
-      {
+      {         
          //NOOP
       }
-
+      
       public void beforeCommit(boolean onePhase)
-      {
+      {         
          //NOOP
       }
-
+      
       public void beforeRollback(boolean onePhase)
-      {
+      {         
          //NOOP
       }
-
+      
       public void afterPrepare()
-      {
+      {         
          //NOOP
       }
-
+      
       public synchronized void afterCommit(boolean onePhase) throws TransactionException
       {
          // Remove the deliveries from the delivery map.
-         
-         synchronized (lock)
+         Iterator iter = delList.iterator();
+         while (iter.hasNext())
          {
-            if (closed)
-            {
-               //We must throw an exception to the client otherwise they may think the commit has succeeded
-               //This can happen if the connection is closed by another thread - e.g. the leasePinger
-               throw new TransactionException("Failed to acknowledge message, session has been closed");
-            }
+            Long messageID = (Long)iter.next();
             
-            Iterator iter = delList.iterator();
-            while (iter.hasNext())
+            if (deliveries.remove(messageID) == null)
             {
-               Long messageID = (Long)iter.next();
-   
-               if (deliveries.remove(messageID) == null)
-               {
-                  throw new TransactionException("Failed to remove delivery " + messageID);
-               }
+               throw new TransactionException("Failed to remove delivery " + messageID);
             }
          }
       }
-
+      
       public void afterRollback(boolean onePhase) throws TransactionException
-      {
+      {                            
          //NOOP
       }
-
+      
       synchronized void addMessageID(long messageID)
       {
          delList.add(new Long(messageID));
       }
    }
-
+   
 }

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-01 09:47:27 UTC (rev 1671)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-01 19:37:10 UTC (rev 1672)
@@ -278,29 +278,8 @@
 
    public void cancel(Delivery d) throws Throwable
    {
-      // We put the cancellation on the event queue
-      // try
-      // {
-      // Future result = new Future();
-      //
-      // this.executor.execute(new CancelRunnable(d, result));
-      //
-      // //For now we wait for result, but this may not be necessary
-      // result.getResult();
-      // }
-      // catch (InterruptedException e)
-      // {
-      // log.warn("Thread interrupted", e);
-      // }
-
-//      Exception e = new Exception();
-//
-//      log.error("cancelling delivery: " + d, e);
-//
-
-
-      // TODO We should also consider executing cancels on the event queue
-      cancelInternal(d);
+       //We put the cancellation on the event queue     
+       this.executor.execute(new CancelRunnable(d));
    }
 
    // Distributor implementation ------------------------------------
@@ -645,24 +624,23 @@
          // The iterator is used to iterate through the refs in the channel in the case that they
          // don't match the selectors of any receivers.
          ListIterator iter = null;
-
+         
          MessageReference ref = null;
-
+         
          while (true)
-         {
+         {           
             synchronized (refLock)
-            {
+            {              
                if (iter == null)
                {
-                  //ref = (MessageReference) messageRefs.peekFirst();
-                  ref = removeFirstInMemory();
+                  ref = (MessageReference) messageRefs.peekFirst();
                }
                else
                {
                   if (iter.hasNext())
-                  {
+                  {                        
                      ref = (MessageReference)iter.next();
-                  }
+                  } 
                   else
                   {
                      ref = null;
@@ -672,10 +650,9 @@
 
             if (ref != null)
             {
-               if (trace) { log.trace(this + " pushing " + ref); }
-
-               // Check if message is expired (we also do this on the client side). If so ack it
-               // from the channel.
+               // Check if message is expired (we also do this on the client
+               // side)
+               // If so ack it from the channel
                if (ref.isExpired())
                {
                   if (trace) { log.trace("Message reference: " + ref + " has expired"); }
@@ -683,8 +660,7 @@
                   // remove and acknowledge it
                   if (iter == null)
                   {
-                     //already removed
-                     //removeFirstInMemory();
+                     removeFirstInMemory();
                   }
                   else
                   {
@@ -711,28 +687,15 @@
                      if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
 
                      receiversReady = false;
-
-                     if (iter == null)
-                     {
-                        // add the message back
-                        synchronized (refLock)
-                        {
-                           messageRefs.addFirst(ref, ref.getPriority());
-                        }
-                     }
-                     else
-                     {
-                        //we didn't remove it in the first place
-                     }
-
+                     
                      return;
                   }
                   else if (!del.isSelectorAccepted())
                   {
                      // No receiver accepted the message because no selectors matched, so we create
                      // an iterator (if we haven't already created it) to iterate through the refs
-                     // in the channel.
-
+                     // in the channel. 
+                     
                      // TODO Note that this is only a partial solution since if there are messages
                      // paged to storage it won't try those - i.e. it will only iterate through
                      // those refs in memory. Dealing with refs in storage is somewhat tricky since
@@ -742,27 +705,15 @@
                      // indexes here to prevent having to iterate through all the refs every time.
                      // Having said all that, having consumers on a queue that don't match many
                      // messages is an antipattern and should be avoided by the user.
-
                      if (iter == null)
                      {
-                        //Add the message back
-
-                        synchronized (refLock)
-                        {
-                           messageRefs.addFirst(ref, ref.getPriority());
-
-                           iter = messageRefs.iterator();
-
-                           //And skip the first one (that's the one we just added back)
-
-                           iter.next();
-                        }
-                     }
+                        iter = messageRefs.iterator();
+                     }                     
                   }
                   else
                   {
                      if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-
+                     
                      // Receiver accepted the reference
 
                      // We must synchronize here to cope with another race condition where message
@@ -776,9 +727,11 @@
                         // FIXME - It's actually possible the delivery could be
                         // cancelled before it reaches
                         // here, in which case we wouldn't get a delivery but we
-                        // still need to increment the delivery count
+                        // still need to increment the
+                        // delivery count
                         // All the problems related to these race conditions and
-                        // fiddly edge cases will disappear once we do
+                        // fiddly edge cases will disappear
+                        // once we do
                         // http://jira.jboss.com/jira/browse/JBMESSAGING-355
                         // This will make life a lot easier
 
@@ -786,13 +739,11 @@
                         {
                            if (iter == null)
                            {
-                              //do nothing - already removed
-                              //removeFirstInMemory();
+                              removeFirstInMemory();
                            }
                            else
                            {
-                              iter.remove();
-                              if (trace) { log.trace(this + " removed current message from iterator"); }
+                              iter.remove();                                
                            }
 
                            // delivered
@@ -802,7 +753,6 @@
                               synchronized (deliveryLock)
                               {
                                  deliveries.add(del);
-                                 if (trace) { log.trace(this + " starting to track  " + del); }
                               }
                            }
                         }
@@ -1734,7 +1684,29 @@
          }
       }
    }
+   
+   private class CancelRunnable implements Runnable
+   {
+      Delivery del;
+      
+      CancelRunnable(Delivery del)
+      {
+         this.del = del;
+      }
 
+      public void run()
+      {
+         try
+         {
+            cancelInternal(del);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to cancel delivery", e);
+         }
+      }
+   }
+
    private class HandleRunnable implements Runnable
    {
       Future result;

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-12-01 09:47:27 UTC (rev 1671)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-12-01 19:37:10 UTC (rev 1672)
@@ -386,7 +386,7 @@
             
             ps.setLong(2, size);
             
-            int rows = ps.executeUpdate();
+            int rows = updateWithRetry(ps);
             
             if (trace)
             {
@@ -418,7 +418,7 @@
          
          ps.setString(2, counterName);
          
-         int rows = ps.executeUpdate();
+         int rows = updateWithRetry(ps);
          
          if (trace)
          {
@@ -471,8 +471,6 @@
       PreparedStatement ps = null;
       TransactionWrapper wrap = new TransactionWrapper();
       
-      final int MAX_TRIES = 25;      
-      
       try
       {
          conn = ds.getConnection();
@@ -484,41 +482,15 @@
          ps.setLong(2, orderEnd);
          
          ps.setLong(3, channelID);
-         
-         int tries = 0;
-         
-         while (true)
+                  
+         int rows = updateWithRetry(ps);
+               
+         if (trace)
          {
-            try
-            {
-               int rows = ps.executeUpdate();
-               
-               if (trace)
-               {
-                  log.trace(JDBCUtil.statementToString(updateReliableRefs, new Long(channelID),
-                        new Long(orderStart), new Long(orderEnd))
-                        + " updated " + rows + " rows");
-               }
-               if (tries > 0)
-               {
-                  log.warn("Update worked after retry");
-               }
-               break;
-            }
-            catch (SQLException e)
-            {
-               log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
-               tries++;
-               if (tries == MAX_TRIES)
-               {
-                  log.error("Retried " + tries + " times, now giving up");
-                  throw new IllegalStateException("Failed to update references");
-               }
-               log.warn("Trying again after a pause");
-               //Now we wait for a random amount of time to minimise risk of deadlock
-               Thread.sleep((long)(Math.random() * 500));
-            }  
-         }
+            log.trace(JDBCUtil.statementToString(updateReliableRefs, new Long(channelID),
+                  new Long(orderStart), new Long(orderEnd))
+                  + " updated " + rows + " rows");
+         }          
       }
       catch (Exception e)
       {
@@ -550,6 +522,8 @@
          wrap.end();
       }
    }
+   
+   
          
    public int getNumberOfUnloadedReferences(long channelID) throws Exception
    {
@@ -859,7 +833,7 @@
             }
             else
             {
-               int rows = psInsertReference.executeUpdate();
+               int rows = updateWithRetry(psInsertReference);
                
                if (trace)
                {
@@ -916,7 +890,7 @@
             {
                if (added)
                {
-                  int rows = psInsertMessage.executeUpdate();
+                  int rows = updateWithRetry(psInsertMessage);
                                       
                   if (trace)
                   {
@@ -925,7 +899,7 @@
                }
                else
                {
-                  int rows = psUpdateMessage.executeUpdate();
+                  int rows = updateWithRetry(psUpdateMessage);
                  
                   if (trace)
                   {
@@ -941,7 +915,7 @@
          
          if (usingBatchUpdates)
          {
-            int[] rowsReference = psInsertReference.executeBatch();
+            int[] rowsReference = updateWithRetryBatch(psInsertReference);
             
             if (trace)
             {
@@ -950,7 +924,7 @@
             
             if (messageInsertsInBatch)
             {
-               int[] rowsMessage = psInsertMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psInsertMessage);
                
                if (trace)
                {
@@ -959,7 +933,7 @@
             }
             if (messageUpdatesInBatch)
             {
-               int[] rowsMessage = psUpdateMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
                
                if (trace)
                {
@@ -1092,7 +1066,7 @@
             }
             else
             {
-               int rows = psDeleteReference.executeUpdate();
+               int rows = updateWithRetry(psDeleteReference);
                
                if (trace)
                {
@@ -1150,7 +1124,7 @@
             {
                if (removed)
                {
-                  int rows = psDeleteMessage.executeUpdate();
+                  int rows = updateWithRetry(psDeleteMessage);
                   
                   if (trace)
                   {
@@ -1159,7 +1133,7 @@
                }
                else
                {
-                  int rows = psUpdateMessage.executeUpdate();
+                  int rows = updateWithRetry(psUpdateMessage);
                   
                   if (trace)
                   {
@@ -1175,7 +1149,7 @@
          
          if (usingBatchUpdates)
          {
-            int[] rowsReference = psDeleteReference.executeBatch();
+            int[] rowsReference = updateWithRetryBatch(psDeleteReference);
             
             if (trace)
             {
@@ -1184,7 +1158,7 @@
             
             if (messageDeletionsInBatch)
             {
-               int[] rowsMessage = psDeleteMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psDeleteMessage);
                
                if (trace)
                {
@@ -1193,7 +1167,7 @@
             }
             if (messageUpdatesInBatch)
             {
-               int[] rowsMessage = psUpdateMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
                
                if (trace)
                {
@@ -1561,7 +1535,7 @@
             }
             else
             {
-               int rows = psUpdateReference.executeUpdate();
+               int rows = updateWithRetry(psUpdateReference);
                
                if (trace)
                {
@@ -1575,7 +1549,7 @@
                      
          if (usingBatchUpdates)
          {
-            int[] rowsReference = psUpdateReference.executeBatch();
+            int[] rowsReference = updateWithRetryBatch(psUpdateReference);
             
             if (trace)
             {
@@ -1652,7 +1626,7 @@
             // Add the reference
             addReference(channelID, ref, psReference, true);
             
-            int rows = psReference.executeUpdate();
+            int rows = updateWithRetry(psReference);
             
             if (trace)
             {
@@ -1677,7 +1651,7 @@
                updateMessageChannelCount(m, psMessage);
             }
                            
-            rows = psMessage.executeUpdate();
+            rows = updateWithRetry(psMessage);
             if (trace)
             {
                log.trace("Inserted/updated " + rows + " rows");
@@ -1757,7 +1731,7 @@
          
          psReference.setLong(3, ref.getMessageID());
          
-         int rows = psReference.executeUpdate();                         
+         int rows = updateWithRetry(psReference);                         
       }
       catch (Exception e)
       {
@@ -1825,7 +1799,7 @@
             //Remove the message reference
             removeReference(channelID, ref, psReference);
             
-            int rows = psReference.executeUpdate();
+            int rows = updateWithRetry(psReference);
             
             if (trace)
             {
@@ -1850,7 +1824,7 @@
                updateMessageChannelCount(m, psMessage);
             }
                               
-            rows = psMessage.executeUpdate();
+            rows = updateWithRetry(psMessage);
             
             if (trace)
             {
@@ -2747,7 +2721,7 @@
             }
             else
             {
-               int rows = psReference.executeUpdate();
+               int rows = updateWithRetry(psReference);
                
                if (trace)
                {
@@ -2802,7 +2776,7 @@
             {
                if (added)
                {
-                  int rows = psInsertMessage.executeUpdate();
+                  int rows = updateWithRetry(psInsertMessage);
                   
                   if (trace)
                   {
@@ -2811,7 +2785,7 @@
                }
                else
                {
-                  int rows = psUpdateMessage.executeUpdate();
+                  int rows = updateWithRetry(psUpdateMessage);
                   
                   if (trace)
                   {
@@ -2827,7 +2801,7 @@
          
          if (batch)
          {
-            int[] rowsReference = psReference.executeBatch();
+            int[] rowsReference = updateWithRetryBatch(psReference);
             
             if (trace)
             {
@@ -2836,7 +2810,7 @@
             
             if (messageInsertsInBatch)
             {
-               int[] rowsMessage = psInsertMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psInsertMessage);
                
                if (trace)
                {
@@ -2845,7 +2819,7 @@
             }
             if (messageUpdatesInBatch)
             {
-               int[] rowsMessage = psUpdateMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
                
                if (trace)
                {
@@ -2896,7 +2870,7 @@
             }
             else
             {
-               int rows = psReference.executeUpdate();
+               int rows = updateWithRetry(psReference);
                
                if (trace)
                {
@@ -2953,7 +2927,7 @@
             {
                if (removed)
                {
-                  int rows = psDeleteMessage.executeUpdate();
+                  int rows = updateWithRetry(psDeleteMessage);
                   
                   if (trace)
                   {
@@ -2962,7 +2936,7 @@
                }
                else
                {
-                  int rows = psUpdateMessage.executeUpdate();
+                  int rows = updateWithRetry(psUpdateMessage);
                   
                   if (trace)
                   {
@@ -2978,7 +2952,7 @@
          
          if (batch)
          {
-            int[] rowsReference = psReference.executeBatch();
+            int[] rowsReference = updateWithRetryBatch(psReference);
             
             if (trace)
             {
@@ -2987,7 +2961,7 @@
             
             if (messageDeletionsInBatch)
             {
-               int[] rowsMessage = psDeleteMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psDeleteMessage);
                
                if (trace)
                {
@@ -2996,7 +2970,7 @@
             }
             if (messageUpdatesInBatch)
             {
-               int[] rowsMessage = psUpdateMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
                
                if (trace)
                {
@@ -3190,7 +3164,7 @@
             {
                if (removed)
                {
-                  int rows = psDeleteMessage.executeUpdate();
+                  int rows = updateWithRetry(psDeleteMessage);
                   
                   if (trace)
                   {
@@ -3199,7 +3173,7 @@
                }
                else
                {
-                  int rows = psUpdateMessage.executeUpdate();
+                  int rows = updateWithRetry(psUpdateMessage);
                   
                   if (trace)
                   {
@@ -3217,7 +3191,7 @@
          {
             if (messageDeletionsInBatch)
             {
-               int[] rows = psDeleteMessage.executeBatch();
+               int[] rows = updateWithRetryBatch(psDeleteMessage);
                
                if (trace)
                {
@@ -3229,7 +3203,7 @@
             }
             if (messageUpdatesInBatch)
             {
-               int[] rows = psUpdateMessage.executeBatch();
+               int[] rows = updateWithRetryBatch(psUpdateMessage);
                
                if (trace)
                {
@@ -3364,7 +3338,7 @@
             }
             else
             {
-               int rows = psReference.executeUpdate();
+               int rows = updateWithRetry(psReference);
                
                if (trace)
                {
@@ -3419,7 +3393,7 @@
             {
                if (added)
                {
-                  int rows = psInsertMessage.executeUpdate();
+                  int rows = updateWithRetry(psInsertMessage);
                   
                   if (trace)
                   {
@@ -3428,7 +3402,7 @@
                }
                else
                {
-                  int rows = psUpdateMessage.executeUpdate();
+                  int rows = updateWithRetry(psUpdateMessage);
                   
                   if (trace)
                   {
@@ -3444,7 +3418,7 @@
          
          if (batch)
          {
-            int[] rowsReference = psReference.executeBatch();
+            int[] rowsReference = updateWithRetryBatch(psReference);
             
             if (trace)
             {
@@ -3453,7 +3427,7 @@
             
             if (messageInsertsInBatch)
             {
-               int[] rowsMessage = psInsertMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psInsertMessage);
                
                if (trace)
                {
@@ -3462,7 +3436,7 @@
             }
             if (messageUpdatesInBatch)
             {
-               int[] rowsMessage = psUpdateMessage.executeBatch();
+               int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
                
                if (trace)
                {
@@ -3505,7 +3479,7 @@
             }
             else
             {
-               int rows = psReference.executeUpdate();
+               int rows = updateWithRetry(psReference);
                
                if (trace)
                {
@@ -3519,7 +3493,7 @@
          
          if (batch)
          {
-            int[] rows = psReference.executeBatch();
+            int[] rows = updateWithRetryBatch(psReference);
             
             if (trace)
             {
@@ -3689,7 +3663,7 @@
             {
                if (removed)
                {
-                  int rows = psDeleteMessage.executeUpdate();
+                  int rows = updateWithRetry(psDeleteMessage);
                   
                   if (trace)
                   {
@@ -3698,7 +3672,7 @@
                }
                else
                {
-                  int rows = psUpdateMessage.executeUpdate();
+                  int rows = updateWithRetry(psUpdateMessage);
                   
                   if (trace)
                   {
@@ -3716,7 +3690,7 @@
          {
             if (messageDeletionsInBatch)
             {
-               int[] rows = psDeleteMessage.executeBatch();
+               int[] rows = updateWithRetryBatch(psDeleteMessage);
                
                if (trace)
                {
@@ -3728,7 +3702,7 @@
             }
             if (messageUpdatesInBatch)
             {
-               int[] rows = psUpdateMessage.executeBatch();
+               int[] rows = updateWithRetryBatch(psUpdateMessage);
                
                if (trace)
                {
@@ -3820,7 +3794,7 @@
          ps.setInt(3, formatID);
          ps.setBytes(4, xid.getGlobalTransactionId());
          
-         rows = ps.executeUpdate();
+         rows = updateWithRetry(ps);
          
       }
       finally
@@ -3845,7 +3819,7 @@
       }
    }
    
-   protected void removeTXRecord(Connection conn, Transaction tx) throws SQLException
+   protected void removeTXRecord(Connection conn, Transaction tx) throws Exception
    {
       PreparedStatement ps = null;
       try
@@ -3854,7 +3828,7 @@
          
          ps.setLong(1, tx.getId());
          
-         int rows = ps.executeUpdate();
+         int rows = updateWithRetry(ps);
          
          if (trace)
          {
@@ -3948,7 +3922,7 @@
          
          ps.setLong(1, tx.getId());        
          
-         int rows = ps.executeUpdate();
+         int rows = updateWithRetry(ps);
          
          if (trace)
          {
@@ -3959,7 +3933,7 @@
          ps = conn.prepareStatement(commitMessageRef2);
          ps.setLong(1, tx.getId());         
          
-         rows = ps.executeUpdate();
+         rows = updateWithRetry(ps);
          
          if (trace)
          {
@@ -3994,7 +3968,7 @@
          
          ps.setLong(1, tx.getId());         
          
-         int rows = ps.executeUpdate();
+         int rows = updateWithRetry(ps);
          
          if (trace)
          {
@@ -4006,7 +3980,7 @@
          ps = conn.prepareStatement(rollbackMessageRef2);
          ps.setLong(1, tx.getId());
          
-         rows = ps.executeUpdate();
+         rows = updateWithRetry(ps);
          
          if (trace)
          {
@@ -4380,6 +4354,70 @@
       log.trace("Batch update " + name + ", " + action + " total of " + count + " rows");
    }
    
+   protected int updateWithRetry(PreparedStatement ps) throws Exception
+   {
+      return updateWithRetry(ps, false)[0];
+   }
+   
+   protected int[] updateWithRetryBatch(PreparedStatement ps) throws Exception
+   {
+      return updateWithRetry(ps, true);
+   }
+   
+   private int[] updateWithRetry(PreparedStatement ps, boolean batch) throws Exception
+   {
+      final int MAX_TRIES = 25;      
+      
+      int rows = 0;
+      
+      int[] rowsArr = null;
+      
+      int tries = 0;
+      
+      while (true)
+      {
+         try
+         {
+            if (batch)
+            {
+               rowsArr = ps.executeBatch();
+            }
+            else
+            {
+               rows = ps.executeUpdate();
+            }
+            
+            if (tries > 0)
+            {
+               log.warn("Update worked after retry");
+            }
+            break;
+         }
+         catch (SQLException e)
+         {
+            log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+            tries++;
+            if (tries == MAX_TRIES)
+            {
+               log.error("Retried " + tries + " times, now giving up");
+               throw new IllegalStateException("Failed to update references");
+            }
+            log.warn("Trying again after a pause");
+            //Now we wait for a random amount of time to minimise risk of deadlock
+            Thread.sleep((long)(Math.random() * 500));
+         }  
+      }
+      
+      if (batch)
+      {
+         return rowsArr;
+      }
+      else
+      {
+         return new int[] { rows };
+      }
+   }
+   
    // Private -------------------------------------------------------
 
    // never access directly

Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-12-01 09:47:27 UTC (rev 1671)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-12-01 19:37:10 UTC (rev 1672)
@@ -895,6 +895,9 @@
 
           cons1.close();
           
+          //Cancelling is asynch so can take some time
+          Thread.sleep(500);
+          
           //rollback should cause redelivery of messages
 
           //in this case redelivery occurs to a different receiver




More information about the jboss-cvs-commits mailing list