[jboss-cvs] JBoss Messaging SVN: r7795 - in branches/Branch_1_4: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Sep 1 01:19:49 EDT 2009


Author: gaohoward
Date: 2009-09-01 01:19:48 -0400 (Tue, 01 Sep 2009)
New Revision: 7795

Modified:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
Log:
JBMESSAGING-1728


Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2009-09-01 05:19:48 UTC (rev 7795)
@@ -162,8 +162,6 @@
 
       // Each channel has its own copy of the reference
       ref = ref.copy();
-      
-      monitor.registerMessage(ref);
 
       try
       {
@@ -223,9 +221,6 @@
 
       // Each channel has its own copy of the reference
       ref = ref.copy();
-      
-      //guarding against any ordering group messages.
-      monitor.registerMessage(ref);
 
       try
       {

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2009-09-01 05:19:48 UTC (rev 7795)
@@ -82,7 +82,7 @@
       }
       try
       {
-         holder = new ReferenceHolder(ref);
+         holder = new ReferenceHolder(mid);
       }
       catch (JMSException e)
       {
@@ -201,18 +201,15 @@
 
 class ReferenceHolder
 {
+   private Long mid;
 
-   private Long seq;
-
-   private MessageReference ref;
-
    private long refCount;
 
    private long pendingSentCount;
 
-   public ReferenceHolder(MessageReference r) throws JMSException
+   public ReferenceHolder(Long id) throws JMSException
    {
-      ref = r;
+      mid = id;
       refCount = 1;
       pendingSentCount = 0;
    }
@@ -283,13 +280,12 @@
    public boolean matchMessage(MessageReference newRef)
    {
       Long mid1 = newRef.getMessage().getMessageID();
-      Long mid2 = ref.getMessage().getMessageID();
-      return mid1.equals(mid2);
+      return mid1.equals(mid);
    }
 
-   public MessageReference getMessageRef()
+   public Long getMessageID()
    {
-      return ref;
+      return mid;
    }
 
 }

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2009-09-01 05:19:48 UTC (rev 7795)
@@ -85,9 +85,11 @@
    }
 
    /**
-    * If ref is not in our registry, just return true.
+    * If ref is not in our registry, just return OK.
     * If in our registry, check if the ref is the first of the group.
-    * return true if it at the first place. return false otherwise.
+    * return OK if it is at the first place. 
+    * otherwise return NOT_OK_BEING_SENT if the ref is being sent or
+    * NOT_OK_NOT_FIRST is the ref is not at the first place.
     */
    public int isAvailable(MessageReference ref)
    {

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2009-09-01 05:19:48 UTC (rev 7795)
@@ -267,9 +267,6 @@
          
          MessageReference ref = addFromRefInfo(info, refMap);
          
-         //https://jira.jboss.org/jira/browse/JBMESSAGING-1664
-         monitor.registerMessage(ref);
-         
          if (recoverable && ref.getMessage().isReliable())
          {
             loadedReliable = true;
@@ -386,6 +383,9 @@
       }
       else
       {
+         //guarding against any ordering group messages.
+         monitor.registerMessage(ref);
+
          super.addReferenceInMemory(ref);
          
          if (messageRefs.size() == fullSize)
@@ -511,9 +511,6 @@
          ReferenceInfo info = (ReferenceInfo)iter.next();
          
          MessageReference added = addFromRefInfo(info, refMap);
-         //note, we registered the ref 'after' it has been added to the list
-         //it is safe as long as the caller of this method is synchronized on lock.
-         monitor.registerMessage(added);
       }
    }
         
@@ -534,6 +531,7 @@
       //Schedule the delivery if necessary, or just add to the in memory queue
       if (!checkAndSchedule(ref))
       {
+         monitor.registerMessage(ref);
          messageRefs.addLast(ref, ref.getMessage().getPriority());
       }
       

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java	2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java	2009-09-01 05:19:48 UTC (rev 7795)
@@ -625,6 +625,174 @@
       }
    }
 
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-11728
+   //test ordering while depaging.
+   public void testOrderingGroupOnPaging3() throws Exception
+   {
+      //can't run invm mode.
+      if (!this.isRemote()) return;
+            
+      ServerManagement.deployQueue("pagingQ", null, 5, 2, 2);
+      
+      Queue queue = (Queue)ic.lookup("/queue/pagingQ");
+      
+      Connection conn = null;
+      
+      try
+      {
+         conn = cf.createConnection();
+         
+         conn.start();
+                  
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         JBossMessageProducer sender = (JBossMessageProducer)session.createProducer(queue);
+         
+         sender.enableOrderingGroup("my-group");
+         
+         final int numMessages = 20;
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage m = session.createTextMessage("Hello" + i);
+            sender.send(m, DeliveryMode.PERSISTENT, 5, Message.DEFAULT_TIME_TO_LIVE);
+         }
+         
+         MessageConsumer consumer =  session.createConsumer(queue);
+
+         TextMessage rm = null;
+         for (int i = 0; i < 2; i++)
+         {
+            rm = (TextMessage)consumer.receive(5000);
+            assertNotNull(rm);
+            assertEquals("Hello" + i, rm.getText());
+         }
+         
+         //receive 3
+         rm = (TextMessage)consumer.receive(5000);
+         assertEquals("Hello2", rm.getText());
+
+         //add a new one
+         sender.send(session.createTextMessage("NewMessage"));
+         
+         //receive all left.
+         for (int i = 3; i < numMessages; i++)
+         {
+            rm = (TextMessage)consumer.receive(5000);
+            assertNotNull(rm);
+            assertEquals("Hello" + i, rm.getText());
+         }
+         
+         //last one
+         rm = (TextMessage)consumer.receive(5000);
+         assertEquals("NewMessage", rm.getText());
+         
+         rm = (TextMessage)consumer.receive(5000);
+         assertNull(rm);
+         
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         
+         try
+         {
+            ServerManagement.undeployQueue("pagingQ");
+         }
+         catch(Exception ignore)
+         {
+            
+         }
+      }
+   }
+
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-11728
+   //test ordering while depaging (messages sent with various priorities)
+   public void testOrderingGroupOnPaging4() throws Exception
+   {
+      //can't run invm mode.
+      if (!this.isRemote()) return;
+            
+      ServerManagement.deployQueue("pagingQ", null, 5, 2, 2);
+      
+      Queue queue = (Queue)ic.lookup("/queue/pagingQ");
+      
+      Connection conn = null;
+      
+      try
+      {
+         conn = cf.createConnection();
+         
+         conn.start();
+                  
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         JBossMessageProducer sender = (JBossMessageProducer)session.createProducer(queue);
+         
+         sender.enableOrderingGroup("my-group");
+         
+         final int numMessages = 20;
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage m = session.createTextMessage("Hello" + i);
+            sender.send(m, DeliveryMode.PERSISTENT, i%10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+         
+         MessageConsumer consumer =  session.createConsumer(queue);
+
+         TextMessage rm = null;
+         for (int i = 0; i < 2; i++)
+         {
+            rm = (TextMessage)consumer.receive(5000);
+            assertNotNull(rm);
+            assertEquals("Hello" + i, rm.getText());
+         }
+         
+         //receive 3
+         rm = (TextMessage)consumer.receive(5000);
+         assertEquals("Hello2", rm.getText());
+
+         //add a new one
+         sender.send(session.createTextMessage("NewMessage"));
+         
+         //receive all left.
+         for (int i = 3; i < numMessages; i++)
+         {
+            rm = (TextMessage)consumer.receive(5000);
+            assertNotNull(rm);
+            assertEquals("Hello" + i, rm.getText());
+         }
+         
+         //last one
+         rm = (TextMessage)consumer.receive(5000);
+         assertEquals("NewMessage", rm.getText());
+         
+         rm = (TextMessage)consumer.receive(5000);
+         assertNull(rm);
+         
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         
+         try
+         {
+            ServerManagement.undeployQueue("pagingQ");
+         }
+         catch(Exception ignore)
+         {
+            
+         }
+      }
+   }
+
    //https://jira.jboss.org/jira/browse/JBMESSAGING-1726
    //send one ordering group message and receive it, then resend it out
    //using another group name, the group name should not be changed.




More information about the jboss-cvs-commits mailing list