[jboss-cvs] JBoss Messaging SVN: r7790 - in branches/Branch_1_4: src/main/org/jboss/messaging/core/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 26 09:26:30 EDT 2009


Author: gaohoward
Date: 2009-08-26 09:26:30 -0400 (Wed, 26 Aug 2009)
New Revision: 7790

Modified:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1722


Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2009-08-19 21:34:40 UTC (rev 7789)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2009-08-26 13:26:30 UTC (rev 7790)
@@ -153,14 +153,22 @@
       private Long maxPageOrdering;
 
       private List refInfos;
+      
+      private boolean isFromMerge;
 
-      public InitialLoadInfo(Long minPageOrdering, Long maxPageOrdering, List refInfos)
+      public InitialLoadInfo(Long minPageOrdering, Long maxPageOrdering, List refInfos, boolean fromMerge)
       {
          this.minPageOrdering = minPageOrdering;
          this.maxPageOrdering = maxPageOrdering;
          this.refInfos = refInfos;
+         this.isFromMerge = fromMerge;
       }
 
+      public InitialLoadInfo(Long minPageOrdering, Long maxPageOrdering, List refInfos)
+      {
+         this(minPageOrdering, maxPageOrdering, refInfos, false);
+      }
+
       public Long getMaxPageOrdering()
       {
          return maxPageOrdering;
@@ -175,6 +183,11 @@
       {
          return refInfos;
       }
+      
+      public boolean isFromMerge()
+      {
+         return isFromMerge;
+      }
    }
 
 }

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2009-08-19 21:34:40 UTC (rev 7789)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2009-08-26 13:26:30 UTC (rev 7790)
@@ -1309,11 +1309,11 @@
                if (arePaged)
                {
                   return new InitialLoadInfo(new Long(firstPagingOrder),
-                        new Long(pageOrd - 1), refs);
+                        new Long(pageOrd - 1), refs, true);
                }
                else
                {
-                  return new InitialLoadInfo(null, null, refs);
+                  return new InitialLoadInfo(null, null, refs, true);
                }
             }
             finally

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2009-08-19 21:34:40 UTC (rev 7789)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2009-08-26 13:26:30 UTC (rev 7790)
@@ -495,9 +495,12 @@
       }
       else
       {
-         firstPagingOrder = nextPagingOrder = 0;
-         
-         paging = false;
+         //https://jira.jboss.org/jira/browse/JBMESSAGING-1722
+         if (!ili.isFromMerge())
+         {
+            firstPagingOrder = nextPagingOrder = 0;
+            paging = false;
+         }
       }
             
       Map refMap = processReferences(ili.getRefInfos());

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	2009-08-19 21:34:40 UTC (rev 7789)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	2009-08-26 13:26:30 UTC (rev 7790)
@@ -11,12 +11,17 @@
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.naming.InitialContext;
 
+import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.test.messaging.jms.clustering.ClusteringTestBase.SimpleFailoverListener;
 import org.jboss.test.messaging.tools.ServerManagement;
 
 /**
@@ -474,7 +479,117 @@
    {
       mergeQueuePaging(5, 10, 10, 10, false);
    }
+   
+   // deploy a distributed queue and send some messages to node0, then kill node1
+   // receive one message and send some more.
+   // restart node 0 and there should be no errors.
+   // https://jira.jboss.org/jira/browse/JBMESSAGING-1722
+   public void testQueueMergeMakePagingBroken() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
 
+      try
+      {
+         //Deploy queue with fullSize of 10
+
+         ServerManagement.deployQueue("mergeTestQueue", "queue/mergeTestQueue", 20, 5, 5, 0, true);
+
+         ServerManagement.deployQueue("mergeTestQueue", "queue/mergeTestQueue", 20, 5, 5, 1, true);
+
+         Queue queue0 = (Queue)ic[0].lookup("queue/mergeTestQueue");
+
+         Queue queue1 = (Queue)ic[1].lookup("queue/mergeTestQueue");
+
+         // Objects Server0
+         conn0 = createConnectionOnServer(cf, 0);
+         conn0.start();
+
+         assertEquals(0, getServerId(conn0));
+
+         conn1 = createConnectionOnServer(cf, 1);
+
+         assertEquals(1, getServerId(conn1));
+
+         //Send some messages on node 0
+
+         Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageProducer producer0 = session0.createProducer(queue0);
+
+         log.info("sending messages on node 0");
+
+         for (int i = 0; i < 30; i++)
+         {
+            producer0.send(session0.createTextMessage("message " + i));
+
+            log.info("Sent message: message " + i);
+         }
+
+         session0.commit();
+
+         //Now kill the server 1
+         waitForFailoverComplete(1, conn1);
+
+         //receive 1 message
+         MessageConsumer cons0 = session0.createConsumer(queue0);
+         
+         TextMessage msg = (TextMessage)cons0.receive(5000);
+
+         assertEquals("message 0", msg.getText());
+
+         session0.commit();
+         
+         //send 30 more
+         log.info("sending more messages on node 0");
+
+         for (int i = 0; i < 30; i++)
+         {
+            producer0.send(session0.createTextMessage("message " + i));
+
+            log.info("Sent message: message " + i);
+         }
+         
+         session0.commit();
+         
+         conn0.close();
+         conn0 = null;
+         
+         //restart node0
+         ServerManagement.stop(0);
+         ServerManagement.start(0, config, false);
+         ServerManagement.deployQueue("mergeTestQueue", "queue/mergeTestQueue", 50, 5, 5, 0, true);
+      }
+      finally
+      {
+         try
+         {
+            ServerManagement.undeployQueue("mergeTestQueue", 0);
+         }
+         catch (Exception ignore)
+         {
+         }
+
+         try
+         {
+            ServerManagement.undeployQueue("mergeTestQueue", 1);
+         }
+         catch (Exception ignore)
+         {
+         }
+
+         if (conn0!=null)
+         {
+            conn0.close();
+         }
+
+         if (conn1!=null)
+         {
+            conn1.close();
+         }
+      }
+   }
+
    /*
     * Both queues paging on merge
     */




More information about the jboss-cvs-commits mailing list