[jboss-cvs] JBoss Messaging SVN: r4413 - in trunk/src/main/org/jboss/messaging/core/server: impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 9 15:08:14 EDT 2008


Author: timfox
Date: 2008-06-09 15:08:14 -0400 (Mon, 09 Jun 2008)
New Revision: 4413

Removed:
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_c.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_nc.java
Modified:
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1352


Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-06-09 17:22:39 UTC (rev 4412)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-06-09 19:08:14 UTC (rev 4413)
@@ -117,4 +117,8 @@
    MessageReference getReference(long id);
    
    void deleteAllReferences(StorageManager storageManager) throws Exception;
+   
+   void lock();
+   
+   void unlock();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-06-09 17:22:39 UTC (rev 4412)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-06-09 19:08:14 UTC (rev 4413)
@@ -40,4 +40,6 @@
 	void setStarted(boolean started) throws Exception;
 	
 	void receiveCredits(int credits) throws Exception;
+	
+	Queue getQueue();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-06-09 17:22:39 UTC (rev 4412)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-06-09 19:08:14 UTC (rev 4413)
@@ -33,6 +33,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
@@ -108,6 +110,7 @@
    
    private final Runnable deliverRunner = new DeliverRunner();
    
+   private final Lock lock = new ReentrantLock(false);
    
    public QueueImpl(final long persistenceID, final SimpleString name,
          final Filter filter, final boolean clustered, final boolean durable,
@@ -155,29 +158,53 @@
    {
       return name;
    }
-
-   public synchronized HandleStatus addLast(final MessageReference ref)
+   
+   public HandleStatus addLast(final MessageReference ref)
    {
-      return add(ref, false);
+      lock.lock();
+      try
+      {
+         return add(ref, false);
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
-   public synchronized HandleStatus addFirst(final MessageReference ref)
+   public HandleStatus addFirst(final MessageReference ref)
    {
-      return add(ref, true);
+      lock.lock();
+      try
+      {
+         return add(ref, true);
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
-   public synchronized void addListFirst(final LinkedList<MessageReference> list)
+   public void addListFirst(final LinkedList<MessageReference> list)
    {
-      ListIterator<MessageReference> iter = list.listIterator(list.size());
-
-      while (iter.hasPrevious())
+      lock.lock();
+      try
       {
-         MessageReference ref = iter.previous();
-
-         messageReferences.addFirst(ref, ref.getMessage().getPriority());
+         ListIterator<MessageReference> iter = list.listIterator(list.size());
+   
+         while (iter.hasPrevious())
+         {
+            MessageReference ref = iter.previous();
+   
+            messageReferences.addFirst(ref, ref.getMessage().getPriority());
+         }
+   
+         deliver();
       }
-
-      deliver();
+      finally
+      {
+         lock.unlock();
+      }
    }
  
    public void deliverAsync(final Executor executor)
@@ -195,67 +222,75 @@
     * 
     * @see org.jboss.messaging.newcore.intf.Queue#deliver()
     */
-   public synchronized void deliver()
+   public void deliver()
    {
-      MessageReference reference;
-
-      ListIterator<MessageReference> iterator = null;
-
-      while (true)
+      lock.lock();
+      try
       {
-         if (iterator == null)
+         MessageReference reference;
+   
+         ListIterator<MessageReference> iterator = null;
+   
+         while (true)
          {
-            reference = messageReferences.peekFirst();
-         }
-         else
-         {
-            if (iterator.hasNext())
+            if (iterator == null)
             {
-               reference = iterator.next();
+               reference = messageReferences.peekFirst();
             }
             else
             {
-               reference = null;
+               if (iterator.hasNext())
+               {
+                  reference = iterator.next();
+               }
+               else
+               {
+                  reference = null;
+               }
             }
-         }
-
-         if (reference == null)
-         {
-            if (iterator == null)
+   
+            if (reference == null)
             {
-               // We delivered all the messages - go into direct delivery
-               direct = true;
-               
-               promptDelivery = false;
+               if (iterator == null)
+               {
+                  // We delivered all the messages - go into direct delivery
+                  direct = true;
+                  
+                  promptDelivery = false;
+               }
+               return;
             }
-            return;
-         }
-
-         HandleStatus status = deliver(reference);
-
-         if (status == HandleStatus.HANDLED)
-         {
-            if (iterator == null)
+   
+            HandleStatus status = deliver(reference);
+   
+            if (status == HandleStatus.HANDLED)
             {
-               messageReferences.removeFirst();              
+               if (iterator == null)
+               {
+                  messageReferences.removeFirst();              
+               }
+               else
+               {
+                  iterator.remove();
+               }
             }
-            else
+            else if (status == HandleStatus.BUSY)
             {
-               iterator.remove();
+               // All consumers busy - give up
+               break;
             }
+            else if (status == HandleStatus.NO_MATCH && iterator == null)
+            {
+               // Consumers not all busy - but filter not accepting - iterate back
+               // through the queue
+               iterator = messageReferences.iterator();
+            }
          }
-         else if (status == HandleStatus.BUSY)
-         {
-            // All consumers busy - give up
-            break;
-         }
-         else if (status == HandleStatus.NO_MATCH && iterator == null)
-         {
-            // Consumers not all busy - but filter not accepting - iterate back
-            // through the queue
-            iterator = messageReferences.iterator();
-         }
       }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    public synchronized void addConsumer(final Consumer consumer)
@@ -433,8 +468,7 @@
       return flowController;
    }
 
-   public synchronized void deleteAllReferences(
-         final StorageManager storageManager) throws Exception
+   public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
    {
       Transaction tx = new TransactionImpl(storageManager, null);
 
@@ -467,8 +501,17 @@
 
       tx.commit();
    }
+   
+   public void lock()
+   {
+      lock.lock();
+   }
+   
+   public void unlock()
+   {
+      lock.unlock();
+   }
 
-
    // Public
    // -----------------------------------------------------------------------------
 

Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_c.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_c.java	2008-06-09 17:22:39 UTC (rev 4412)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_c.java	2008-06-09 19:08:14 UTC (rev 4413)
@@ -1,760 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.server.impl;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.transaction.impl.TransactionImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * Implementation of a Queue
- * 
- * TODO use Java 5 concurrent queue
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- * 
- */
-public class QueueImpl_c implements Queue
-{
-   private static final Logger log = Logger.getLogger(QueueImpl.class);
-
-   private static final boolean trace = log.isTraceEnabled();
-
-   private volatile long persistenceID = -1;
-
-   private final SimpleString name;
-
-   private volatile Filter filter;
-
-   private final boolean clustered;
-
-   private final boolean durable;
-
-   private final boolean temporary;
-
-   private final int maxSizeBytes;
-     
-   private final ScheduledExecutorService scheduledExecutor;
-
-   private final java.util.Queue<MessageReference> messageReferences = new ConcurrentLinkedQueue<MessageReference>();
-
-   private final List<Consumer> consumers = new ArrayList<Consumer>();
-
-   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
-
-   private volatile DistributionPolicy distributionPolicy = new RoundRobinDistributionPolicy();
-
-   private boolean direct;
-
-   private boolean promptDelivery;
-
-   private int pos;
-   
-   private AtomicInteger sizeBytes = new AtomicInteger(0);
-
-   private AtomicInteger messagesAdded = new AtomicInteger(0);
-
-   private AtomicInteger deliveringCount = new AtomicInteger(0);
-
-   private volatile FlowController flowController;
-   
-   private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);  
-   
-   private final Runnable deliverRunner = new DeliverRunner();
-   
-   
-   public QueueImpl_c(final long persistenceID, final SimpleString name,
-         final Filter filter, final boolean clustered, final boolean durable,
-         final boolean temporary, final int maxSizeBytes,
-         final ScheduledExecutorService scheduledExecutor)
-   {
-      this.persistenceID = persistenceID;
-
-      this.name = name;
-
-      this.filter = filter;
-
-      this.clustered = clustered;
-
-      this.durable = durable;
-
-      this.temporary = temporary;
-
-      this.maxSizeBytes = maxSizeBytes;
-
-      this.scheduledExecutor = scheduledExecutor;
-
-      direct = true;
-   }
-
-   // Queue implementation
-   // -------------------------------------------------------------------
-
-   public boolean isClustered()
-   {
-      return clustered;
-   }
-
-   public boolean isDurable()
-   {
-      return durable;
-   }
-
-   public boolean isTemporary()
-   {
-      return temporary;
-   }
-
-   public SimpleString getName()
-   {
-      return name;
-   }
-
-   public HandleStatus addLast(final MessageReference ref)
-   {
-      return add(ref, false);
-   }
-
-   public HandleStatus addFirst(final MessageReference ref)
-   {
-      return add(ref, true);
-   }
-
-   public void addListFirst(final LinkedList<MessageReference> list)
-   {
-//      ListIterator<MessageReference> iter = list.listIterator(list.size());
-//
-//      while (iter.hasPrevious())
-//      {
-//         MessageReference ref = iter.previous();
-//
-//         messageReferences.addFirst(ref);
-//      }
-//
-//      deliver();
-      throw new UnsupportedOperationException();
-   }
- 
-   public void deliverAsync(final Executor executor)
-   {
-      //Prevent too many executors running at once
-      
-      if (waitingToDeliver.compareAndSet(false, true))
-      {
-         executor.execute(deliverRunner);
-      }
-   }
-      
-   /*
-    * Attempt to deliver all the messages in the queue
-    * 
-    * @see org.jboss.messaging.newcore.intf.Queue#deliver()
-    */
-   public void deliver()
-   {
-      MessageReference reference;
-
-      Iterator<MessageReference> iterator = null;
-
-      while (true)
-      {
-         if (iterator == null)
-         {
-            reference = messageReferences.peek();
-         }
-         else
-         {
-            if (iterator.hasNext())
-            {
-               reference = iterator.next();
-            }
-            else
-            {
-               reference = null;
-            }
-         }
-
-         if (reference == null)
-         {
-            if (iterator == null)
-            {
-               // We delivered all the messages - go into direct delivery
-               direct = true;
-               
-               promptDelivery = false;
-            }
-            return;
-         }
-
-         HandleStatus status = deliver(reference);
-
-         if (status == HandleStatus.HANDLED)
-         {
-            if (iterator == null)
-            {
-               messageReferences.poll();             
-            }
-            else
-            {
-               iterator.remove();
-            }
-         }
-         else if (status == HandleStatus.BUSY)
-         {
-            // All consumers busy - give up
-            break;
-         }
-         else if (status == HandleStatus.NO_MATCH && iterator == null)
-         {
-            // Consumers not all busy - but filter not accepting - iterate back
-            // through the queue
-            iterator = messageReferences.iterator();
-         }
-      }
-   }
-
-   public synchronized void addConsumer(final Consumer consumer)
-   {
-      consumers.add(consumer);
-   }
-
-   public synchronized boolean removeConsumer(final Consumer consumer)
-   {
-      boolean removed = consumers.remove(consumer);
-
-      if (pos == consumers.size())
-      {
-         pos = 0;
-      }
-
-      if (consumers.isEmpty())
-      {
-         promptDelivery = false;
-      }
-
-      return removed;
-   }
-
-   public synchronized int getConsumerCount()
-   {
-      return consumers.size();
-   }
-
-   public synchronized List<MessageReference> list(final Filter filter)
-   {
-      if (filter == null)
-      {
-         return new ArrayList<MessageReference>(messageReferences);
-      }
-      else
-      {
-         ArrayList<MessageReference> list = new ArrayList<MessageReference>();
-
-         for (MessageReference ref : messageReferences)
-         {
-            if (filter.match(ref.getMessage()))
-            {
-               list.add(ref);
-            }
-         }
-
-         return list;
-      }
-   }
-
-   public synchronized boolean removeReferenceWithID(final long id)
-   {
-      Iterator<MessageReference> iterator = messageReferences.iterator();
-
-      boolean removed = false;
-
-      while (iterator.hasNext())
-      {
-         MessageReference ref = iterator.next();
-
-         if (ref.getMessage().getMessageID() == id)
-         {
-            iterator.remove();
-
-            removed = true;
-
-            break;
-         }
-      }
-
-      return removed;
-   }
-
-   public synchronized MessageReference getReference(final long id)
-   {
-      Iterator<MessageReference> iterator = messageReferences.iterator();
-
-      while (iterator.hasNext())
-      {
-         MessageReference ref = iterator.next();
-
-         if (ref.getMessage().getMessageID() == id) { return ref; }
-      }
-
-      return null;
-   }
-
-   public long getPersistenceID()
-   {
-      return persistenceID;
-   }
-
-   public void setPersistenceID(final long id)
-   {
-      this.persistenceID = id;
-   }
-
-   public Filter getFilter()
-   {
-      return filter;
-   }
-
-   public void setFilter(final Filter filter)
-   {
-      this.filter = filter;
-   }
-
-   public synchronized int getMessageCount()
-   {
-      //log.info("mr: " + messageReferences.size() + " sc:" + getScheduledCount() + " dc:" + getDeliveringCount());
-      return messageReferences.size() + getScheduledCount()
-            + getDeliveringCount();
-   }
-
-   public synchronized int getScheduledCount()
-   {
-      return scheduledRunnables.size();
-   }
-
-   public int getDeliveringCount()
-   {
-      return deliveringCount.get();
-   }
-
-   public void referenceAcknowledged(MessageReference ref) throws Exception
-   {
-      deliveringCount.decrementAndGet();
-      
-      sizeBytes.addAndGet(-ref.getMessage().encodeSize());
-
-//      if (flowController != null)
-//      {
-//         flowController.messageAcknowledged();
-//      }
-   }
-
-   public void referenceCancelled()
-   {
-      deliveringCount.decrementAndGet();
-   }
-
-   public int getMaxSizeBytes()
-   {
-      return maxSizeBytes;
-   }
-   
-   public int getSizeBytes()
-   {
-      return sizeBytes.get();
-   }
-
-   public DistributionPolicy getDistributionPolicy()
-   {
-      return distributionPolicy;
-   }
-
-   public void setDistributionPolicy(final DistributionPolicy distributionPolicy)
-   {
-      this.distributionPolicy = distributionPolicy;
-   }
-
-   public int getMessagesAdded()
-   {
-      return messagesAdded.get();
-   }
-
-   public void setFlowController(final FlowController flowController)
-   {
-      this.flowController = flowController;
-   }
-
-   public FlowController getFlowController()
-   {
-      return flowController;
-   }
-
-   public synchronized void deleteAllReferences(
-         final StorageManager storageManager) throws Exception
-   {
-      Transaction tx = new TransactionImpl(storageManager, null);
-
-      Iterator<MessageReference> iter = messageReferences.iterator();
-
-      while (iter.hasNext())
-      {
-         MessageReference ref = iter.next();
-
-         deliveringCount.incrementAndGet();
-
-         tx.addAcknowledgement(ref);
-
-         iter.remove();
-      }
-
-      synchronized (scheduledRunnables)
-      {
-         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
-         {
-            runnable.cancel();
-
-            deliveringCount.incrementAndGet();
-
-            tx.addAcknowledgement(runnable.getReference());
-         }
-
-         scheduledRunnables.clear();
-      }
-
-      tx.commit();
-   }
-
-
-   // Public
-   // -----------------------------------------------------------------------------
-
-   public boolean equals(Object other)
-   {
-      if (this == other) { return true; }
-
-      QueueImpl_c qother = (QueueImpl_c) other;
-
-      return name.equals(qother.name);
-   }
-
-   public int hashCode()
-   {
-      return name.hashCode();
-   }
-
-   // Private
-   // ------------------------------------------------------------------------------
-
-   private HandleStatus add(final MessageReference ref, final boolean first)
-   {
-      if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().encodeSize() >= maxSizeBytes)
-      {
-         return HandleStatus.BUSY;              
-      }
-      
-      if (!first)
-      {
-         messagesAdded.incrementAndGet();
-         
-         sizeBytes.addAndGet(ref.getMessage().encodeSize());
-      }
-      
-      if (checkAndSchedule(ref))
-      {
-         return HandleStatus.HANDLED;         
-      }
-
-      boolean add = false;
-
-      if (direct)
-      {
-         // Deliver directly
-
-         HandleStatus status = deliver(ref);
-
-         if (status == HandleStatus.HANDLED)
-         {
-            // Ok
-         }
-         else if (status == HandleStatus.BUSY)
-         {
-            add = true;
-         }
-         else if (status == HandleStatus.NO_MATCH)
-         {
-            add = true;
-         }
-
-         if (add)
-         {
-            direct = false;
-         }
-      }
-      else
-      {
-         add = true;
-      }
-
-      if (add)
-      {
-         if (first)
-         {
-            //messageReferences.addFirst(ref);
-            throw new UnsupportedOperationException();
-         }
-         else
-         {
-            messageReferences.offer(ref);
-         }
-
-         if (!direct && promptDelivery)
-         {
-            // We have consumers with filters which don't match, so we need
-            // to prompt delivery every time
-            // a new message arrives - this is why you really shouldn't use
-            // filters with queues - in most cases
-            // it's an ant-pattern since it would cause a queue scan on each
-            // message
-            deliver();
-         }
-      }     
-
-      return HandleStatus.HANDLED;
-   }
-
-   private boolean checkAndSchedule(final MessageReference ref)
-   {
-      long deliveryTime = ref.getScheduledDeliveryTime();
-      
-      if (deliveryTime != 0 && scheduledExecutor != null)
-      {      
-         long now = System.currentTimeMillis();
-      
-         if (deliveryTime > now)
-         {
-            if (trace)
-            {
-               log.trace("Scheduling delivery for " + ref + " to occur at "  + deliveryTime);
-            }
-   
-            long delay = deliveryTime - now;
-   
-            ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-   
-            scheduledRunnables.add(runnable);
-   
-            Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
-   
-            runnable.setFuture(future);
-   
-            return true;
-         }
-      }
-      return false;      
-   }
-
-   private HandleStatus deliver(final MessageReference reference)
-   {
-      if (consumers.isEmpty())
-      {
-         return HandleStatus.BUSY;
-      }
-
-      int startPos = pos;
-
-      boolean filterRejected = false;
-
-      while (true)
-      {
-         Consumer consumer = consumers.get(pos);
-
-         pos = distributionPolicy.select(consumers, pos);
-
-         HandleStatus status;
-
-         try
-         {
-            status = consumer.handle(reference);
-         }
-         catch (Throwable t)
-         {
-            log.warn("removing consumer which did not handle a message, " +
-                  "consumer=" + consumer + ", message=" + reference, t);
-
-            // If the consumer throws an exception we remove the consumer
-            removeConsumer(consumer);
-
-            return HandleStatus.BUSY;
-         }
-
-         if (status == null) { throw new IllegalStateException(
-               "ClientConsumer.handle() should never return null"); }
-
-         if (status == HandleStatus.HANDLED)
-         {
-            deliveringCount.incrementAndGet();
-
-            return HandleStatus.HANDLED;
-         }
-         else if (status == HandleStatus.NO_MATCH)
-         {
-            promptDelivery = true;
-
-            filterRejected = true;
-         }
-
-         if (pos == startPos)
-         {
-            // Tried all of them
-            if (filterRejected)
-            {
-               return HandleStatus.NO_MATCH;
-            }
-            else
-            {
-               // Give up - all consumers busy
-               return HandleStatus.BUSY;
-            }
-         }
-      }
-   }
-
-   // Inner classes
-   // --------------------------------------------------------------------------
-
-   private class DeliverRunner implements Runnable
-   {
-      public void run()
-      {
-         //Must be set to false *before* executing to avoid race
-         waitingToDeliver.set(false);
-         
-         deliver();
-      }
-   }   
-   
-   private class ScheduledDeliveryRunnable implements Runnable
-   {
-      private final MessageReference ref;
-
-      private volatile Future<?> future;
-
-      private boolean cancelled;
-
-      public ScheduledDeliveryRunnable(final MessageReference ref)
-      {
-         this.ref = ref;
-      }
-
-      public synchronized void setFuture(final Future<?> future)
-      {
-         if (cancelled)
-         {
-            future.cancel(false);
-         }
-         else
-         {
-            this.future = future;
-         }
-      }
-
-      public synchronized void cancel()
-      {
-         if (future != null)
-         {
-            future.cancel(false);
-         }
-
-         cancelled = true;
-      }
-
-      public MessageReference getReference()
-      {
-         return ref;
-      }
-
-      public void run()
-      {
-         if (trace)
-         {
-            log.trace("Scheduled delivery timeout " + ref);
-         }
-
-         synchronized (scheduledRunnables)
-         {
-            boolean removed = scheduledRunnables.remove(this);
-
-            if (!removed)
-            {
-               log.warn("Failed to remove timeout " + this);
-
-               return;
-            }
-         }
-
-         ref.setScheduledDeliveryTime(0);
-
-         HandleStatus status = deliver(ref);
-
-         if (HandleStatus.HANDLED != status)
-         {
-            // Add back to the front of the queue
-
-            addFirst(ref);
-         }
-         else
-         {
-            if (trace)
-            {
-               log.trace("Delivered scheduled delivery at "
-                     + System.currentTimeMillis() + " for " + ref);
-            }
-         }
-      }
-   }
-}
-

Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_nc.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_nc.java	2008-06-09 17:22:39 UTC (rev 4412)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_nc.java	2008-06-09 19:08:14 UTC (rev 4413)
@@ -1,756 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.server.impl;
-
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.transaction.impl.TransactionImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * Implementation of a Queue
- * 
- * TODO use Java 5 concurrent queue
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- * 
- */
-public class QueueImpl_nc implements Queue
-{
-   private static final Logger log = Logger.getLogger(QueueImpl.class);
-
-   private static final boolean trace = log.isTraceEnabled();
-
-   private volatile long persistenceID = -1;
-
-   private final SimpleString name;
-
-   private volatile Filter filter;
-
-   private final boolean clustered;
-
-   private final boolean durable;
-
-   private final boolean temporary;
-
-   private final int maxSizeBytes;
-     
-   private final ScheduledExecutorService scheduledExecutor;
-
-   private final LinkedList<MessageReference> messageReferences = new LinkedList<MessageReference>();
-
-   private final List<Consumer> consumers = new ArrayList<Consumer>();
-
-   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
-
-   private volatile DistributionPolicy distributionPolicy = new RoundRobinDistributionPolicy();
-
-   private boolean direct;
-
-   private boolean promptDelivery;
-
-   private int pos;
-   
-   private AtomicInteger sizeBytes = new AtomicInteger(0);
-
-   private AtomicInteger messagesAdded = new AtomicInteger(0);
-
-   private AtomicInteger deliveringCount = new AtomicInteger(0);
-
-   private volatile FlowController flowController;
-   
-   private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);  
-   
-   private final Runnable deliverRunner = new DeliverRunner();
-   
-   
-   public QueueImpl_nc(final long persistenceID, final SimpleString name,
-         final Filter filter, final boolean clustered, final boolean durable,
-         final boolean temporary, final int maxSizeBytes,
-         final ScheduledExecutorService scheduledExecutor)
-   {
-      this.persistenceID = persistenceID;
-
-      this.name = name;
-
-      this.filter = filter;
-
-      this.clustered = clustered;
-
-      this.durable = durable;
-
-      this.temporary = temporary;
-
-      this.maxSizeBytes = maxSizeBytes;
-
-      this.scheduledExecutor = scheduledExecutor;
-
-      direct = true;
-   }
-
-   // Queue implementation
-   // -------------------------------------------------------------------
-
-   public boolean isClustered()
-   {
-      return clustered;
-   }
-
-   public boolean isDurable()
-   {
-      return durable;
-   }
-
-   public boolean isTemporary()
-   {
-      return temporary;
-   }
-
-   public SimpleString getName()
-   {
-      return name;
-   }
-
-   public synchronized HandleStatus addLast(final MessageReference ref)
-   {
-      return add(ref, false);
-   }
-
-   public synchronized HandleStatus addFirst(final MessageReference ref)
-   {
-      return add(ref, true);
-   }
-
-   public synchronized void addListFirst(final LinkedList<MessageReference> list)
-   {
-      ListIterator<MessageReference> iter = list.listIterator(list.size());
-
-      while (iter.hasPrevious())
-      {
-         MessageReference ref = iter.previous();
-
-         messageReferences.addFirst(ref);
-      }
-
-      deliver();
-   }
- 
-   public void deliverAsync(final Executor executor)
-   {
-      //Prevent too many executors running at once
-      
-      if (waitingToDeliver.compareAndSet(false, true))
-      {
-         executor.execute(deliverRunner);
-      }
-   }
-      
-   /*
-    * Attempt to deliver all the messages in the queue
-    * 
-    * @see org.jboss.messaging.newcore.intf.Queue#deliver()
-    */
-   public synchronized void deliver()
-   {
-      MessageReference reference;
-
-      ListIterator<MessageReference> iterator = null;
-
-      while (true)
-      {
-         if (iterator == null)
-         {
-            reference = messageReferences.peek();
-         }
-         else
-         {
-            if (iterator.hasNext())
-            {
-               reference = iterator.next();
-            }
-            else
-            {
-               reference = null;
-            }
-         }
-
-         if (reference == null)
-         {
-            if (iterator == null)
-            {
-               // We delivered all the messages - go into direct delivery
-               direct = true;
-               
-               promptDelivery = false;
-            }
-            return;
-         }
-
-         HandleStatus status = deliver(reference);
-
-         if (status == HandleStatus.HANDLED)
-         {
-            if (iterator == null)
-            {
-               messageReferences.removeFirst();              
-            }
-            else
-            {
-               iterator.remove();
-            }
-         }
-         else if (status == HandleStatus.BUSY)
-         {
-            // All consumers busy - give up
-            break;
-         }
-         else if (status == HandleStatus.NO_MATCH && iterator == null)
-         {
-            // Consumers not all busy - but filter not accepting - iterate back
-            // through the queue
-            iterator = messageReferences.listIterator();
-         }
-      }
-   }
-
-   public synchronized void addConsumer(final Consumer consumer)
-   {
-      consumers.add(consumer);
-   }
-
-   public synchronized boolean removeConsumer(final Consumer consumer)
-   {
-      boolean removed = consumers.remove(consumer);
-
-      if (pos == consumers.size())
-      {
-         pos = 0;
-      }
-
-      if (consumers.isEmpty())
-      {
-         promptDelivery = false;
-      }
-
-      return removed;
-   }
-
-   public synchronized int getConsumerCount()
-   {
-      return consumers.size();
-   }
-
-   public synchronized List<MessageReference> list(final Filter filter)
-   {
-      if (filter == null)
-      {
-         return new ArrayList<MessageReference>(messageReferences);
-      }
-      else
-      {
-         ArrayList<MessageReference> list = new ArrayList<MessageReference>();
-
-         for (MessageReference ref : messageReferences)
-         {
-            if (filter.match(ref.getMessage()))
-            {
-               list.add(ref);
-            }
-         }
-
-         return list;
-      }
-   }
-
-   public synchronized boolean removeReferenceWithID(final long id)
-   {
-      ListIterator<MessageReference> iterator = messageReferences.listIterator();
-
-      boolean removed = false;
-
-      while (iterator.hasNext())
-      {
-         MessageReference ref = iterator.next();
-
-         if (ref.getMessage().getMessageID() == id)
-         {
-            iterator.remove();
-
-            removed = true;
-
-            break;
-         }
-      }
-
-      return removed;
-   }
-
-   public synchronized MessageReference getReference(final long id)
-   {
-      ListIterator<MessageReference> iterator = messageReferences.listIterator();
-
-      while (iterator.hasNext())
-      {
-         MessageReference ref = iterator.next();
-
-         if (ref.getMessage().getMessageID() == id) { return ref; }
-      }
-
-      return null;
-   }
-
-   public long getPersistenceID()
-   {
-      return persistenceID;
-   }
-
-   public void setPersistenceID(final long id)
-   {
-      this.persistenceID = id;
-   }
-
-   public Filter getFilter()
-   {
-      return filter;
-   }
-
-   public void setFilter(final Filter filter)
-   {
-      this.filter = filter;
-   }
-
-   public synchronized int getMessageCount()
-   {
-      //log.info("mr: " + messageReferences.size() + " sc:" + getScheduledCount() + " dc:" + getDeliveringCount());
-      return messageReferences.size() + getScheduledCount()
-            + getDeliveringCount();
-   }
-
-   public synchronized int getScheduledCount()
-   {
-      return scheduledRunnables.size();
-   }
-
-   public int getDeliveringCount()
-   {
-      return deliveringCount.get();
-   }
-
-   public void referenceAcknowledged(MessageReference ref) throws Exception
-   {
-      deliveringCount.decrementAndGet();
-      
-      sizeBytes.addAndGet(-ref.getMessage().encodeSize());
-
-//      if (flowController != null)
-//      {
-//         flowController.messageAcknowledged();
-//      }
-   }
-
-   public void referenceCancelled()
-   {
-      deliveringCount.decrementAndGet();
-   }
-
-   public int getMaxSizeBytes()
-   {
-      return maxSizeBytes;
-   }
-   
-   public int getSizeBytes()
-   {
-      return sizeBytes.get();
-   }
-
-   public DistributionPolicy getDistributionPolicy()
-   {
-      return distributionPolicy;
-   }
-
-   public void setDistributionPolicy(final DistributionPolicy distributionPolicy)
-   {
-      this.distributionPolicy = distributionPolicy;
-   }
-
-   public int getMessagesAdded()
-   {
-      return messagesAdded.get();
-   }
-
-   public void setFlowController(final FlowController flowController)
-   {
-      this.flowController = flowController;
-   }
-
-   public FlowController getFlowController()
-   {
-      return flowController;
-   }
-
-   public synchronized void deleteAllReferences(
-         final StorageManager storageManager) throws Exception
-   {
-      Transaction tx = new TransactionImpl(storageManager, null);
-
-      ListIterator<MessageReference> iter = messageReferences.listIterator();
-
-      while (iter.hasNext())
-      {
-         MessageReference ref = iter.next();
-
-         deliveringCount.incrementAndGet();
-
-         tx.addAcknowledgement(ref);
-
-         iter.remove();
-      }
-
-      synchronized (scheduledRunnables)
-      {
-         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
-         {
-            runnable.cancel();
-
-            deliveringCount.incrementAndGet();
-
-            tx.addAcknowledgement(runnable.getReference());
-         }
-
-         scheduledRunnables.clear();
-      }
-
-      tx.commit();
-   }
-
-
-   // Public
-   // -----------------------------------------------------------------------------
-
-   public boolean equals(Object other)
-   {
-      if (this == other) { return true; }
-
-      QueueImpl_nc qother = (QueueImpl_nc) other;
-
-      return name.equals(qother.name);
-   }
-
-   public int hashCode()
-   {
-      return name.hashCode();
-   }
-
-   // Private
-   // ------------------------------------------------------------------------------
-
-   private HandleStatus add(final MessageReference ref, final boolean first)
-   {
-      if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().encodeSize() >= maxSizeBytes)
-      {
-         return HandleStatus.BUSY;              
-      }
-      
-      if (!first)
-      {
-         messagesAdded.incrementAndGet();
-         
-         sizeBytes.addAndGet(ref.getMessage().encodeSize());
-      }
-      
-      if (checkAndSchedule(ref))
-      {
-         return HandleStatus.HANDLED;         
-      }
-
-      boolean add = false;
-
-      if (direct)
-      {
-         // Deliver directly
-
-         HandleStatus status = deliver(ref);
-
-         if (status == HandleStatus.HANDLED)
-         {
-            // Ok
-         }
-         else if (status == HandleStatus.BUSY)
-         {
-            add = true;
-         }
-         else if (status == HandleStatus.NO_MATCH)
-         {
-            add = true;
-         }
-
-         if (add)
-         {
-            direct = false;
-         }
-      }
-      else
-      {
-         add = true;
-      }
-
-      if (add)
-      {
-         if (first)
-         {
-            messageReferences.addFirst(ref);
-         }
-         else
-         {
-            messageReferences.addLast(ref);
-         }
-
-         if (!direct && promptDelivery)
-         {
-            // We have consumers with filters which don't match, so we need
-            // to prompt delivery every time
-            // a new message arrives - this is why you really shouldn't use
-            // filters with queues - in most cases
-            // it's an ant-pattern since it would cause a queue scan on each
-            // message
-            deliver();
-         }
-      }     
-
-      return HandleStatus.HANDLED;
-   }
-
-   private boolean checkAndSchedule(final MessageReference ref)
-   {
-      long deliveryTime = ref.getScheduledDeliveryTime();
-      
-      if (deliveryTime != 0 && scheduledExecutor != null)
-      {      
-         long now = System.currentTimeMillis();
-      
-         if (deliveryTime > now)
-         {
-            if (trace)
-            {
-               log.trace("Scheduling delivery for " + ref + " to occur at "  + deliveryTime);
-            }
-   
-            long delay = deliveryTime - now;
-   
-            ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-   
-            scheduledRunnables.add(runnable);
-   
-            Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
-   
-            runnable.setFuture(future);
-   
-            return true;
-         }
-      }
-      return false;      
-   }
-
-   private HandleStatus deliver(final MessageReference reference)
-   {
-      if (consumers.isEmpty())
-      {
-         return HandleStatus.BUSY;
-      }
-
-      int startPos = pos;
-
-      boolean filterRejected = false;
-
-      while (true)
-      {
-         Consumer consumer = consumers.get(pos);
-
-         pos = distributionPolicy.select(consumers, pos);
-
-         HandleStatus status;
-
-         try
-         {
-            status = consumer.handle(reference);
-         }
-         catch (Throwable t)
-         {
-            log.warn("removing consumer which did not handle a message, " +
-                  "consumer=" + consumer + ", message=" + reference, t);
-
-            // If the consumer throws an exception we remove the consumer
-            removeConsumer(consumer);
-
-            return HandleStatus.BUSY;
-         }
-
-         if (status == null) { throw new IllegalStateException(
-               "ClientConsumer.handle() should never return null"); }
-
-         if (status == HandleStatus.HANDLED)
-         {
-            deliveringCount.incrementAndGet();
-
-            return HandleStatus.HANDLED;
-         }
-         else if (status == HandleStatus.NO_MATCH)
-         {
-            promptDelivery = true;
-
-            filterRejected = true;
-         }
-
-         if (pos == startPos)
-         {
-            // Tried all of them
-            if (filterRejected)
-            {
-               return HandleStatus.NO_MATCH;
-            }
-            else
-            {
-               // Give up - all consumers busy
-               return HandleStatus.BUSY;
-            }
-         }
-      }
-   }
-
-   // Inner classes
-   // --------------------------------------------------------------------------
-
-   private class DeliverRunner implements Runnable
-   {
-      public void run()
-      {
-         //Must be set to false *before* executing to avoid race
-         waitingToDeliver.set(false);
-         
-         deliver();
-      }
-   }   
-   
-   private class ScheduledDeliveryRunnable implements Runnable
-   {
-      private final MessageReference ref;
-
-      private volatile Future<?> future;
-
-      private boolean cancelled;
-
-      public ScheduledDeliveryRunnable(final MessageReference ref)
-      {
-         this.ref = ref;
-      }
-
-      public synchronized void setFuture(final Future<?> future)
-      {
-         if (cancelled)
-         {
-            future.cancel(false);
-         }
-         else
-         {
-            this.future = future;
-         }
-      }
-
-      public synchronized void cancel()
-      {
-         if (future != null)
-         {
-            future.cancel(false);
-         }
-
-         cancelled = true;
-      }
-
-      public MessageReference getReference()
-      {
-         return ref;
-      }
-
-      public void run()
-      {
-         if (trace)
-         {
-            log.trace("Scheduled delivery timeout " + ref);
-         }
-
-         synchronized (scheduledRunnables)
-         {
-            boolean removed = scheduledRunnables.remove(this);
-
-            if (!removed)
-            {
-               log.warn("Failed to remove timeout " + this);
-
-               return;
-            }
-         }
-
-         ref.setScheduledDeliveryTime(0);
-
-         HandleStatus status = deliver(ref);
-
-         if (HandleStatus.HANDLED != status)
-         {
-            // Add back to the front of the queue
-
-            addFirst(ref);
-         }
-         else
-         {
-            if (trace)
-            {
-               log.trace("Delivered scheduled delivery at "
-                     + System.currentTimeMillis() + " for " + ref);
-            }
-         }
-      }
-   }
-}
-

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-06-09 17:22:39 UTC (rev 4412)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-06-09 19:08:14 UTC (rev 4413)
@@ -284,6 +284,11 @@
          }
       }  	
    }      
+   
+   public Queue getQueue()
+   {
+      return messageQueue;
+   }
 
    // Public -----------------------------------------------------------------------------
      

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-06-09 17:22:39 UTC (rev 4412)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-06-09 19:08:14 UTC (rev 4413)
@@ -21,6 +21,20 @@
  */
 package org.jboss.messaging.core.server.impl;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -31,11 +45,24 @@
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.Delivery;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerConnection;
+import org.jboss.messaging.core.server.ServerConsumer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerProducer;
+import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
@@ -44,15 +71,6 @@
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.SimpleString;
 
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * Session implementation
  *
@@ -117,7 +135,7 @@
 
    private Transaction tx;
 
-   private final Object rollbackCancelLock = new Object();
+  // private final Object rollbackCancelLock = new Object();
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -210,14 +228,11 @@
    {
       Delivery delivery;
 
-      synchronized (rollbackCancelLock)
-      {
-         long nextID = deliveryIDSequence.getAndIncrement();
+      long nextID = deliveryIDSequence.getAndIncrement();
 
-         delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), nextID, sender);
+      delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), nextID, sender);
 
-         deliveries.add(delivery);
-      }
+      deliveries.add(delivery);      
 
       delivery.deliver();
    }
@@ -422,25 +437,46 @@
 
          tx = new TransactionImpl(persistenceManager, postOffice);
       }
-
-      // Synchronize to prevent any new deliveries arriving during this recovery. 
-      synchronized (rollbackCancelLock)
+      
+      //We need to lock all the queues while we're rolling back, to prevent any deliveries occurring during this
+      //period
+      
+      List<Queue> locked = new ArrayList<Queue>();
+      
+      for (ServerConsumer consumer: consumers)
+      {         
+         consumer.getQueue().lock();
+         
+         locked.add(consumer.getQueue());
+      }
+      
+      try
       {
+         
          // Add any unacked deliveries into the tx. Doing this ensures all references are rolled back in the correct
          // order in a single contiguous block
-
+   
          for (Delivery del : deliveries)
          {
             tx.addAcknowledgement(del.getReference());
          }
-
+   
          deliveries.clear();
-
+   
          deliveryIDSequence.addAndGet(-tx.getAcknowledgementsCount());
+         
+         tx.rollback(queueSettingsRepository);
       }
-
-      tx.rollback(queueSettingsRepository);
-
+      finally
+      {
+         //Now unlock
+         
+         for (Queue queue: locked)
+         {
+            queue.unlock();
+         }
+      }
+      
       tx = new TransactionImpl(persistenceManager, postOffice);
    }
 
@@ -449,22 +485,41 @@
       if (deliveryID == -1)
       {
          // Cancel all
-
-         Transaction cancelTx;
-
-         synchronized (rollbackCancelLock)
+         
+         //We need to lock all the queues while we're rolling back, to prevent any deliveries occurring during this
+         //period
+         
+         List<Queue> locked = new ArrayList<Queue>();
+         
+         for (ServerConsumer consumer: consumers)
+         {         
+            consumer.getQueue().lock();
+            
+            locked.add(consumer.getQueue());
+         }
+         
+         try
          {
-            cancelTx = new TransactionImpl(persistenceManager, postOffice);
-
+            Transaction cancelTx = new TransactionImpl(persistenceManager, postOffice);
+   
             for (Delivery del : deliveries)
             {
                cancelTx.addAcknowledgement(del.getReference());
             }
-
+   
             deliveries.clear();
+            
+            cancelTx.rollback(queueSettingsRepository);
          }
-
-         cancelTx.rollback(queueSettingsRepository);
+         finally
+         {
+            //Now unlock
+            
+            for (Queue queue: locked)
+            {
+               queue.unlock();
+            }
+         }
       }
       else if (expired)
       {




More information about the jboss-cvs-commits mailing list