[jboss-cvs] JBoss Messaging SVN: r5215 - in branches/Branch_JBMESSAGING_1416: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 30 02:55:52 EDT 2008


Author: gaohoward
Date: 2008-10-30 02:55:52 -0400 (Thu, 30 Oct 2008)
New Revision: 5215

Modified:
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
   branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
Log:
JBMESSAGING-1416


Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-10-29 22:51:42 UTC (rev 5214)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-10-30 06:55:52 UTC (rev 5215)
@@ -224,8 +224,9 @@
       // Each channel has its own copy of the reference
       ref = ref.copy();
       
-      dlog("registering -- ", ref);
       monitor.registerMessage(ref, tx);
+      
+      dlog("---registering msg: ", ref);
 
       try
       {
@@ -592,6 +593,7 @@
     *
     * @see org.jboss.messaging.core.contract.Channel#deliver()
     */
+   /*debug use, delete them!!!*/
    private void dlog(String lgmsg)
    {
       log.error("(*)-" + lgmsg);
@@ -600,6 +602,7 @@
    {
       dlog(lgmsg + this.getRefText(r));
    }
+   /*debug*/
    protected void deliverInternal()
    {
       if (trace) { log.trace(this + " was prompted delivery"); }
@@ -622,7 +625,6 @@
          {
             ref = nextReference(iter);
 
-            dlog("got message - ", ref);
             if (ref != null)
             {
                
@@ -631,14 +633,11 @@
                int status = monitor.isAvailable(ref);
                if (status != OrderingGroupMonitor.OK)
                {
-                  dlog("leave this message alone and iterating to next message");
                   //iterating time
                   if (iter == null)
                   {
-                     dlog("iter still null when inspecting this message ", ref);
                      iter = messageRefs.iterator();
                      //We just tried the first one, so we don't want to try it again
-                     dlog("call next on iter to skip over the current one", ref);
                      iter.next();
                   }
                }
@@ -651,15 +650,12 @@
                      log.trace(this + " pushing " + ref);
                   }
 
-                  dlog("everything looks fine, start to deliver the message ", ref);
                   Delivery del = distributor.handle(this, ref, null);
 
-                  dlog("got the result del " + del);
                   setReceiversReady(del != null);
 
                   if (del == null)
                   {
-                     dlog("del is null, so we release the sending count for ", ref);
 
                      // No receiver, broken receiver or full receiver so we stop delivering
                      if (trace)
@@ -669,7 +665,6 @@
                                   " so no receiver got the message. Stopping delivery.");
                      }
 
-                     dlog("as this message is not delivered, we do nothing for it, this round of delivery stopped. ", ref);
                      break;
                   }
                   else if (!del.isSelectorAccepted())
@@ -678,12 +673,10 @@
                      // an iterator (if we haven't already created it) to iterate through the refs
                      // in the channel. No delivery was really performed
 
-                     dlog("delivery dropped by the selector, so we also drop our message too.");
                      monitor.dropSend(ref);//leaving a 'hole'
                      
                      if (iter == null)
                      {
-                        dlog("so this is the first msg dropped, iterating ...");
                         iter = messageRefs.iterator();
 
                         // We just tried the first one, so we don't want to try it again
@@ -697,7 +690,6 @@
                         log.trace(this + ": " + del + " returned for message " + ref);
                      }
 
-                     dlog("message accepted so remove from memory: ", ref);
                      monitor.markSending(ref);
                      
                      // Receiver accepted the reference
@@ -710,7 +702,6 @@
                               log.trace(this + " removing first ref in memory");
                            }
 
-                           dlog("still first in mem, removing: ", ref);
                            removeFirstInMemory();
                         }
                         else
@@ -720,7 +711,6 @@
                               log.trace(this + " removed current message from iterator");
                            }
 
-                           dlog("already iterated, removing: ", ref);
                            iter.remove();
                         }
                      }
@@ -731,13 +721,12 @@
             }
             else
             {
-               dlog("We have no more message to deliver for this time, stop this round and return.");
                // No more refs in channel or only ones that don't match any selectors
                if (trace) { log.trace(this + " no more refs to deliver "); }
                   break;
             }
          }
-         dlog("deliverInteral() actually exit.");
+         log.error("======delever end==========");
       }
       catch (Throwable t)
       {
@@ -843,23 +832,19 @@
          }
       }
       
-      dlog("message acknowledged for ", d.getReference());
-      monitor.messageCompleted(d.getReference());
-      
-      if (monitor.hasMessageInQueue())
+      synchronized (lock)
       {
-         dlog("we do still have messages in queue, trigger the delivery again");
-         synchronized (lock)
+         dlog("---acknowledging ", d.getReference());
+         if (monitor.messageCompleted(d.getReference()))
          {
-            dlog("begin trigger delivering...");
+            log.error("-----more og msg, trigger...");
             deliverInternal();
-            dlog("triggered delivering done...");
          }
+         else
+         {
+            log.error("---no more og msg, no trigger!");
+         }
       }
-      else
-      {
-         dlog("we don't have any messages in order queue, so don't trigger more delivery.");
-      }
    }
 
    protected InMemoryCallback getCallback(Transaction tx)
@@ -940,10 +925,8 @@
          // We need to extend it to work with refs from the db
 
          //We have an iterator - this means we are iterating through the queue to find a ref that matches
-         log.error("---nextRef call, loop for next");
          if (iter.hasNext())
          {
-            log.error("---nextRef call, found one");
             ref = (MessageReference)iter.next();
             //if (monitor.challengeSend(ref) == OrderingGroupMonitor.OK) break;
          }

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2008-10-29 22:51:42 UTC (rev 5214)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2008-10-30 06:55:52 UTC (rev 5215)
@@ -141,16 +141,19 @@
    {
       boolean result = false;
       ReferenceHolder holder = sortedList.getFirst();
+      log.error("---haspending, holder: " + holder);
       if (holder != null)
       {
          if (holder.isPending())
          {
             //true if the sortedList has more to offer.
+            log.error("---see our size: " + sortedList.size());
             result = sortedList.size() > 1;
          }
          else
          {
             //means we still have the first un-sent.
+            log.error("--- first is not sent yet.");
             result = true;
          }
       }
@@ -207,6 +210,11 @@
       }
    }
 
+   public String getGroupName()
+   {
+      return groupName;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2008-10-29 22:51:42 UTC (rev 5214)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2008-10-30 06:55:52 UTC (rev 5215)
@@ -120,13 +120,15 @@
     * it is called when a message is acked, commited or rollback
     * once the message is completed, the next one in a ordering 
     * group becomes deliverable.
+    * return if there is more messages available after this one.
     */
-   public void messageCompleted(MessageReference ref)
+   public boolean messageCompleted(MessageReference ref)
    {
       String grpName = extractGroupName(ref);
       if (grpName == null)
       {
-         return;
+         //not a ordering group message
+         return false;
       }
       synchronized (orderingGroups)
       {
@@ -135,13 +137,14 @@
          {
             group.unregister(ref);
          }
+         return this.hasMessageInQueue();
       }
    }
 
    /**
     * Check if there is any pending messages in any group.
     */
-   public boolean hasMessageInQueue()
+   private boolean hasMessageInQueue()
    {
       boolean result = false;
       synchronized (orderingGroups)
@@ -150,6 +153,7 @@
          while (iter.hasNext())
          {
             OrderingGroup group = iter.next();
+            log.error("--checking group: " + group.getGroupName());
             if (group.hasPendingMessage())
             {
                result = true;

Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java	2008-10-29 22:51:42 UTC (rev 5214)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java	2008-10-30 06:55:52 UTC (rev 5215)
@@ -96,14 +96,12 @@
          consumerConn.start();
 
          Session sessCons1 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sessCons2 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
          OrderingGroupMessageListener listener = new OrderingGroupMessageListener(this);
 
          sessCons1.setMessageListener(listener);
-         sessCons2.setMessageListener(listener);
 
-         ServerSessionPool pool = new OrderingServerSessionPool(sessCons1, sessCons2);
+         ServerSessionPool pool = new OrderingServerSessionPool(sessCons1);
 
          JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1, null, pool, 1);
 
@@ -133,8 +131,15 @@
             TextMessage txm = mList.get(i);
             assertEquals(txm.getText(), "testing" + i);
          }
-
          
+         //allow consumer thread gracefully shutdown
+         try
+         {
+            Thread.sleep(3000);
+         }
+         catch (InterruptedException e)
+         {
+         }
          cc.close();
 
          consumerConn.close();
@@ -147,11 +152,83 @@
          if (consumerConn != null) consumerConn.close();
          if (producerConn != null) producerConn.close();
          
-         removeAllMessages(queue1.getQueueName(), true, 0);
+      }
+   }
+   
+   
+   /*
+    * Make sure the ordering group messages are received in order
+    * thru a ConnectionConsumer in transaction mode.
+    */
+   /*
+   public void testTransactedReceive() throws Exception
+   {
+      if (ServerManagement.isRemote()) return;
+
+      Connection consumerConn = null;
+
+      Connection producerConn = null;
+
+      try
+      {
+         consumerConn = cf.createConnection();
+
+         consumerConn.start();
+
+         Session sessCons1 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+
+         TxOrderingGroupMessageListener listener1 = new TxOrderingGroupMessageListener(this, sessCons1);
+
+         sessCons1.setMessageListener(listener1);
+
+         ServerSessionPool pool = new MockServerSessionPool(sessCons1);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1, null, pool, 5);
+
+         producerConn = cf.createConnection();
+
+         Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
+         JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+         prod.enableOrderingGroup(null);
+
+         forceGC();
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = sessProd.createTextMessage("testing" + i);
+            prod.send(m, Message.DEFAULT_DELIVERY_MODE, i%10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         //waiting enough time to allow delivery complete.
+         msgLatch.attempt(10000);
+         
+         //check the order
+         assertEquals(NUM_MESSAGES, mList.size());
+         
+         for (int i = 0; i < NUM_MESSAGES; ++i)
+         {
+            TextMessage txm = mList.get(i);
+            assertEquals(txm.getText(), "testing" + i);
+         }
+
+         checkEmpty(queue1);
+         
+         cc.close();
+
+         consumerConn.close();
+         consumerConn = null;
+         producerConn.close();
+         producerConn = null;
       }
+      finally
+      {
+         if (consumerConn != null) consumerConn.close();
+         if (producerConn != null) producerConn.close();
+         
+      }
    }
-
+*/
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -176,6 +253,7 @@
       {
          try
          {
+            System.err.println("===== ======= ======== ========== message received: " + ((TextMessage)message).getText());
             owner.addReceived((TextMessage)message);
          }
          catch (Exception e)
@@ -185,27 +263,58 @@
       }
    }
 
+   class TxOrderingGroupMessageListener implements MessageListener
+   {
+      
+      OrderingGroupConnectionConsumerTest owner;
+      long counter = 0;
+      Session sessRef;
+       
+      TxOrderingGroupMessageListener(OrderingGroupConnectionConsumerTest theTest, Session sess)
+      {
+         owner = theTest;
+         sessRef = sess;
+      }
+      
+      public synchronized void onMessage(Message message)
+      {
+         try
+         {
+            System.err.println("===== ======= ======== ==========  coutner: " + counter);
+            //roll back once for every 5 messages
+            if (counter%5 == 0) {
+               System.err.println("===== ======= ======== ==========  rolling back : " + ((TextMessage)message).getText());
+               sessRef.rollback();
+            }
+            else
+            {
+              System.err.println("===== ======= ======== ========== received : " + ((TextMessage)message).getText());
+              owner.addReceived((TextMessage)message);
+              System.err.println("===== ======= ======== ========== commiting : " + ((TextMessage)message).getText());
+              sessRef.commit();
+            }
+            counter++;
+            System.err.println("===coutner: " + counter);
+         }
+         catch (Exception e)
+         {
+            log.error(e);
+         }
+      }
+   }
+
    class OrderingServerSessionPool implements ServerSessionPool
    {
-      private ServerSession serverSession1;
-      private ServerSession serverSession2;
-      private long flag;
+      private ServerSession serverSession;
       
-      OrderingServerSessionPool(Session sess1, Session sess2)
+      OrderingServerSessionPool(Session sess)
       {
-         serverSession1 = new MockServerSession(sess1);
-         serverSession2 = new MockServerSession(sess2);
-         flag = 0L;
+         serverSession = new MockServerSession(sess);
       }
 
       public synchronized ServerSession getServerSession() throws JMSException
       {
-         flag++;
-         if (flag%2 == 0)
-         {
-            return serverSession1;
-         }
-         return serverSession2;
+         return serverSession;
       }      
    }
 




More information about the jboss-cvs-commits mailing list