[jboss-cvs] JBoss Messaging SVN: r5150 - in branches/Branch_JBMESSAGING_1416: integration/EAP4/etc and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Oct 19 08:34:40 EDT 2008


Author: gaohoward
Date: 2008-10-19 08:34:40 -0400 (Sun, 19 Oct 2008)
New Revision: 5150

Added:
   branches/Branch_JBMESSAGING_1416/docs/DeliveryDetails.dia
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
Modified:
   branches/Branch_JBMESSAGING_1416/integration/EAP4/etc/aop-messaging-client.xml
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/JBossMessageProducer.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/container/ProducerAspect.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/state/ProducerState.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/delegate/ProducerDelegate.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/message/JBossMessage.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/util/prioritylinkedlist/BasicPriorityLinkedList.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/util/prioritylinkedlist/PriorityLinkedList.java
Log:
JBMESSAGING-1416


Added: branches/Branch_JBMESSAGING_1416/docs/DeliveryDetails.dia
===================================================================
(Binary files differ)


Property changes on: branches/Branch_JBMESSAGING_1416/docs/DeliveryDetails.dia
___________________________________________________________________
Name: svn:mime-type
   + application/octet-stream

Modified: branches/Branch_JBMESSAGING_1416/integration/EAP4/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_JBMESSAGING_1416/integration/EAP4/etc/aop-messaging-client.xml	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/integration/EAP4/etc/aop-messaging-client.xml	2008-10-19 12:34:40 UTC (rev 5150)
@@ -260,6 +260,12 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->close())">
       <advice name="handleClose" aspect="org.jboss.jms.client.container.ProducerAspect"/>
    </bind>    
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->enableOrderingGroup(..))">
+      <advice name="handleEnableOrderingGroup" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+   </bind>    
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->disableOrderingGroup())">
+      <advice name="handleDisableOrderingGroup" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+   </bind>    
    <!-- Producers never go to the server - so no need for a failover interceptor -->   
 
    <!--

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -212,6 +212,47 @@
       return (Queue)getDestination();
    }
    
+   // JBM Ordering Group Extension Implementation ---------------------------------
+
+
+   /**
+    * It makes the producer to be able to send messages in an ordering
+    * group named by orgpName. If the ogrpName is null, the group name
+    * will be automatically generated. By default a newly created producer
+    * has this feature disabled.
+    * 
+    * Calling this method will override the group name set by previous call.
+    * 
+    * If the group name exists in the session exception may be thrown unless
+    * the existing group is targeted at the same destination as this one's.
+    * 
+    * The server won't allow a session to create an ordering group of same name as
+    * any existing one already created by other session. For example, if 
+    * Session S1 first creates an ordering group named O1 (targeted at Destination1), 
+    * and then Session S2 tries to create an ordering group also named O1 
+    * (targeted at Destination1), that will make the ordering
+    * group broken, as ordering group messages from the two sessions are sequenced separately.
+    * So to avoid this, the server may combine the SessionID and the group name to
+    * uniquely identify an ordering group.
+    * 
+    * @param ogrpName the name of the group or null for an auto-generated name.
+    * 
+    */
+   public void enableOrderingGroup(String ogrpName) throws JMSException
+   {
+      delegate.enableOrderingGroup(ogrpName);
+      //throw new JMSException("Implementation not complete, need to check any existing group of same name!");
+   }
+   
+   /*
+    * It disables the ordering group capability of the producer.
+    */
+   public void disableOrderingGroup() throws JMSException
+   {
+      delegate.disableOrderingGroup();
+   }
+
+   
    // Public --------------------------------------------------------
 
    public ProducerDelegate getDelegate()

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/container/ProducerAspect.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/container/ProducerAspect.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -263,7 +263,14 @@
       {
          m.setJMSMessageID(messageToSend.getJMSMessageID());
       }
-            
+      
+      if (producerState.isOrderingGroupEnabled())
+      {
+         String grpName = producerState.getOrderingGroupName();
+         messageToSend.setJBMOrderingGroupName(grpName);
+         messageToSend.setJBMOrderingGroupSeq(sessionState.getGroupSequenceNum(grpName));
+      }
+
       // we now invoke the send(Message) method on the session, which will eventually be fielded
       // by connection endpoint
       ((SessionDelegate)sessionState.getDelegate()).send(messageToSend, false);
@@ -385,6 +392,30 @@
       return null;
    }
    
+   public Object handleEnableOrderingGroup(Invocation invocation) throws Throwable
+   {
+      Object[] args = ((MethodInvocation)invocation).getArguments();
+      String orderingGroupName = (String)args[0];
+      
+      ProducerState state = getProducerState(invocation);
+      if (orderingGroupName == null)
+      {
+         orderingGroupName = "JBM-ORD-GRP:" + UUID.randomUUID().toString();
+      }
+      
+      state.setOrderingGroupName(orderingGroupName);
+      state.setOrderingGroupEnabled(true);
+      
+      return null;
+   }
+   
+   public Object handleDisableOrderingGroup(Invocation invocation) throws Throwable
+   {
+      ProducerState pState = getProducerState(invocation);
+      pState.disableOrderingGroup();
+      return null;
+   }
+   
    // Class YYY overrides --------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -218,6 +218,17 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
+   // JBM Ordering Group Extension ---------------------------------
+
+   public void enableOrderingGroup(String ogrpName) throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+   
+   public void disableOrderingGroup() throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
    // Public ---------------------------------------------------------------------------------------
 
    public String toString()
@@ -230,7 +241,7 @@
    // Package Private ------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
-
+   
    // Inner Classes --------------------------------------------------------------------------------
 
 }

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/state/ProducerState.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/state/ProducerState.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/state/ProducerState.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -61,6 +61,8 @@
    private SessionState parent;
    private ProducerDelegate delegate;
 
+   private boolean isOrderingGroupEnabled = false;
+   private String orderingGroupName = null;
    // Constructors ---------------------------------------------------------------------------------
 
    public ProducerState(SessionState parent, ProducerDelegate delegate, Destination dest)
@@ -166,6 +168,36 @@
       this.deliveryMode = deliveryMode;
    }
 
+   public void setOrderingGroupEnabled(boolean isOrderingGroupEnabled)
+   {
+      this.isOrderingGroupEnabled = isOrderingGroupEnabled;
+   }
+
+   public boolean isOrderingGroupEnabled()
+   {
+      return isOrderingGroupEnabled;
+   }
+
+   public void setOrderingGroupName(String orderingGroupName)
+   {
+      this.orderingGroupName = orderingGroupName;
+   }
+
+   public String getOrderingGroupName()
+   {
+      return orderingGroupName;
+   }
+
+   //let the session reset the ordering group
+   public void disableOrderingGroup()
+   {
+      if (!isOrderingGroupEnabled)
+      {
+         setOrderingGroupEnabled(false);
+         parent.removeOrderingGroup(orderingGroupName);
+      }
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/state/SessionState.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/state/SessionState.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -108,6 +108,9 @@
    //The distinguished message listener - for ASF
    private MessageListener sessionListener;
    
+   //holding sequential counters for ordering groups.
+   private HashMap<String, OrderingGroupSeq> orderingGrpSeqMap = new HashMap<String, OrderingGroupSeq>();
+   
    //This is somewhat strange - but some of the MQ and TCK tests expect an XA session to behaviour as AUTO_ACKNOWLEDGE when not enlisted in
    //a transaction
    //This is the opposite behaviour as what is required when the XA session handles MDB delivery or when using the message bridge.
@@ -491,6 +494,25 @@
       return "SessionState[" + sessionID + "]";
    }
 
+   /**
+    * Generate the sequential number for the group
+    */
+   public long getGroupSequenceNum(String grpName)
+   {
+      OrderingGroupSeq seq = orderingGrpSeqMap.get(grpName);
+      if (seq == null) {
+         seq = new OrderingGroupSeq();
+         orderingGrpSeqMap.put(grpName, seq);
+      }
+      return seq.getNext();
+   }
+   
+   //remove the ordering group from map.
+   public void removeOrderingGroup(String orderingGroupName)
+   {
+      orderingGrpSeqMap.remove(orderingGroupName);
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
@@ -499,5 +521,22 @@
 
    // Inner classes --------------------------------------------------------------------------------
    
+   //A sequence generator class. sequence starts from 1
+   public static class OrderingGroupSeq
+   {
+      private long counter;
+      
+      public OrderingGroupSeq()
+      {
+         counter = 0;
+      }
+
+      public long getNext()
+      {
+         counter++;
+         return counter;
+      }
+   }
+
 }
 

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/delegate/ProducerDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/delegate/ProducerDelegate.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/delegate/ProducerDelegate.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -75,4 +75,8 @@
              int deliveryMode,
              int priority,
              long timeToLive, boolean keepOriginalID) throws JMSException;
+   
+   void enableOrderingGroup(String ogrpName) throws JMSException;
+   
+   void disableOrderingGroup() throws JMSException;
 }

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/message/JBossMessage.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/message/JBossMessage.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -112,6 +112,12 @@
    //Used when bridging a message
    public static final String JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST = "JBM_BRIDGE_MSG_ID_LIST";
    
+   //Used in ordering group
+   public static final String JBOSS_MESSAGING_ORDERING_GROUP_ID = "JBM_ORDERING_GROUP_ID";
+   
+   //Used in ordering group
+   public static final String JBOSS_MESSAGING_ORDERING_GROUP_SEQ = "JBM_ORDERING_GROUP_SEQ";
+
    private static final Logger log = Logger.getLogger(JBossMessage.class);   
       
    // Static --------------------------------------------------------
@@ -222,7 +228,7 @@
       {
          deliveryCount = m.getIntProperty("JMSXDeliveryCount");
       }
-      catch (JMSException e)
+      catch (Exception e)
       {
          log.error("Failed to get delivery count", e);
       }
@@ -457,7 +463,18 @@
       //when routing the message
       this.destination = destination; 
    }
-   
+
+   //used in message ordering group
+   public void setJBMOrderingGroupName(String grpName) throws JMSException
+   {
+      setStringProperty(JBOSS_MESSAGING_ORDERING_GROUP_ID, grpName);
+   }
+
+   public void setJBMOrderingGroupSeq(long groupSequenceNum) throws JMSException
+   {
+      setLongProperty(JBOSS_MESSAGING_ORDERING_GROUP_SEQ, groupSequenceNum);
+   }
+
    //We need to override getHeaders - so the JMSDestination header gets persisted to the db
    //This is called by the persistence manager
    public Map getHeaders()
@@ -1089,6 +1106,7 @@
                                             "' is reserved due to selector syntax.");
       }          
    }
+
    
    // Protected -----------------------------------------------------
    

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -223,7 +223,6 @@
       if (!clientAccepting)
       {
          if (trace) { log.trace(this + "'s client is NOT accepting messages!"); }
-
          return null;
       }
 

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -28,6 +28,12 @@
 import java.util.ListIterator;
 import java.util.Set;
 
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.message.JBossTextMessage;
 import org.jboss.jms.server.MessagingTimeoutFactory;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Channel;
@@ -100,6 +106,8 @@
    protected int maxSize;
 
    protected SynchronizedInt messagesAdded;
+   
+   protected OrderingGroupMonitor monitor = new OrderingGroupMonitor();
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -213,6 +221,9 @@
 
       // Each channel has its own copy of the reference
       ref = ref.copy();
+      
+      dlog("registering -- ", ref);
+      monitor.registerMessage(ref, tx);
 
       try
       {
@@ -578,6 +589,14 @@
     *
     * @see org.jboss.messaging.core.contract.Channel#deliver()
     */
+   private void dlog(String lgmsg)
+   {
+      log.error("(*)-" + lgmsg);
+   }
+   private void dlog(String lgmsg, MessageReference r)
+   {
+      dlog(lgmsg + this.getRefText(r));
+   }
    protected void deliverInternal()
    {
       if (trace) { log.trace(this + " was prompted delivery"); }
@@ -593,7 +612,6 @@
          if (!getReceiversReady())
          {
          	if (trace) { log.trace(this + " receivers not ready so not delivering"); }
-
             return;
          }
 
@@ -601,70 +619,121 @@
          {
             ref = nextReference(iter);
 
+            dlog("got message - ", ref);
             if (ref != null)
             {
-               // Attempt to push the ref to a receiver
-
-               if (trace) { log.trace(this + " pushing " + ref); }
-
-               Delivery del = distributor.handle(this, ref, null);
-
-               setReceiversReady(del != null);
-
-               if (del == null)
+               
+               //challengeSend can return: 1. ok because ref is not ordered. 2. ok because the ref is the first and available for sending 3. not ok because the ref is not the first and available for sending.
+               // 4. not ok because the ref is the birst but is not available for sending (being sent).
+               int status = monitor.isAvailable(ref);
+               if (status != OrderingGroupMonitor.OK)
                {
-                  // No receiver, broken receiver or full receiver so we stop delivering
-                  if (trace) { log.trace(this + " got no delivery for " + ref + " so no receiver got the message. Stopping delivery."); }
-
-                  break;
-               }
-               else if (!del.isSelectorAccepted())
-               {
-                  // No receiver accepted the message because no selectors matched, so we create
-                  // an iterator (if we haven't already created it) to iterate through the refs
-                  // in the channel. No delivery was really performed
-
+                  dlog("leave this message alone and iterating to next message");
+                  //iterating time
                   if (iter == null)
                   {
+                     dlog("iter still null when inspecting this message ", ref);
                      iter = messageRefs.iterator();
-
                      //We just tried the first one, so we don't want to try it again
                      iter.next();
                   }
                }
                else
                {
-                  if (trace) { log.trace(this + ": " + del + " returned for message " + ref); }
+                  // Attempt to push the ref to a receiver
 
-                  // Receiver accepted the reference
+                  if (trace)
+                  {
+                     log.trace(this + " pushing " + ref);
+                  }
 
-                  synchronized (lock)
+                  dlog("everything looks fine, start to deliver the message ", ref);
+                  Delivery del = distributor.handle(this, ref, null);
+
+                  dlog("got the result del " + del);
+                  setReceiversReady(del != null);
+
+                  if (del == null)
                   {
+                     dlog("del is null, so we release the sending count for ", ref);
+
+                     // No receiver, broken receiver or full receiver so we stop delivering
+                     if (trace)
+                     {
+                        log.trace(this + " got no delivery for " +
+                                  ref +
+                                  " so no receiver got the message. Stopping delivery.");
+                     }
+
+                     dlog("as this message is not delivered, we do nothing for it, this round of delivery stopped. ", ref);
+                     break;
+                  }
+                  else if (!del.isSelectorAccepted())
+                  {
+                     // No receiver accepted the message because no selectors matched, so we create
+                     // an iterator (if we haven't already created it) to iterate through the refs
+                     // in the channel. No delivery was really performed
+
+                     dlog("delivery dropped by the selector, so we also drop our message too.");
+                     monitor.dropSend(ref);//leaving a 'hole'
+                     
                      if (iter == null)
                      {
-                        if (trace) { log.trace(this + " removing first ref in memory"); }
+                        dlog("so this is the first msg dropped, iterating ...");
+                        iter = messageRefs.iterator();
 
-                        removeFirstInMemory();
+                        // We just tried the first one, so we don't want to try it again
+                        iter.next();
                      }
-                     else
+                  }
+                  else
+                  {
+                     if (trace)
                      {
-                        if (trace) { log.trace(this + " removed current message from iterator"); }
+                        log.trace(this + ": " + del + " returned for message " + ref);
+                     }
 
-                        iter.remove();
+                     dlog("message accepted so remove from memory: ", ref);
+                     monitor.markSending(ref);
+                     
+                     // Receiver accepted the reference
+                     synchronized (lock)
+                     {
+                        if (iter == null)
+                        {
+                           if (trace)
+                           {
+                              log.trace(this + " removing first ref in memory");
+                           }
+
+                           dlog("still first in mem, removing: ", ref);
+                           removeFirstInMemory();
+                        }
+                        else
+                        {
+                           if (trace)
+                           {
+                              log.trace(this + " removed current message from iterator");
+                           }
+
+                           dlog("already iterated, removing: ", ref);
+                           iter.remove();
+                        }
                      }
+
+                     deliveringCount.increment();
                   }
-
-                  deliveringCount.increment();
                }
             }
             else
             {
+               dlog("We have no more message to deliver for this time, stop this round and return.");
                // No more refs in channel or only ones that don't match any selectors
                if (trace) { log.trace(this + " no more refs to deliver "); }
-
-               break;
+                  break;
             }
          }
+         dlog("deliverInteral() actually exit.");
       }
       catch (Throwable t)
       {
@@ -769,6 +838,24 @@
             pm.addTransaction(tx);
          }
       }
+      
+      dlog("message acknowledged for ", d.getReference());
+      monitor.messageCompleted(d.getReference());
+      
+      if (monitor.hasMessageInQueue())
+      {
+         dlog("we do still have messages in queue, trigger the delivery again");
+         synchronized (lock)
+         {
+            dlog("begin trigger delivering...");
+            deliverInternal();
+            dlog("triggered delivering done...");
+         }
+      }
+      else
+      {
+         dlog("we don't have any messages in order queue, so don't trigger more delivery.");
+      }
    }
 
    protected InMemoryCallback getCallback(Transaction tx)
@@ -811,9 +898,28 @@
 
    // Private --------------------------------------------------------------------------------------
 
+   //debug use, remove when done.
+   private String getRefText(MessageReference ref)
+   {
+      String result = "<null>";
+      if (ref == null) return result;
+      JBossMessage msg = (JBossMessage)ref.getMessage();
+      if (msg instanceof TextMessage)
+      {
+         try
+         {
+            result = "(" + ((TextMessage)msg).getText() + ")";
+         }
+         catch (JMSException e)
+         {
+         }
+      }
+      return result;
+   }
+   
    private MessageReference nextReference(ListIterator iter) throws Throwable
    {
-      MessageReference ref;
+      MessageReference ref = null;
 
       if (iter == null)
       {
@@ -826,14 +932,11 @@
          // We need to extend it to work with refs from the db
 
          //We have an iterator - this means we are iterating through the queue to find a ref that matches
-         if (iter.hasNext())
+         while (iter.hasNext())
          {
             ref = (MessageReference)iter.next();
+            //if (monitor.challengeSend(ref) == OrderingGroupMonitor.OK) break;
          }
-         else
-         {
-            ref = null;
-         }
       }
 
       return ref;

Added: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -0,0 +1,430 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.impl;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import javax.jms.JMSException;
+
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.MessageReference;
+
+/**
+ * A OrderingGroup
+ *
+ * @author Howard Gao
+ * 
+ * Created Oct 13, 2008 10:43:17 AM
+ *
+ *
+ */
+public class OrderingGroup
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   private OrderingList<ReferenceHolder> sortedList = new OrderingList<ReferenceHolder>();
+   private HashMap<Long, ReferenceHolder> refMap = new HashMap<Long, ReferenceHolder>();
+   
+   private String groupName;
+
+   // Static --------------------------------------------------------
+   private static final Logger log = Logger.getLogger(OrderingGroup.class);
+
+   // Constructors --------------------------------------------------
+   public OrderingGroup(String name)
+   {
+      groupName = name;
+   }
+   
+   private OrderingGroup()
+   {
+   }
+
+   // Public --------------------------------------------------------
+   /**
+    * Adding a message to a sorted list
+    * @param ref
+    * @throws JMSException 
+    */
+   public boolean register(MessageReference ref)
+   {
+      Long mid = ref.getMessage().getMessageID();
+      ReferenceHolder holder = refMap.get(mid);
+      if (holder != null)
+      {
+         holder.addRef();
+         return true;
+      }
+      try
+      {
+         holder = new ReferenceHolder(ref);
+      }
+      catch (JMSException e)
+      {
+         //something wrong with the message, abandon.
+         log.error("error creating ReferenceHolder. ", e);
+      }
+      if (holder == null)
+      {
+         //do nothing, return
+         return false;
+      }
+      sortedList.add(holder);
+      refMap.put(mid, holder);
+      return true;
+   }
+
+   /**
+    * See if the ref be there and be the first
+    * Simply comparing the addresses simply doesn't work!
+    */
+   public int isAvailable(MessageReference ref)
+   {
+      ReferenceHolder holder = sortedList.getFirst();
+      if (holder == null) return OrderingGroupMonitor.OK;
+      
+      return holder.isAvailable(ref);
+   }
+
+   /**
+    * remove the message reference from the group
+    * Note: the ref will be removed if and only if the ref
+    * resides the first in the list, otherwise just ignore it.
+    */
+   public void unregister(MessageReference ref)
+   {
+      ReferenceHolder holder = sortedList.getFirst();
+      if (holder == null)
+      {
+         return;
+      }
+      if (holder.matchMessage(ref))
+      {
+         long count = holder.releaseSendnRef();
+         if (count == 0)
+         {
+            sortedList.removeFirst();
+            refMap.remove(ref.getMessage().getMessageID());
+         }
+      }
+   }
+
+   /**
+    * Check 
+    */
+   public boolean hasPendingMessage()
+   {
+      boolean result = false;
+      ReferenceHolder holder = sortedList.getFirst();
+      if (holder != null)
+      {
+         if (holder.isPending())
+         {
+            //true if the sortedList has more to offer.
+            result = sortedList.size() > 1;
+         }
+         else
+         {
+            //means we still have the first un-sent.
+            result = true;
+         }
+      }
+      return result;
+   }
+
+   /**
+    * @param ref
+    */
+   public void releaseSend(MessageReference ref)
+   {
+      ReferenceHolder holder = sortedList.getFirst();
+      if (holder == null) return;
+      
+      if (holder.matchMessage(ref))
+      {
+         holder.releaseSend();
+      }
+   }
+
+   /**
+    * @param ref
+    */
+   public void dropSend(MessageReference ref)
+   {
+      ReferenceHolder holder = sortedList.getFirst();
+      if (holder == null)
+      {
+         return;
+      }
+      if (holder.matchMessage(ref))
+      {
+         long count = holder.releaseRef();
+         if (count == 0)
+         {
+            sortedList.removeFirst();
+            refMap.remove(ref.getMessage().getMessageID());
+         }
+      }
+   }
+
+   /**
+    * @param ref
+    */
+   public void markSending(MessageReference ref)
+   {
+      ReferenceHolder holder = sortedList.getFirst();
+      if (holder == null)
+      {
+         return;
+      }
+      if (holder.matchMessage(ref))
+      {
+         holder.markSending();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
+
+/*
+ * An implementation of a sorted list.
+ * For convenience, it stores the elements in descending order.
+ */
+class OrderingList<T extends Comparable<T>>
+{
+   private LinkedList<T> list = new LinkedList<T>();
+
+   /**
+    * this is a simple stupid implementation. It assumes that messages with
+    * higher sequence numbers come later than those with lower sequence.
+    * It starts with the top of the list and move downwards the end of the list 
+    * when finding a right place for an entry.
+    * 
+    */
+   public void add(T ref)
+   {
+      Iterator<T> iter = list.iterator();
+      int index = 0;
+      T elem = null;
+      while (iter.hasNext())
+      {
+         elem = iter.next();
+         if (elem.compareTo(ref) <= 0)
+         {
+            //should be before this elem
+            break;
+         }
+         index++;
+      }
+      if (index >= list.size())
+      {
+         list.add(ref);
+      }
+      else
+      {
+         list.add(index, ref);
+      }
+   }
+
+   /**
+    * @return
+    */
+   public int size()
+   {
+      return list.size();
+   }
+
+   /**
+    * Note: we are in descending order!
+    */
+   public void removeFirst()
+   {
+      if (!list.isEmpty())
+      {
+         list.removeLast();
+      }
+   }
+
+   /**
+    * Note we are in descending order!
+    */
+   public T getFirst()
+   {
+      if (list.isEmpty())
+      {
+         return null;
+      }
+      return list.getLast();
+   }
+
+}
+
+class ReferenceHolder implements Comparable<ReferenceHolder>
+{
+   private Long seq;
+
+   private MessageReference ref;
+   
+   private long refCount;
+   
+   private long pendingSentCount;
+
+   /**
+    * @throws JMSException 
+    */
+   public ReferenceHolder(MessageReference r) throws JMSException
+   {
+      ref = r;
+      seq = getSeq(this);
+      refCount = 1;
+      pendingSentCount = 0;
+   }
+
+   /**
+    * 
+    */
+   public void markSending()
+   {
+      pendingSentCount++;
+   }
+
+   /**
+    * @param ref2
+    */
+   public long releaseSend()
+   {
+      if (pendingSentCount > 0)
+      {
+         pendingSentCount--;
+      }
+      return pendingSentCount;
+   }
+
+   /**
+    * @return
+    */
+   public boolean isPending()
+   {
+      return pendingSentCount > 0;
+   }
+
+   /**
+    * 
+    */
+   public int isAvailable(MessageReference exRef)
+   {
+      if (matchMessage(exRef))
+      {
+         if (pendingSentCount < refCount)
+         {
+            return OrderingGroupMonitor.OK;
+         }
+         return OrderingGroupMonitor.NOT_OK_BEING_SENT;
+      }
+      return OrderingGroupMonitor.NOT_OK_NOT_FIRST;
+   }
+
+   /**
+    * Increase the ref count
+    */
+   public void addRef()
+   {
+      refCount++;
+   }
+
+   public long releaseRef()
+   {
+      if (refCount > 0)
+      {
+         refCount--;
+      }
+      return refCount;
+   }
+   
+   /**
+    * decrease the ref count
+    */
+   public long releaseSendnRef()
+   {
+      if (pendingSentCount > 0)
+      {
+         refCount--;
+         pendingSentCount--;
+      }
+      return refCount;
+   }
+
+   /**
+    * If the holder holds the same message as in the ref.
+    */
+   public boolean matchMessage(MessageReference newRef)
+   {
+      Long mid1 = newRef.getMessage().getMessageID();
+      Long mid2 = ref.getMessage().getMessageID();
+      return mid1.equals(mid2);
+   }
+
+   public MessageReference getMessageRef()
+   {
+      return ref;
+   }
+
+   public int compareTo(ReferenceHolder o)
+   {
+      Long inSeq = 0L;
+      try
+      {
+         inSeq = getSeq(o);
+      }
+      catch (JMSException e)
+      {
+         // the exception is not likely to happen here.
+      }
+      return seq.compareTo(inSeq);
+   }
+
+   /**
+    * @param o
+    * @return
+    * @throws JMSException 
+    */
+   private static Long getSeq(ReferenceHolder o) throws JMSException
+   {
+      JBossMessage msg = (JBossMessage)o.ref.getMessage();
+      Long seq = msg.getLongProperty(JBossMessage.JBOSS_MESSAGING_ORDERING_GROUP_SEQ);
+      return seq;
+   }
+
+}


Property changes on: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
___________________________________________________________________
Name: svn:executable
   + *

Added: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -0,0 +1,246 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.impl;
+
+import java.util.HashMap;
+import java.util.Iterator;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.MessageReference;
+import org.jboss.messaging.core.impl.tx.Transaction;
+
+/**
+ * A OrderingGroupMonitor
+ *
+ * @author Howard Gao
+ * 
+ * Created Oct 12, 2008 5:38:58 PM
+ *
+ *
+ */
+public class OrderingGroupMonitor
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   private HashMap<String, OrderingGroup> orderingGroups = new HashMap<String, OrderingGroup>();
+
+   // Static --------------------------------------------------------
+   private static final Logger log = Logger.getLogger(OrderingGroupMonitor.class);
+
+   public static final int OK = 0;
+   public static final int NOT_OK_NOT_FIRST = 1;
+
+   public static final int NOT_OK_BEING_SENT = 2;
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Check the message is it is a member of an ordering group, if so,
+    * put it in; if not, do nothing.
+    * if message is dropped due to maxSize being reached, it won't 
+    * get registered.
+    * @throws JMSException 
+    */
+   public void registerMessage(MessageReference ref, Transaction tx)
+   {
+      String grpName = extractGroupName(ref);
+      if (grpName == null)
+      {
+         return;
+      }
+      synchronized (orderingGroups)
+      {
+         OrderingGroup group = orderingGroups.get(grpName);
+         if (group == null)
+         {
+            group = new OrderingGroup(grpName);
+            orderingGroups.put(grpName, group);
+         }
+         group.register(ref);
+      }
+   }
+
+   /**
+    * If ref is not in our registry, just return true.
+    * If in our registry, check if the ref is the first of the group.
+    * return true if it at the first place. return false other wise.
+    */
+   public int isAvailable(MessageReference ref)
+   {
+      int result = OK;
+      String grpName = extractGroupName(ref);
+      if (grpName != null)
+      {
+         synchronized (orderingGroups)
+         {
+            OrderingGroup group = orderingGroups.get(grpName);
+            if (group != null)
+            {
+               result = group.isAvailable(ref);
+            }
+         }
+      }
+      else
+      {
+         log.debug("message doesn't have group prop, fine by me");
+      }
+      return result;
+   }
+
+   /**
+    * This method indicates a messgae is completed.
+    * it is called when a message is acked, commited or rollback
+    * once the message is completed, the next one in a ordering 
+    * group becomes deliverable.
+    */
+   public void messageCompleted(MessageReference ref)
+   {
+      String grpName = extractGroupName(ref);
+      if (grpName == null)
+      {
+         return;
+      }
+      synchronized (orderingGroups)
+      {
+         OrderingGroup group = orderingGroups.get(grpName);
+         if (group != null)
+         {
+            group.unregister(ref);
+         }
+      }
+   }
+
+   /**
+    * Check if there is any pending messages in any group.
+    */
+   public boolean hasMessageInQueue()
+   {
+      boolean result = false;
+      synchronized (orderingGroups)
+      {
+         Iterator<OrderingGroup> iter = orderingGroups.values().iterator();
+         while (iter.hasNext())
+         {
+            OrderingGroup group = iter.next();
+            if (group.hasPendingMessage())
+            {
+               result = true;
+               break;
+            }
+         }
+         
+      }
+      return result;
+   }
+
+   /**
+    * reducing the refcount, if zero, remove it.
+    */
+   public void dropSend(MessageReference ref)
+   {
+      String grpName = extractGroupName(ref);
+      if (grpName == null)
+      {
+         return;
+      }
+      synchronized (orderingGroups)
+      {
+         OrderingGroup group = orderingGroups.get(grpName);
+         if (group != null)
+         {
+            group.dropSend(ref);
+         }
+      }
+   }
+
+   /**
+    * @param ref
+    */
+   public void markSending(MessageReference ref)
+   {
+      String grpName = extractGroupName(ref);
+      if (grpName != null)
+      {
+         synchronized (orderingGroups)
+         {
+            OrderingGroup group = orderingGroups.get(grpName);
+            if (group != null)
+            {
+               group.markSending(ref);
+            }
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+   private static String extractGroupName(MessageReference ref)
+   {
+      String name = null;
+      try
+      {
+         JBossMessage msg = (JBossMessage)ref.getMessage();
+         if (msg != null)
+         {
+            name = msg.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORDERING_GROUP_ID);
+         }
+      }
+      catch (JMSException e)
+      {
+      }
+      return name;
+   }
+
+   // debug use, remove when done.
+   private String getRefText(MessageReference ref)
+   {
+      String result = "(unknown)";
+      JBossMessage msg = (JBossMessage)ref.getMessage();
+      if (msg instanceof TextMessage)
+      {
+         try
+         {
+            result = "(" + ((TextMessage)msg).getText() + ")";
+         }
+         catch (JMSException e)
+         {
+         }
+      }
+      return result;
+   }
+
+
+   // Inner classes -------------------------------------------------
+
+}


Property changes on: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
___________________________________________________________________
Name: svn:executable
   + *

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -72,6 +72,7 @@
 import org.jboss.messaging.core.impl.IDManager;
 import org.jboss.messaging.core.impl.JDBCSupport;
 import org.jboss.messaging.core.impl.MessagingQueue;
+import org.jboss.messaging.core.impl.OrderingGroupMonitor;
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
 import org.jboss.messaging.core.impl.tx.TxCallback;
@@ -237,6 +238,8 @@
    private boolean useJGroupsWorkaround;
    
    private boolean failoverOnNodeLeave;
+   
+   private OrderingGroupMonitor orderingMonitor;
       
    // Constructors ---------------------------------------------------------------------------------
 
@@ -283,6 +286,8 @@
       lock = new ReentrantWriterPreferenceReadWriteLock();
        
       waitForBindUnbindLock = new Object();
+      
+      this.orderingMonitor = new OrderingGroupMonitor();
    }
    
    /*
@@ -2102,7 +2107,7 @@
          		Queue queue = (Queue)iter.next();
          		
          		if (trace) { log.trace(this + " considering queue " + queue); }
-         		
+               
          		if (queue.getNodeID() == thisNodeID)
          		{
          			if (trace) { log.trace(this + " is a local queue"); }
@@ -2133,6 +2138,7 @@
          			
          			if (routeLocal)
          			{
+
          				//If we're not routing from the cluster OR the queue is unreliable then we consider it
          				
          				//When we route from the cluster we never route to reliable queues
@@ -2155,7 +2161,7 @@
          		else if (!fromCluster)
          		{
          			//Remote queue
-         			
+
          			if (trace) { log.trace(this + " is a remote queue"); }
          			
          			if (!queue.isRecoverable() && queue.isClustered())
@@ -2268,6 +2274,7 @@
             
             if (startedTx)
             {
+
                if (trace) { log.trace(this + " committing " + tx); }
                
                tx.commit();

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/util/prioritylinkedlist/BasicPriorityLinkedList.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/util/prioritylinkedlist/BasicPriorityLinkedList.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/util/prioritylinkedlist/BasicPriorityLinkedList.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -28,7 +28,12 @@
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
 
+import javax.jms.JMSException;
+
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.message.JBossTextMessage;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.MessageReference;
 
 /**
  * A basic priority linked list
@@ -53,22 +58,22 @@
    
    public void dump()
    {
-   	log.debug("Dumping " + this);
-   	log.debug("Size:" + size);
-   	log.debug("===============");
-   	
-   	for (int i = 0; i < linkedLists.length; i++)
-   	{
-   		log.debug("Priority:" + i);
-   		log.debug("----------------");
-   		
-   		Iterator iter = linkedLists[i].iterator();
-   		
-   		while (iter.hasNext())
-   		{
-   			log.debug("Ref: "+ iter.next());
-   		}
-   	}
+      log.debug("Dumping " + this);
+      log.debug("Size:" + size);
+      log.debug("===============");
+      
+      for (int i = 0; i < linkedLists.length; i++)
+      {
+         log.debug("Priority:" + i);
+         log.debug("----------------");
+         
+         Iterator iter = linkedLists[i].iterator();
+         
+         while (iter.hasNext())
+         {
+            log.debug("Ref: "+ iter.next());
+         }
+      }
    }
    
    public BasicPriorityLinkedList(int priorities)
@@ -178,6 +183,22 @@
       return obj;      
    }
    
+   public Object peekLast()
+   {
+      Object obj = null;
+      
+      for (int i = 0; i < priorities; i++)
+      {
+         LinkedList ll = linkedLists[i];
+         if (!ll.isEmpty())
+         {
+            obj = ll.getLast();
+            break;
+         }
+      }
+      return obj;
+   }
+   
    public List getAll()
    {
       List all = new ArrayList();
@@ -301,6 +322,71 @@
       {
          throw new UnsupportedOperationException();
       }
+
+      /**
+       * utility to avoid concurrent modification exception.
+       */
+      public byte downgrade(Object obj, byte oldPriority)
+      {
+
+         currentIter.remove();
+         
+         byte result = (byte)((oldPriority == 0) ? 0 : (oldPriority - 1));
+
+         LinkedList list = lists[result];
+
+         if (result == index)
+         {
+            //only if there are more elements.
+            if (currentIter.hasNext())
+            {
+               int pidx = currentIter.previousIndex();
+               list.add(obj);
+            
+               if (pidx != -1)
+               {
+                  currentIter = list.listIterator(pidx);
+               }
+               else
+               {
+                  //seems not possible.
+                  currentIter = list.listIterator();
+               }
+            }
+         }
+         else
+         {
+            //safe
+            list.add(obj);
+         }
+         return result;
+      }
    }
+
+   public byte downgrade(Object obj, byte oldPriority)
+   {
+      byte result = (byte)((oldPriority == 0) ? 0 : (oldPriority - 1));
+
+      LinkedList list = linkedLists[oldPriority];
+      Object first = list.removeFirst();
+      
+      for ( byte i = result; i >= 0; i--)
+      {
+         result = i;
+         list = linkedLists[result];
+         if (!list.isEmpty())
+         {
+            break;
+         }
+      }
+      list.addLast(obj);
+      return result;
+   }
+
+   public byte downgradeThruIter(Object obj, byte oldPriority, ListIterator iter)
+   {
+      PriorityLinkedListIterator iterator = (PriorityLinkedListIterator)iter;
+      return iterator.downgrade(obj, oldPriority);
+   }
    
 }

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/util/prioritylinkedlist/PriorityLinkedList.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/util/prioritylinkedlist/PriorityLinkedList.java	2008-10-19 12:07:59 UTC (rev 5149)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/util/prioritylinkedlist/PriorityLinkedList.java	2008-10-19 12:34:40 UTC (rev 5150)
@@ -25,6 +25,8 @@
 import java.util.List;
 import java.util.ListIterator;
 
+import org.jboss.messaging.core.contract.MessageReference;
+
 /**
  * A type of linked list which maintains items according to a priority
  * 
@@ -44,6 +46,8 @@
    Object removeLast();
    
    Object peekFirst();
+
+   Object peekLast();
    
    List getAll();
    
@@ -56,4 +60,16 @@
    boolean isEmpty();
    
    void dump();
+
+   /**
+    * move the references to different priority list.
+    * return the new priority.
+    */
+   byte downgrade(Object obj, byte oldPriority);
+
+   /**
+    * use the iterate to do the job to avoid concurrent exception.
+    */
+   byte downgradeThruIter(Object obj, byte oldPriority, ListIterator iter);
+
 }




More information about the jboss-cvs-commits mailing list