[jboss-cvs] JBoss Messaging SVN: r4315 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 28 04:45:44 EDT 2008


Author: timfox
Date: 2008-05-28 04:45:44 -0400 (Wed, 28 May 2008)
New Revision: 4315

Added:
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_c.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_nc.java
Modified:
   trunk/examples/jms/build.xml
Log:
Added a couple of files I will use in perf analysis


Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml	2008-05-28 04:37:33 UTC (rev 4314)
+++ trunk/examples/jms/build.xml	2008-05-28 08:45:44 UTC (rev 4315)
@@ -43,10 +43,10 @@
    <!--perf props-->
    <property name="message.count" value="200000"/>
    <property name="message.warmup.count" value="10000"/>
-   <property name="delivery.mode" value="PERSISTENT"/>
+   <property name="delivery.mode" value="NON_PERSISTENT"/>
 	<!-- in seconds -->
    <property name="sample.period" value="1"/>
-   <property name="sess.trans" value="true"/>
+   <property name="sess.trans" value="false"/>
    <property name="sess.trans.size" value="100"/>
 
    <path id="compile.classpath">

Added: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_c.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_c.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_c.java	2008-05-28 08:45:44 UTC (rev 4315)
@@ -0,0 +1,760 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.FlowController;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * Implementation of a Queue
+ * 
+ * TODO use Java 5 concurrent queue (this variant already keeps its references in a ConcurrentLinkedQueue)
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * 
+ */
+public class QueueImpl_c implements Queue
+{
+   private static final Logger log = Logger.getLogger(QueueImpl.class);
+
+   private static final boolean trace = log.isTraceEnabled();
+
+   private volatile long persistenceID = -1;
+
+   private final SimpleString name;
+
+   private volatile Filter filter;
+
+   private final boolean clustered;
+
+   private final boolean durable;
+
+   private final boolean temporary;
+
+   private final int maxSizeBytes;
+     
+   private final ScheduledExecutorService scheduledExecutor;
+
+   private final java.util.Queue<MessageReference> messageReferences = new ConcurrentLinkedQueue<MessageReference>();
+
+   private final List<Consumer> consumers = new ArrayList<Consumer>();
+
+   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+
+   private volatile DistributionPolicy distributionPolicy = new RoundRobinDistributionPolicy();
+
+   private boolean direct;
+
+   private boolean promptDelivery;
+
+   private int pos;
+   
+   private AtomicInteger sizeBytes = new AtomicInteger(0);
+
+   private AtomicInteger messagesAdded = new AtomicInteger(0);
+
+   private AtomicInteger deliveringCount = new AtomicInteger(0);
+
+   private volatile FlowController flowController;
+   
+   private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);  
+   
+   private final Runnable deliverRunner = new DeliverRunner();
+   
+   
+   public QueueImpl_c(final long persistenceID, final SimpleString name,
+         final Filter filter, final boolean clustered, final boolean durable,
+         final boolean temporary, final int maxSizeBytes,
+         final ScheduledExecutorService scheduledExecutor)
+   {
+      this.persistenceID = persistenceID;
+
+      this.name = name;
+
+      this.filter = filter;
+
+      this.clustered = clustered;
+
+      this.durable = durable;
+
+      this.temporary = temporary;
+
+      this.maxSizeBytes = maxSizeBytes;
+
+      this.scheduledExecutor = scheduledExecutor;
+
+      direct = true;
+   }
+
+   // Queue implementation
+   // -------------------------------------------------------------------
+
+   public boolean isClustered()
+   {
+      return clustered;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   public boolean isTemporary()
+   {
+      return temporary;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   public HandleStatus addLast(final MessageReference ref)
+   {
+      return add(ref, false);
+   }
+
+   public HandleStatus addFirst(final MessageReference ref)
+   {
+      return add(ref, true);
+   }
+
+   public void addListFirst(final LinkedList<MessageReference> list)
+   {
+//      ListIterator<MessageReference> iter = list.listIterator(list.size());
+//
+//      while (iter.hasPrevious())
+//      {
+//         MessageReference ref = iter.previous();
+//
+//         messageReferences.addFirst(ref);
+//      }
+//
+//      deliver();
+      throw new UnsupportedOperationException();
+   }
+ 
+   public void deliverAsync(final Executor executor)
+   {
+      // Submit deliverRunner only if no run is already pending; the flag is cleared again in DeliverRunner.run()
+      
+      if (waitingToDeliver.compareAndSet(false, true))
+      {
+         executor.execute(deliverRunner);
+      }
+   }
+      
+   /*
+    * Attempt to deliver all the messages in the queue, in order. Stops when
+    * there is nothing left to try or when deliver(reference) returns BUSY.
+    * @see org.jboss.messaging.newcore.intf.Queue#deliver()
+    */
+   public void deliver()
+   {
+      MessageReference reference;
+
+      Iterator<MessageReference> iterator = null;
+
+      while (true)
+      {
+         if (iterator == null)
+         {
+            reference = messageReferences.peek();
+         }
+         else
+         {
+            if (iterator.hasNext())
+            {
+               reference = iterator.next();
+            }
+            else
+            {
+               reference = null;
+            }
+         }
+
+         if (reference == null)
+         {
+            if (iterator == null)
+            {
+               // peek() found nothing left queued - go back into direct delivery
+               direct = true;
+               
+               promptDelivery = false;
+            }
+            return;
+         }
+
+         HandleStatus status = deliver(reference);
+
+         if (status == HandleStatus.HANDLED)
+         {
+            if (iterator == null)
+            {
+               messageReferences.poll();             
+            }
+            else
+            {
+               iterator.remove();
+            }
+         }
+         else if (status == HandleStatus.BUSY)
+         {
+            // BUSY: all consumers busy, none registered, or one just failed - give up
+            break;
+         }
+         else if (status == HandleStatus.NO_MATCH && iterator == null)
+         {
+            // No consumer took the head reference and at least one returned NO_MATCH -
+            // from here on walk the queue with an iterator, starting again at the head
+            iterator = messageReferences.iterator();
+         }
+      }
+   }
+
+   public synchronized void addConsumer(final Consumer consumer)
+   {
+      consumers.add(consumer);
+   }
+
+   public synchronized boolean removeConsumer(final Consumer consumer)
+   {
+      boolean removed = consumers.remove(consumer);
+
+      if (pos == consumers.size())
+      {
+         pos = 0;
+      }
+
+      if (consumers.isEmpty())
+      {
+         promptDelivery = false;
+      }
+
+      return removed;
+   }
+
+   public synchronized int getConsumerCount()
+   {
+      return consumers.size();
+   }
+
+   public synchronized List<MessageReference> list(final Filter filter)
+   {
+      if (filter == null)
+      {
+         return new ArrayList<MessageReference>(messageReferences);
+      }
+      else
+      {
+         ArrayList<MessageReference> list = new ArrayList<MessageReference>();
+
+         for (MessageReference ref : messageReferences)
+         {
+            if (filter.match(ref.getMessage()))
+            {
+               list.add(ref);
+            }
+         }
+
+         return list;
+      }
+   }
+
+   public synchronized boolean removeReferenceWithID(final long id)
+   {
+      Iterator<MessageReference> iterator = messageReferences.iterator();
+
+      boolean removed = false;
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id)
+         {
+            iterator.remove();
+
+            removed = true;
+
+            break;
+         }
+      }
+
+      return removed;
+   }
+
+   public synchronized MessageReference getReference(final long id)
+   {
+      Iterator<MessageReference> iterator = messageReferences.iterator();
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id) { return ref; }
+      }
+
+      return null;
+   }
+
+   public long getPersistenceID()
+   {
+      return persistenceID;
+   }
+
+   public void setPersistenceID(final long id)
+   {
+      this.persistenceID = id;
+   }
+
+   public Filter getFilter()
+   {
+      return filter;
+   }
+
+   public void setFilter(final Filter filter)
+   {
+      this.filter = filter;
+   }
+
+   public synchronized int getMessageCount()
+   {
+      //log.info("mr: " + messageReferences.size() + " sc:" + getScheduledCount() + " dc:" + getDeliveringCount());
+      return messageReferences.size() + getScheduledCount()
+            + getDeliveringCount();
+   }
+
+   public synchronized int getScheduledCount()
+   {
+      return scheduledRunnables.size();
+   }
+
+   public int getDeliveringCount()
+   {
+      return deliveringCount.get();
+   }
+
+   public void referenceAcknowledged(MessageReference ref) throws Exception
+   {
+      deliveringCount.decrementAndGet();
+      
+      sizeBytes.addAndGet(-ref.getMessage().encodeSize());
+
+//      if (flowController != null)
+//      {
+//         flowController.messageAcknowledged();
+//      }
+   }
+
+   public void referenceCancelled()
+   {
+      deliveringCount.decrementAndGet();
+   }
+
+   public int getMaxSizeBytes()
+   {
+      return maxSizeBytes;
+   }
+   
+   public int getSizeBytes()
+   {
+      return sizeBytes.get();
+   }
+
+   public DistributionPolicy getDistributionPolicy()
+   {
+      return distributionPolicy;
+   }
+
+   public void setDistributionPolicy(final DistributionPolicy distributionPolicy)
+   {
+      this.distributionPolicy = distributionPolicy;
+   }
+
+   public int getMessagesAdded()
+   {
+      return messagesAdded.get();
+   }
+
+   public void setFlowController(final FlowController flowController)
+   {
+      this.flowController = flowController;
+   }
+
+   public FlowController getFlowController()
+   {
+      return flowController;
+   }
+
+   public synchronized void deleteAllReferences(
+         final StorageManager storageManager) throws Exception
+   {
+      Transaction tx = new TransactionImpl(storageManager, null);
+
+      Iterator<MessageReference> iter = messageReferences.iterator();
+
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
+
+         deliveringCount.incrementAndGet();
+
+         tx.addAcknowledgement(ref);
+
+         iter.remove();
+      }
+
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+         {
+            runnable.cancel();
+
+            deliveringCount.incrementAndGet();
+
+            tx.addAcknowledgement(runnable.getReference());
+         }
+
+         scheduledRunnables.clear();
+      }
+
+      tx.commit();
+   }
+
+
+   // Public
+   // -----------------------------------------------------------------------------
+
+   public boolean equals(Object other)
+   {
+      if (this == other) { return true; }
+
+      QueueImpl_c qother = (QueueImpl_c) other;
+
+      return name.equals(qother.name);
+   }
+
+   public int hashCode()
+   {
+      return name.hashCode();
+   }
+
+   // Private
+   // ------------------------------------------------------------------------------
+
+   private HandleStatus add(final MessageReference ref, final boolean first)
+   {
+      if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().encodeSize() >= maxSizeBytes)
+      {
+         return HandleStatus.BUSY;              
+      }
+      
+      if (!first)
+      {
+         messagesAdded.incrementAndGet();
+         
+         sizeBytes.addAndGet(ref.getMessage().encodeSize());
+      }
+      
+      if (checkAndSchedule(ref))
+      {
+         return HandleStatus.HANDLED;         
+      }
+
+      boolean add = false;
+
+      if (direct)
+      {
+         // Direct mode: offer the reference to the consumers without queueing it first
+
+         HandleStatus status = deliver(ref);
+
+         if (status == HandleStatus.HANDLED)
+         {
+            // Ok - delivered, nothing to queue
+         }
+         else if (status == HandleStatus.BUSY)
+         {
+            add = true;
+         }
+         else if (status == HandleStatus.NO_MATCH)
+         {
+            add = true;
+         }
+
+         if (add)
+         {
+            direct = false;
+         }
+      }
+      else
+      {
+         add = true;
+      }
+
+      if (add)
+      {
+         if (first)
+         {
+            //messageReferences.addFirst(ref);
+            throw new UnsupportedOperationException();
+         }
+         else
+         {
+            messageReferences.offer(ref);
+         }
+
+         if (!direct && promptDelivery)
+         {
+            // promptDelivery is set when a consumer returned NO_MATCH, i.e. we
+            // have consumers with filters which don't match, so delivery has to
+            // be prompted every time a new message arrives. This is why you
+            // really shouldn't use filters with queues - in most cases it's an
+            // anti-pattern, since it causes a queue scan on each message.
+            // (deliver() performs that scan.)
+            deliver();
+         }
+      }     
+
+      return HandleStatus.HANDLED;
+   }
+
+   private boolean checkAndSchedule(final MessageReference ref)
+   {
+      long deliveryTime = ref.getScheduledDeliveryTime();
+      
+      if (deliveryTime != 0 && scheduledExecutor != null)
+      {      
+         long now = System.currentTimeMillis();
+      
+         if (deliveryTime > now)
+         {
+            if (trace)
+            {
+               log.trace("Scheduling delivery for " + ref + " to occur at "  + deliveryTime);
+            }
+   
+            long delay = deliveryTime - now;
+   
+            ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+   
+            scheduledRunnables.add(runnable);
+   
+            Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+   
+            runnable.setFuture(future);
+   
+            return true;
+         }
+      }
+      return false;      
+   }
+
+   private HandleStatus deliver(final MessageReference reference)
+   {
+      if (consumers.isEmpty())
+      {
+         return HandleStatus.BUSY;
+      }
+
+      int startPos = pos;
+
+      boolean filterRejected = false;
+
+      while (true)
+      {
+         Consumer consumer = consumers.get(pos);
+
+         pos = distributionPolicy.select(consumers, pos);
+
+         HandleStatus status;
+
+         try
+         {
+            status = consumer.handle(reference);
+         }
+         catch (Throwable t)
+         {
+            log.warn("removing consumer which did not handle a message, " +
+                  "consumer=" + consumer + ", message=" + reference, t);
+
+            // If the consumer throws an exception we remove the consumer
+            removeConsumer(consumer);
+
+            return HandleStatus.BUSY;
+         }
+
+         if (status == null) { throw new IllegalStateException(
+               "ClientConsumer.handle() should never return null"); }
+
+         if (status == HandleStatus.HANDLED)
+         {
+            deliveringCount.incrementAndGet();
+
+            return HandleStatus.HANDLED;
+         }
+         else if (status == HandleStatus.NO_MATCH)
+         {
+            promptDelivery = true;
+
+            filterRejected = true;
+         }
+
+         if (pos == startPos)
+         {
+            // Tried all of them
+            if (filterRejected)
+            {
+               return HandleStatus.NO_MATCH;
+            }
+            else
+            {
+               // Give up - all consumers busy
+               return HandleStatus.BUSY;
+            }
+         }
+      }
+   }
+
+   // Inner classes
+   // --------------------------------------------------------------------------
+
+   private class DeliverRunner implements Runnable
+   {
+      public void run()
+      {
+         // Must be cleared *before* deliver() runs, so a deliverAsync() call made meanwhile can schedule another run
+         waitingToDeliver.set(false);
+         
+         deliver();
+      }
+   }   
+   
+   private class ScheduledDeliveryRunnable implements Runnable
+   {
+      private final MessageReference ref;
+
+      private volatile Future<?> future;
+
+      private boolean cancelled;
+
+      public ScheduledDeliveryRunnable(final MessageReference ref)
+      {
+         this.ref = ref;
+      }
+
+      public synchronized void setFuture(final Future<?> future)
+      {
+         if (cancelled)
+         {
+            future.cancel(false);
+         }
+         else
+         {
+            this.future = future;
+         }
+      }
+
+      public synchronized void cancel()
+      {
+         if (future != null)
+         {
+            future.cancel(false);
+         }
+
+         cancelled = true;
+      }
+
+      public MessageReference getReference()
+      {
+         return ref;
+      }
+
+      public void run()
+      {
+         if (trace)
+         {
+            log.trace("Scheduled delivery timeout " + ref);
+         }
+
+         synchronized (scheduledRunnables)
+         {
+            boolean removed = scheduledRunnables.remove(this);
+
+            if (!removed)
+            {
+               log.warn("Failed to remove timeout " + this);
+
+               return;
+            }
+         }
+
+         ref.setScheduledDeliveryTime(0);
+
+         HandleStatus status = deliver(ref);
+
+         if (HandleStatus.HANDLED != status)
+         {
+            // Add back to the front of the queue
+
+            addFirst(ref);
+         }
+         else
+         {
+            if (trace)
+            {
+               log.trace("Delivered scheduled delivery at "
+                     + System.currentTimeMillis() + " for " + ref);
+            }
+         }
+      }
+   }
+}
+

Added: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_nc.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_nc.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl_nc.java	2008-05-28 08:45:44 UTC (rev 4315)
@@ -0,0 +1,756 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.FlowController;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * Implementation of a Queue
+ * 
+ * TODO use Java 5 concurrent queue
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * 
+ */
+public class QueueImpl_nc implements Queue
+{
+   private static final Logger log = Logger.getLogger(QueueImpl.class);
+
+   private static final boolean trace = log.isTraceEnabled();
+
+   private volatile long persistenceID = -1;
+
+   private final SimpleString name;
+
+   private volatile Filter filter;
+
+   private final boolean clustered;
+
+   private final boolean durable;
+
+   private final boolean temporary;
+
+   private final int maxSizeBytes;
+     
+   private final ScheduledExecutorService scheduledExecutor;
+
+   private final LinkedList<MessageReference> messageReferences = new LinkedList<MessageReference>();
+
+   private final List<Consumer> consumers = new ArrayList<Consumer>();
+
+   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+
+   private volatile DistributionPolicy distributionPolicy = new RoundRobinDistributionPolicy();
+
+   private boolean direct;
+
+   private boolean promptDelivery;
+
+   private int pos;
+   
+   private AtomicInteger sizeBytes = new AtomicInteger(0);
+
+   private AtomicInteger messagesAdded = new AtomicInteger(0);
+
+   private AtomicInteger deliveringCount = new AtomicInteger(0);
+
+   private volatile FlowController flowController;
+   
+   private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);  
+   
+   private final Runnable deliverRunner = new DeliverRunner();
+   
+   
+   public QueueImpl_nc(final long persistenceID, final SimpleString name,
+         final Filter filter, final boolean clustered, final boolean durable,
+         final boolean temporary, final int maxSizeBytes,
+         final ScheduledExecutorService scheduledExecutor)
+   {
+      this.persistenceID = persistenceID;
+
+      this.name = name;
+
+      this.filter = filter;
+
+      this.clustered = clustered;
+
+      this.durable = durable;
+
+      this.temporary = temporary;
+
+      this.maxSizeBytes = maxSizeBytes;
+
+      this.scheduledExecutor = scheduledExecutor;
+
+      direct = true;
+   }
+
+   // Queue implementation
+   // -------------------------------------------------------------------
+
+   public boolean isClustered()
+   {
+      return clustered;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   public boolean isTemporary()
+   {
+      return temporary;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   public synchronized HandleStatus addLast(final MessageReference ref)
+   {
+      return add(ref, false);
+   }
+
+   public synchronized HandleStatus addFirst(final MessageReference ref)
+   {
+      return add(ref, true);
+   }
+
+   public synchronized void addListFirst(final LinkedList<MessageReference> list)
+   {
+      ListIterator<MessageReference> iter = list.listIterator(list.size());
+
+      while (iter.hasPrevious())
+      {
+         MessageReference ref = iter.previous();
+
+         messageReferences.addFirst(ref);
+      }
+
+      deliver();
+   }
+ 
+   public void deliverAsync(final Executor executor)
+   {
+      // Submit deliverRunner only if no run is already pending; the flag is cleared again in DeliverRunner.run()
+      
+      if (waitingToDeliver.compareAndSet(false, true))
+      {
+         executor.execute(deliverRunner);
+      }
+   }
+      
+   /*
+    * Attempt to deliver all the messages in the queue, in order. Stops when
+    * there is nothing left to try or when deliver(reference) returns BUSY.
+    * @see org.jboss.messaging.newcore.intf.Queue#deliver()
+    */
+   public synchronized void deliver()
+   {
+      MessageReference reference;
+
+      ListIterator<MessageReference> iterator = null;
+
+      while (true)
+      {
+         if (iterator == null)
+         {
+            reference = messageReferences.peek();
+         }
+         else
+         {
+            if (iterator.hasNext())
+            {
+               reference = iterator.next();
+            }
+            else
+            {
+               reference = null;
+            }
+         }
+
+         if (reference == null)
+         {
+            if (iterator == null)
+            {
+               // peek() found nothing left queued - go back into direct delivery
+               direct = true;
+               
+               promptDelivery = false;
+            }
+            return;
+         }
+
+         HandleStatus status = deliver(reference);
+
+         if (status == HandleStatus.HANDLED)
+         {
+            if (iterator == null)
+            {
+               messageReferences.removeFirst();              
+            }
+            else
+            {
+               iterator.remove();
+            }
+         }
+         else if (status == HandleStatus.BUSY)
+         {
+            // BUSY: all consumers busy, none registered, or one just failed - give up
+            break;
+         }
+         else if (status == HandleStatus.NO_MATCH && iterator == null)
+         {
+            // No consumer took the head reference and at least one returned NO_MATCH -
+            // from here on walk the list with an iterator, starting again at the head
+            iterator = messageReferences.listIterator();
+         }
+      }
+   }
+
+   public synchronized void addConsumer(final Consumer consumer)
+   {
+      consumers.add(consumer);
+   }
+
+   public synchronized boolean removeConsumer(final Consumer consumer)
+   {
+      boolean removed = consumers.remove(consumer);
+
+      if (pos == consumers.size())
+      {
+         pos = 0;
+      }
+
+      if (consumers.isEmpty())
+      {
+         promptDelivery = false;
+      }
+
+      return removed;
+   }
+
+   public synchronized int getConsumerCount()
+   {
+      return consumers.size();
+   }
+
+   public synchronized List<MessageReference> list(final Filter filter)
+   {
+      if (filter == null)
+      {
+         return new ArrayList<MessageReference>(messageReferences);
+      }
+      else
+      {
+         ArrayList<MessageReference> list = new ArrayList<MessageReference>();
+
+         for (MessageReference ref : messageReferences)
+         {
+            if (filter.match(ref.getMessage()))
+            {
+               list.add(ref);
+            }
+         }
+
+         return list;
+      }
+   }
+
+   public synchronized boolean removeReferenceWithID(final long id)
+   {
+      ListIterator<MessageReference> iterator = messageReferences.listIterator();
+
+      boolean removed = false;
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id)
+         {
+            iterator.remove();
+
+            removed = true;
+
+            break;
+         }
+      }
+
+      return removed;
+   }
+
+   public synchronized MessageReference getReference(final long id)
+   {
+      ListIterator<MessageReference> iterator = messageReferences.listIterator();
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id) { return ref; }
+      }
+
+      return null;
+   }
+
+   public long getPersistenceID()
+   {
+      return persistenceID;
+   }
+
+   public void setPersistenceID(final long id)
+   {
+      this.persistenceID = id;
+   }
+
+   public Filter getFilter()
+   {
+      return filter;
+   }
+
+   public void setFilter(final Filter filter)
+   {
+      this.filter = filter;
+   }
+
+   public synchronized int getMessageCount()
+   {
+      //log.info("mr: " + messageReferences.size() + " sc:" + getScheduledCount() + " dc:" + getDeliveringCount());
+      return messageReferences.size() + getScheduledCount()
+            + getDeliveringCount();
+   }
+
+   public synchronized int getScheduledCount()
+   {
+      return scheduledRunnables.size();
+   }
+
+   public int getDeliveringCount()
+   {
+      return deliveringCount.get();
+   }
+
+   public void referenceAcknowledged(MessageReference ref) throws Exception
+   {
+      deliveringCount.decrementAndGet();
+      
+      sizeBytes.addAndGet(-ref.getMessage().encodeSize());
+
+//      if (flowController != null)
+//      {
+//         flowController.messageAcknowledged();
+//      }
+   }
+
+   public void referenceCancelled()
+   {
+      deliveringCount.decrementAndGet();
+   }
+
+   public int getMaxSizeBytes()
+   {
+      return maxSizeBytes;
+   }
+   
+   public int getSizeBytes()
+   {
+      return sizeBytes.get();
+   }
+
+   public DistributionPolicy getDistributionPolicy()
+   {
+      return distributionPolicy;
+   }
+
+   /**
+    * Replaces the policy used to select the next consumer position.
+    *
+    * @param distributionPolicy the policy to store; not validated here
+    */
+   public void setDistributionPolicy(final DistributionPolicy distributionPolicy)
+   {
+      // The parameter shadows the field, so the qualifier is required
+      this.distributionPolicy = distributionPolicy;
+   }
+
+   /**
+    * @return the current value of the {@code messagesAdded} counter, which
+    *         {@code add} increments for every reference that is not being
+    *         added to the front of the queue
+    */
+   public int getMessagesAdded()
+   {
+      return this.messagesAdded.get();
+   }
+
+   /**
+    * Stores the flow controller associated with this queue.
+    *
+    * @param flowController the flow controller to store; not validated here
+    */
+   public void setFlowController(final FlowController flowController)
+   {
+      // The parameter shadows the field, so the qualifier is required
+      this.flowController = flowController;
+   }
+
+   /**
+    * @return the flow controller last stored via {@link #setFlowController}
+    */
+   public FlowController getFlowController()
+   {
+      return this.flowController;
+   }
+
+   /**
+    * Removes every queued and every scheduled reference from this queue and
+    * acknowledges them all in a single transaction.
+    * <p>
+    * Each reference bumps {@code deliveringCount} before it is added to the
+    * transaction. NOTE(review): presumably this balances the decrement done
+    * in {@link #referenceAcknowledged} when the transaction commits; confirm
+    * against {@code TransactionImpl}.
+    *
+    * @param storageManager the storage manager handed to the transaction
+    * @throws Exception if adding an acknowledgement or committing fails
+    */
+   public synchronized void deleteAllReferences(
+         final StorageManager storageManager) throws Exception
+   {
+      final Transaction purgeTx = new TransactionImpl(storageManager, null);
+
+      // Drain the in-memory queue, removing each reference as it is recorded
+      for (ListIterator<MessageReference> cursor = messageReferences.listIterator(); cursor.hasNext();)
+      {
+         final MessageReference queued = cursor.next();
+
+         deliveringCount.incrementAndGet();
+
+         purgeTx.addAcknowledgement(queued);
+
+         cursor.remove();
+      }
+
+      // Cancel the pending scheduled deliveries and acknowledge their references
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable pending : scheduledRunnables)
+         {
+            pending.cancel();
+
+            deliveringCount.incrementAndGet();
+
+            purgeTx.addAcknowledgement(pending.getReference());
+         }
+
+         scheduledRunnables.clear();
+      }
+
+      purgeTx.commit();
+   }
+
+
+   // Public
+   // -----------------------------------------------------------------------------
+
+   /**
+    * Two queues are equal when they have the same name.
+    * <p>
+    * Returns {@code false} for {@code null} and for objects that are not a
+    * {@code QueueImpl_nc}, rather than failing with a
+    * {@code NullPointerException} or {@code ClassCastException}, as the
+    * {@link Object#equals(Object)} contract requires.
+    *
+    * @param other the object to compare with
+    * @return {@code true} if {@code other} is this queue or a
+    *         {@code QueueImpl_nc} with an equal name
+    */
+   public boolean equals(Object other)
+   {
+      if (this == other) { return true; }
+
+      // instanceof is false for null, so this also covers the null case
+      if (!(other instanceof QueueImpl_nc)) { return false; }
+
+      QueueImpl_nc qother = (QueueImpl_nc) other;
+
+      return name.equals(qother.name);
+   }
+
+   /**
+    * @return the hash code of the queue name, consistent with
+    *         {@link #equals(Object)} which compares names
+    */
+   public int hashCode()
+   {
+      return this.name.hashCode();
+   }
+
+   // Private
+   // ------------------------------------------------------------------------------
+
+   private HandleStatus add(final MessageReference ref, final boolean first)
+   {
+      if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().encodeSize() >= maxSizeBytes)
+      {
+         return HandleStatus.BUSY;              
+      }
+      
+      if (!first)
+      {
+         messagesAdded.incrementAndGet();
+         
+         sizeBytes.addAndGet(ref.getMessage().encodeSize());
+      }
+      
+      if (checkAndSchedule(ref))
+      {
+         return HandleStatus.HANDLED;         
+      }
+
+      boolean add = false;
+
+      if (direct)
+      {
+         // Deliver directly
+
+         HandleStatus status = deliver(ref);
+
+         if (status == HandleStatus.HANDLED)
+         {
+            // Ok
+         }
+         else if (status == HandleStatus.BUSY)
+         {
+            add = true;
+         }
+         else if (status == HandleStatus.NO_MATCH)
+         {
+            add = true;
+         }
+
+         if (add)
+         {
+            direct = false;
+         }
+      }
+      else
+      {
+         add = true;
+      }
+
+      if (add)
+      {
+         if (first)
+         {
+            messageReferences.addFirst(ref);
+         }
+         else
+         {
+            messageReferences.addLast(ref);
+         }
+
+         if (!direct && promptDelivery)
+         {
+            // We have consumers with filters which don't match, so we need
+            // to prompt delivery every time
+            // a new message arrives - this is why you really shouldn't use
+            // filters with queues - in most cases
+            // it's an ant-pattern since it would cause a queue scan on each
+            // message
+            deliver();
+         }
+      }     
+
+      return HandleStatus.HANDLED;
+   }
+
+   private boolean checkAndSchedule(final MessageReference ref)
+   {
+      long deliveryTime = ref.getScheduledDeliveryTime();
+      
+      if (deliveryTime != 0 && scheduledExecutor != null)
+      {      
+         long now = System.currentTimeMillis();
+      
+         if (deliveryTime > now)
+         {
+            if (trace)
+            {
+               log.trace("Scheduling delivery for " + ref + " to occur at "  + deliveryTime);
+            }
+   
+            long delay = deliveryTime - now;
+   
+            ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+   
+            scheduledRunnables.add(runnable);
+   
+            Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+   
+            runnable.setFuture(future);
+   
+            return true;
+         }
+      }
+      return false;      
+   }
+
+   private HandleStatus deliver(final MessageReference reference)
+   {
+      if (consumers.isEmpty())
+      {
+         return HandleStatus.BUSY;
+      }
+
+      int startPos = pos;
+
+      boolean filterRejected = false;
+
+      while (true)
+      {
+         Consumer consumer = consumers.get(pos);
+
+         pos = distributionPolicy.select(consumers, pos);
+
+         HandleStatus status;
+
+         try
+         {
+            status = consumer.handle(reference);
+         }
+         catch (Throwable t)
+         {
+            log.warn("removing consumer which did not handle a message, " +
+                  "consumer=" + consumer + ", message=" + reference, t);
+
+            // If the consumer throws an exception we remove the consumer
+            removeConsumer(consumer);
+
+            return HandleStatus.BUSY;
+         }
+
+         if (status == null) { throw new IllegalStateException(
+               "ClientConsumer.handle() should never return null"); }
+
+         if (status == HandleStatus.HANDLED)
+         {
+            deliveringCount.incrementAndGet();
+
+            return HandleStatus.HANDLED;
+         }
+         else if (status == HandleStatus.NO_MATCH)
+         {
+            promptDelivery = true;
+
+            filterRejected = true;
+         }
+
+         if (pos == startPos)
+         {
+            // Tried all of them
+            if (filterRejected)
+            {
+               return HandleStatus.NO_MATCH;
+            }
+            else
+            {
+               // Give up - all consumers busy
+               return HandleStatus.BUSY;
+            }
+         }
+      }
+   }
+
+   // Inner classes
+   // --------------------------------------------------------------------------
+
+   /**
+    * Runnable that triggers a delivery pass on the enclosing queue.
+    * <p>
+    * It clears the {@code waitingToDeliver} flag and then calls
+    * {@code deliver()}.
+    */
+   private class DeliverRunner implements Runnable
+   {
+      /**
+       * Clears {@code waitingToDeliver}, then runs {@code deliver()}.
+       */
+      public void run()
+      {
+         // The flag has to be cleared *before* delivering, otherwise there
+         // is a race
+         waitingToDeliver.set(false);
+
+         deliver();
+      }
+   }
+   
+   /**
+    * Task that delivers one reference when its scheduled delivery time
+    * arrives.
+    * <p>
+    * Instances are created by {@code checkAndSchedule}, which registers them
+    * in {@code scheduledRunnables} and submits them to the scheduled
+    * executor. {@link #setFuture} and {@link #cancel} are synchronized on
+    * the task so that a cancellation arriving before the future is set
+    * still cancels it.
+    */
+   private class ScheduledDeliveryRunnable implements Runnable
+   {
+      private final MessageReference ref;
+
+      private volatile Future<?> future;
+
+      private boolean cancelled;
+
+      /**
+       * @param ref the reference to deliver when the task runs
+       */
+      public ScheduledDeliveryRunnable(final MessageReference ref)
+      {
+         this.ref = ref;
+      }
+
+      /**
+       * Attaches the future returned by the executor. If the task was
+       * already cancelled, the future is cancelled at once instead of
+       * being stored.
+       *
+       * @param future the handle of the scheduled execution
+       */
+      public synchronized void setFuture(final Future<?> future)
+      {
+         if (!cancelled)
+         {
+            this.future = future;
+
+            return;
+         }
+
+         future.cancel(false);
+      }
+
+      /**
+       * Cancels the stored future, if any, without interrupting a running
+       * execution, and marks the task as cancelled.
+       */
+      public synchronized void cancel()
+      {
+         final Future<?> pending = future;
+
+         if (pending != null)
+         {
+            pending.cancel(false);
+         }
+
+         cancelled = true;
+      }
+
+      /**
+       * @return the reference this task will deliver
+       */
+      public MessageReference getReference()
+      {
+         return ref;
+      }
+
+      /**
+       * Unregisters the task, clears the reference's scheduled delivery
+       * time and tries to deliver it. If delivery does not report
+       * {@code HANDLED}, the reference is put back at the front of the
+       * queue. If the task is no longer registered, a warning is logged and
+       * nothing is delivered.
+       */
+      public void run()
+      {
+         if (trace)
+         {
+            log.trace("Scheduled delivery timeout " + ref);
+         }
+
+         synchronized (scheduledRunnables)
+         {
+            if (!scheduledRunnables.remove(this))
+            {
+               log.warn("Failed to remove timeout " + this);
+
+               return;
+            }
+         }
+
+         ref.setScheduledDeliveryTime(0);
+
+         final HandleStatus outcome = deliver(ref);
+
+         if (HandleStatus.HANDLED == outcome)
+         {
+            if (trace)
+            {
+               log.trace("Delivered scheduled delivery at "
+                     + System.currentTimeMillis() + " for " + ref);
+            }
+         }
+         else
+         {
+            // Not delivered: put it back at the front of the queue
+            addFirst(ref);
+         }
+      }
+   }
+}
+




More information about the jboss-cvs-commits mailing list