[jboss-cvs] JBoss Messaging SVN: r5333 - in branches/Branch_1416_merge/src/main/org/jboss: jms/client/container and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 11 05:44:17 EST 2008


Author: gaohoward
Date: 2008-11-11 05:44:16 -0500 (Tue, 11 Nov 2008)
New Revision: 5333

Modified:
   branches/Branch_1416_merge/src/main/org/jboss/jms/client/JBossMessageProducer.java
   branches/Branch_1416_merge/src/main/org/jboss/jms/client/container/ProducerAspect.java
   branches/Branch_1416_merge/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
   branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/ProducerState.java
   branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
Log:
JBMESSAGING-1416 clean up code


Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-11-11 10:44:16 UTC (rev 5333)
@@ -40,6 +40,7 @@
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  * @version <tt>$Revision$</tt>
  *
  * $Id$
@@ -212,6 +213,41 @@
       return (Queue)getDestination();
    }
    
+   // JBM Ordering Group Extension Implementation ---------------------------------
+
+
+   /**
+    * Lets the Producer send Ordering Group Messages.
+    * 
+    * This methods makes the producer to be able to send messages in an ordering
+    * group named by orgpName. If the ogrpName is null, the group name
+    * will be automatically generated. By default a newly created producer
+    * has this feature disabled.
+    * 
+    * Calling this method will override the group name set by previous call, if any.
+    * Users should make sure the uniqueness of the name if they supply one themselves
+    * across the JMS server domain. Auto-generated names always are guaranteed to be
+    * unique.
+    * 
+    * @param ogrpName the name of the group or null for an auto-generated name.
+    * 
+    */
+   public void enableOrderingGroup(String ogrpName) throws JMSException
+   {
+      delegate.enableOrderingGroup(ogrpName);
+   }
+   
+   /**
+    * Disables the ordering group capability of the producer.
+    * After this method is called, the producer no longer sends out
+    * ordering group messages.
+    */
+   public void disableOrderingGroup() throws JMSException
+   {
+      delegate.disableOrderingGroup();
+   }
+
+   
    // Public --------------------------------------------------------
 
    public ProducerDelegate getDelegate()

Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/container/ProducerAspect.java	2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/container/ProducerAspect.java	2008-11-11 10:44:16 UTC (rev 5333)
@@ -39,7 +39,6 @@
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.state.ProducerState;
 import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossTemporaryQueue;
@@ -62,6 +61,7 @@
  * Remember! PER_INSTANCE aspects are very expensive so we avoid them.
  *
  * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com>Howard Gao</a>
  * @version <tt>$Revision$</tt>
  *
  * $Id$
@@ -263,7 +263,15 @@
       {
          m.setJMSMessageID(messageToSend.getJMSMessageID());
       }
-            
+      
+      // Processing the ordering group message here.
+      if (producerState.isOrderingGroupEnabled())
+      {
+         String grpName = producerState.getOrderingGroupName();
+         messageToSend.setJBMOrderingGroupName(grpName);
+         messageToSend.setJBMOrderingGroupSeq(sessionState.getGroupSequenceNum(grpName));
+      }
+
       // we now invoke the send(Message) method on the session, which will eventually be fielded
       // by connection endpoint
       ((SessionDelegate)sessionState.getDelegate()).send(messageToSend, false);
@@ -385,6 +393,30 @@
       return null;
    }
    
+   public Object handleEnableOrderingGroup(Invocation invocation) throws Throwable
+   {
+      Object[] args = ((MethodInvocation)invocation).getArguments();
+      String orderingGroupName = (String)args[0];
+      
+      ProducerState state = getProducerState(invocation);
+      if (orderingGroupName == null)
+      {
+         orderingGroupName = "JBM-ORD-GRP:" + UUID.randomUUID().toString();
+      }
+      
+      state.setOrderingGroupName(orderingGroupName);
+      state.setOrderingGroupEnabled(true);
+      
+      return null;
+   }
+   
+   public Object handleDisableOrderingGroup(Invocation invocation) throws Throwable
+   {
+      ProducerState pState = getProducerState(invocation);
+      pState.disableOrderingGroup();
+      return null;
+   }
+   
    // Class YYY overrides --------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2008-11-11 10:44:16 UTC (rev 5333)
@@ -33,6 +33,7 @@
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  *
  * @version <tt>$Revision$</tt>
  *
@@ -218,19 +219,30 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   // Public ---------------------------------------------------------------------------------------
+   // JBM Ordering Group Extension ---------------------------------
 
+   public void enableOrderingGroup(String ogrpName) throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+   
+   public void disableOrderingGroup() throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
    public String toString()
    {
       return "ProducerDelegate[" + System.identityHashCode(this) + ", ID=" + id + "]";
    }
+   // Public ---------------------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
    
    // Package Private ------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
-
+   
    // Inner Classes --------------------------------------------------------------------------------
 
 }

Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/ProducerState.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/ProducerState.java	2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/ProducerState.java	2008-11-11 10:44:16 UTC (rev 5333)
@@ -36,6 +36,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodoorv</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  *
  * @version <tt>$Revision$</tt>
  *
@@ -61,6 +62,8 @@
    private SessionState parent;
    private ProducerDelegate delegate;
 
+   private boolean isOrderingGroupEnabled = false;
+   private String orderingGroupName = null;
    // Constructors ---------------------------------------------------------------------------------
 
    public ProducerState(SessionState parent, ProducerDelegate delegate, Destination dest)
@@ -166,6 +169,36 @@
       this.deliveryMode = deliveryMode;
    }
 
+   public void setOrderingGroupEnabled(boolean isOrderingGroupEnabled)
+   {
+      this.isOrderingGroupEnabled = isOrderingGroupEnabled;
+   }
+
+   public boolean isOrderingGroupEnabled()
+   {
+      return isOrderingGroupEnabled;
+   }
+
+   public void setOrderingGroupName(String orderingGroupName)
+   {
+      this.orderingGroupName = orderingGroupName;
+   }
+
+   public String getOrderingGroupName()
+   {
+      return orderingGroupName;
+   }
+
+   //let the session reset the ordering group
+   public void disableOrderingGroup()
+   {
+      if (isOrderingGroupEnabled)
+      {
+         setOrderingGroupEnabled(false);
+         parent.removeOrderingGroup(orderingGroupName);
+      }
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/SessionState.java	2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/SessionState.java	2008-11-11 10:44:16 UTC (rev 5333)
@@ -29,7 +29,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
 import javax.jms.MessageListener;
@@ -54,14 +53,13 @@
 import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.Version;
 
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-
 /**
  * State corresponding to a session. This state is acessible inside aspects/interceptors.
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  * @version <tt>$Revision$</tt>
  *
  * $Id$
@@ -108,6 +106,13 @@
    //The distinguished message listener - for ASF
    private MessageListener sessionListener;
    
+   /* Holding sequential counters for ordering groups.
+    * Each ordering group has a sequence counter used to generate sequence numbers for
+    * its messages. The sequence number of a message indicates the order in which the message
+    * is to be delivered.
+    */
+   private HashMap<String, OrderingGroupSeq> orderingGrpSeqMap = new HashMap<String, OrderingGroupSeq>();
+   
    //This is somewhat strange - but some of the MQ and TCK tests expect an XA session to behaviour as AUTO_ACKNOWLEDGE when not enlisted in
    //a transaction
    //This is the opposite behaviour as what is required when the XA session handles MDB delivery or when using the message bridge.
@@ -491,6 +496,25 @@
       return "SessionState[" + sessionID + "]";
    }
 
+   /**
+    * Generate the sequential number for the ordering group
+    */
+   public long getGroupSequenceNum(String grpName)
+   {
+      OrderingGroupSeq seq = orderingGrpSeqMap.get(grpName);
+      if (seq == null) {
+         seq = new OrderingGroupSeq();
+         orderingGrpSeqMap.put(grpName, seq);
+      }
+      return seq.getNext();
+   }
+   
+   //remove the ordering group from map.
+   public void removeOrderingGroup(String orderingGroupName)
+   {
+      orderingGrpSeqMap.remove(orderingGroupName);
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
@@ -499,5 +523,22 @@
 
    // Inner classes --------------------------------------------------------------------------------
    
+   //A sequence generator class. sequence starts from 1
+   public static class OrderingGroupSeq
+   {
+      private long counter;
+      
+      public OrderingGroupSeq()
+      {
+         counter = 0;
+      }
+
+      public long getNext()
+      {
+         counter++;
+         return counter;
+      }
+   }
+
 }
 

Modified: branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-11-11 10:44:16 UTC (rev 5333)
@@ -28,6 +28,10 @@
 import java.util.ListIterator;
 import java.util.Set;
 
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.server.MessagingTimeoutFactory;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Channel;
@@ -55,6 +59,7 @@
  *
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  * @version <tt>$Revision$</tt> $Id: ChannelSupport.java,v 1.65
  *          2006/06/27 19:44:39 timfox Exp $
  */
@@ -100,6 +105,8 @@
    protected int maxSize;
 
    protected SynchronizedInt messagesAdded;
+   
+   protected OrderingGroupMonitor monitor = new OrderingGroupMonitor();
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -154,6 +161,8 @@
 
       // Each channel has its own copy of the reference
       ref = ref.copy();
+      
+      monitor.registerMessage(ref, null);
 
       try
       {
@@ -213,6 +222,9 @@
 
       // Each channel has its own copy of the reference
       ref = ref.copy();
+      
+      //guarding against any ordering group messages.
+      monitor.registerMessage(ref, tx);
 
       try
       {
@@ -566,6 +578,7 @@
 
       synchronized (lock)
       {
+         monitor.unmarkSending(ref);
          messageRefs.addFirst(ref, ref.getMessage().getPriority());
       }
 
@@ -593,7 +606,6 @@
          if (!getReceiversReady())
          {
          	if (trace) { log.trace(this + " receivers not ready so not delivering"); }
-
             return;
          }
 
@@ -603,66 +615,107 @@
 
             if (ref != null)
             {
-               // Attempt to push the ref to a receiver
-
-               if (trace) { log.trace(this + " pushing " + ref); }
-
-               Delivery del = distributor.handle(this, ref, null);
-
-               setReceiversReady(del != null);
-
-               if (del == null)
+               int status = monitor.isAvailable(ref);
+               if (status != OrderingGroupMonitor.OK)
                {
-                  // No receiver, broken receiver or full receiver so we stop delivering
-                  if (trace) { log.trace(this + " got no delivery for " + ref + " so no receiver got the message. Stopping delivery."); }
-
-                  break;
-               }
-               else if (!del.isSelectorAccepted())
-               {
-                  // No receiver accepted the message because no selectors matched, so we create
-                  // an iterator (if we haven't already created it) to iterate through the refs
-                  // in the channel. No delivery was really performed
-
+                  if (trace)
+                  {
+                     log.trace("Hold sending off ordering group message " + ref);
+                  }
+                  //iterating time
                   if (iter == null)
                   {
                      iter = messageRefs.iterator();
-
                      //We just tried the first one, so we don't want to try it again
                      iter.next();
                   }
                }
                else
                {
-                  if (trace) { log.trace(this + ": " + del + " returned for message " + ref); }
+                  // Attempt to push the ref to a receiver
 
-                  // Receiver accepted the reference
+                  if (trace)
+                  {
+                     log.trace(this + " pushing " + ref);
+                  }
 
-                  synchronized (lock)
+                  Delivery del = distributor.handle(this, ref, null);
+
+                  setReceiversReady(del != null);
+
+                  if (del == null)
                   {
+
+                     // No receiver, broken receiver or full receiver so we stop delivering
+                     if (trace)
+                     {
+                        log.trace(this + " got no delivery for " +
+                                  ref +
+                                  " so no receiver got the message. Stopping delivery.");
+                     }
+
+                     break;
+                  }
+                  else if (!del.isSelectorAccepted())
+                  {
+                     // No receiver accepted the message because no selectors matched, so we create
+                     // an iterator (if we haven't already created it) to iterate through the refs
+                     // in the channel. No delivery was really performed
+
+                     if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
+                     {
+                        log.warn("Warning! Using message selectors with ordering group can cause unpredicatable results!");
+                     }
+                     
                      if (iter == null)
                      {
-                        if (trace) { log.trace(this + " removing first ref in memory"); }
+                        iter = messageRefs.iterator();
 
-                        removeFirstInMemory();
+                        // We just tried the first one, so we don't want to try it again
+                        iter.next();
                      }
-                     else
+                  }
+                  else
+                  {
+                     if (trace)
                      {
-                        if (trace) { log.trace(this + " removed current message from iterator"); }
+                        log.trace(this + ": " + del + " returned for message " + ref);
+                     }
 
-                        iter.remove();
+                     monitor.markSending(ref);
+                     
+                     // Receiver accepted the reference
+                     synchronized (lock)
+                     {
+                        if (iter == null)
+                        {
+                           if (trace)
+                           {
+                              log.trace(this + " removing first ref in memory");
+                           }
+
+                           removeFirstInMemory();
+                        }
+                        else
+                        {
+                           if (trace)
+                           {
+                              log.trace(this + " removed current message from iterator");
+                           }
+
+                           iter.remove();
+                        }
                      }
+
+                     deliveringCount.increment();
                   }
-
-                  deliveringCount.increment();
                }
             }
             else
             {
                // No more refs in channel or only ones that don't match any selectors
                if (trace) { log.trace(this + " no more refs to deliver "); }
-
-               break;
+                  break;
             }
          }
       }
@@ -769,6 +822,22 @@
             pm.addTransaction(tx);
          }
       }
+      
+      MessageReference ref = d.getReference();
+      if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
+      {
+         if (trace)
+         {
+            log.trace("Ordering group message " + ref + " has been completed, trying to send next.");
+         }
+         synchronized (lock)
+         {
+            if (monitor.messageCompleted(ref))
+            {
+               deliverInternal();
+            }
+         }
+      }
    }
 
    protected InMemoryCallback getCallback(Transaction tx)
@@ -811,9 +880,32 @@
 
    // Private --------------------------------------------------------------------------------------
 
+   //debug use, remove when done.
+   public static String getRefText(MessageReference ref)
+   {
+      String result = "<null>";
+      if (ref == null) return result;
+      Object rmsg = ref.getMessage();
+      if (rmsg instanceof JBossMessage)
+      {
+         JBossMessage msg = (JBossMessage)ref.getMessage();
+         if (msg instanceof TextMessage)
+         {
+            try
+            {
+               result = "(" + ((TextMessage)msg).getText() + ")";
+            }
+            catch (JMSException e)
+            {
+            }
+         }
+      }
+      return result;
+   }
+   
    private MessageReference nextReference(ListIterator iter) throws Throwable
    {
-      MessageReference ref;
+      MessageReference ref = null;
 
       if (iter == null)
       {
@@ -829,6 +921,7 @@
          if (iter.hasNext())
          {
             ref = (MessageReference)iter.next();
+            //if (monitor.challengeSend(ref) == OrderingGroupMonitor.OK) break;
          }
          else
          {
@@ -954,7 +1047,12 @@
 
       public void afterRollback(boolean onePhase) throws Exception
       {
-      	//NOOP
+         //tx rolled back, we need to inform the monitor
+         for(Iterator i = refsToAdd.iterator(); i.hasNext(); )
+         {
+            MessageReference ref = (MessageReference)i.next();
+            monitor.unmarkSending(ref);
+         }
       }
 
       public String toString()




More information about the jboss-cvs-commits mailing list