[jboss-cvs] JBoss Messaging SVN: r7879 - in branches/JBMESSAGING-1742: src/main/org/jboss/messaging/core/impl/postoffice and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 28 12:06:32 EDT 2009


Author: gaohoward
Date: 2009-10-28 12:06:30 -0400 (Wed, 28 Oct 2009)
New Revision: 7879

Modified:
   branches/JBMESSAGING-1742/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
   branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
   branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
   branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/Server.java
Log:
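Tests pass. Added deployQueue/deployTopic overloads that take a keepMessage flag, made MessagingPostOffice honour DropOldMessageOnRedeploy regardless of the clustered setting, and added redeploy tests that expect old messages to be dropped.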


Modified: branches/JBMESSAGING-1742/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- branches/JBMESSAGING-1742/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2009-10-28 16:06:30 UTC (rev 7879)
@@ -737,6 +737,34 @@
       sc.invoke(deston, "start", new Object[0], new String[0]);
    }
 
+   public void deployDestination(boolean isQueue, String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      String config = "<mbean code=\"org.jboss.jms.server.destination." + (isQueue ? "QueueService" : "TopicService") +
+                      "\"" +
+                      "       name=\"jboss.messaging.destination:service=" +
+                      (isQueue ? "Queue" : "Topic") +
+                      ",name=" +
+                      name +
+                      "\"" +
+                      "       xmbean-dd=\"xmdesc/" +
+                      (isQueue ? "Queue" : "Topic") +
+                      "-xmbean.xml\">" +
+                      (jndiName != null ? "    <attribute name=\"JNDIName\">" + jndiName + "</attribute>" : "") +
+                      "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
+                      "       <attribute name=\"Clustered\">" +
+                      String.valueOf(clustered) +
+                      "</attribute>" +
+                      "       <attribute name=\"DropOldMessageOnRedeploy\">" +
+                      String.valueOf(!keepMessage) +
+                      "</attribute>" +                      
+                      "</mbean>";
+
+      MBeanConfigurationElement mbean = new MBeanConfigurationElement(XMLUtil.stringToElement(config));
+      ObjectName deston = sc.registerAndConfigureService(mbean);
+      sc.invoke(deston, "create", new Object[0], new String[0]);
+      sc.invoke(deston, "start", new Object[0], new String[0]);
+   }
+
    public void deployDestination(boolean isQueue,
                                  String name,
                                  String jndiName,
@@ -1100,6 +1128,17 @@
       }
    }
 
+   public void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      deployDestination(true, name, jndiName, clustered, keepMessage);
+   }
+
+   public void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      deployDestination(false, name, jndiName, clustered, keepMessage);
+      
+   }
+
    // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2009-10-28 16:06:30 UTC (rev 7879)
@@ -775,23 +775,21 @@
    {
       Binding b = getBindingForQueueName(queueName);
 
-      logger.log(thisNodeID + "> wwwwwww new Dest: " + newDest.getName() + " queue found: " + queueName);
-      
       if (b.queue.isActive())
       {
          throw new IllegalStateException(this + " cannot convert the destination " + b.queue + " because it is in active state");
       }
       
       //if a destination changes from clustered to standalone, collect the messages from all channels
-      if ( !newDest.isClustered() )
+      Collection allBindings = getAllBindingsForQueueName(queueName);
+      if (newDest.isDropOldMessageOnRedeploy())
       {
-         Collection allBindings = getAllBindingsForQueueName(queueName);
-         if (newDest.isDropOldMessageOnRedeploy())
+         removeDBChannelMessages(allBindings);
+      }
+      else
+      {
+         if ( !newDest.isClustered() )
          {
-            removeDBChannelMessages(allBindings);
-         }
-         else
-         {
             mergeDBChannelMessages(allBindings, b.queue);
          }
       }

Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	2009-10-28 16:06:30 UTC (rev 7879)
@@ -36,17 +36,12 @@
 import javax.jms.XASession;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.tx.MessagingXid;
-import org.jboss.test.messaging.jms.clustering.XAFailoverTest.DummyXAResource;
 import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.container.InVMInitialContextFactory;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
 
 /**
  * Test for https://jira.jboss.org/jira/browse/JBMESSAGING-1742
@@ -67,13 +62,7 @@
    
    //nonclustered2ClusteredTopic
    private Topic nTopic;
-
-   private ServiceContainer sc;
    
-   private TransactionManager tm;
-   
-   private Transaction suspended;
-   
    public DestinationRedeployTest(String name)
    {
       super(name);
@@ -82,28 +71,11 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-      sc = new ServiceContainer("transaction");
-      
-      //Don't drop the tables again!
-      sc.start(false);
-   
-      InitialContext localIc = new InitialContext(InVMInitialContextFactory.getJNDIEnvironment());
-           
-      tm = (TransactionManager)localIc.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
-      
-      suspended = tm.suspend();
    }
 
    protected void tearDown() throws Exception
    {
       super.tearDown();
-      
-      sc.stop();
-      
-      if (suspended != null)
-      {
-         tm.resume(suspended);
-      }
    }
 
    //do a redeploy and test the queues work normally by sending some messages and receiving them.
@@ -118,8 +90,8 @@
       sendMessages(0, cQueue, msgBase, numMsg);
       sendMessages(1, nQueue, msgBase, numMsg);
       
-      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, false);
-      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, false);
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
    }
 
    //do a redeploy and test the topics work normally by sending some messages and receiving them.
@@ -225,6 +197,69 @@
       }
    }
    
+   public void testRedeployTopicWithMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployTopicWithMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      Connection conn = null;
+      Session sess = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         sess.createDurableSubscriber(cTopic, "sub1");
+         sess.createDurableSubscriber(nTopic, "sub2");
+         
+         conn.close();
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+         
+         redeployDestinations(false);
+         
+         checkEmpty(cTopic);
+         checkEmpty(nTopic);
+
+         sendMessages(0, cTopic, msgBase, numMsg);
+         sendMessages(2, nTopic, msgBase, numMsg);
+
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn.start();
+         
+         MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub1.close();
+         sub2.close();
+
+         sess.unsubscribe("sub1");
+         sess.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
    //send some messages to topics and receive a few of them. Then do redeploy and try to receive the rest.
    public void testRedeployTopicNoMessageLoss2() throws Exception
    {
@@ -512,6 +547,29 @@
       receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
    }
    
+   //send some messages to queues and do redeploy, dropping all messages
+   public void testRedeployQueueWithMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployQueueWithMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      redeployDestinations(false);
+      
+      checkEmpty(cQueue);
+      checkEmpty(nQueue);
+      
+      sendMessages(0, cQueue, msgBase, numMsg);
+      sendMessages(1, nQueue, msgBase, numMsg);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
    //send some messages to queues and receive a few of them. Then do redeploy and try to receive the rest.
    public void testRedeployQueueNoMessageLoss2() throws Exception
    {
@@ -680,6 +738,18 @@
 
    private void redeployDestinations(boolean keepMessage) throws Exception
    {
+      if (keepMessage)
+      {
+         redeployDestinationsWithMessage();
+      }
+      else
+      {
+         redeployDestinationsNoMessage();
+      }
+   }
+   
+   private void redeployDestinationsNoMessage() throws Exception
+   {
       for (int i = 0; i < nodeCount; i++)
       {
          ServerManagement.stop(i);
@@ -695,6 +765,60 @@
       //redeploy
       for (int i = 0; i < nodeCount; i++)
       {
+         ServerManagement.deployQueue("nonclustered2ClusteredQueue", i, false);
+         ServerManagement.deployTopic("nonclustered2ClusteredTopic", i, false);
+      }
+      ServerManagement.deployQueue("clustered2NonclusteredQueue", false);
+      ServerManagement.deployTopic("clustered2NonclusteredTopic", false);      
+
+
+      cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+      nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+      cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+      nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");  
+      
+      cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+      try
+      {
+         Queue nonExistQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+         fail("The queue " + nonExistQueue + " should not exist after redeploy");
+      }
+      catch (NamingException e)
+      {
+         //ok
+      }
+      
+      try
+      {
+         Topic nonExistTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+         fail("The topic " + nonExistTopic + " should not exist after redeploy");
+      }
+      catch (NamingException e)
+      {
+         //ok
+      }
+
+
+   }
+   
+   private void redeployDestinationsWithMessage() throws Exception
+   {
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.stop(i);
+      }
+      
+      //Restart nodes
+      for (int i = 0; i < nodeCount; i++)
+      {
+         startDefaultServer(i, overrides, false);
+         ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
+      }
+      
+      //redeploy
+      for (int i = 0; i < nodeCount; i++)
+      {
          ServerManagement.deployQueue("nonclustered2ClusteredQueue", i);
          ServerManagement.deployTopic("nonclustered2ClusteredTopic", i);
       }
@@ -774,12 +898,9 @@
          MessageConsumer receiver = sess.createConsumer(dest);
          TextMessage msg = null;
          
-         log.info("<<<<<<<<<<<< " + numMsg + " >>>>>>>>>>>>>>");
-         
          for (int i = 0; i < numMsg; i++)
          {
             msg = (TextMessage)receiver.receive(5000);
-            log.info("---------------------------------------i: " + i);
             assertEquals(msgBase + (startIndex + i), msg.getText());
          }
 

Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2009-10-28 16:06:30 UTC (rev 7879)
@@ -1357,4 +1357,28 @@
       }
    }
 
+   public static void deployQueue(String name, int serverIndex, boolean keepMessage) throws Exception
+   {
+      insureStarted(serverIndex);
+      servers[serverIndex].getServer().deployQueue(name, null, true, keepMessage);
+   }
+
+   public static void deployTopic(String name, int serverIndex, boolean keepMessage) throws Exception
+   {
+      insureStarted(serverIndex);
+      servers[serverIndex].getServer().deployTopic(name, null, true, keepMessage);
+   }
+
+   public static void deployQueue(String name, boolean keepMessage) throws Exception
+   {
+      insureStarted();
+      servers[0].getServer().deployQueue(name, null, false, keepMessage);
+   }
+
+   public static void deployTopic(String name, boolean keepMessage) throws Exception
+   {
+      insureStarted();
+      servers[0].getServer().deployTopic(name, null, false, keepMessage);
+   }
+
 }

Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java	2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java	2009-10-28 16:06:30 UTC (rev 7879)
@@ -561,5 +561,21 @@
       return namingDelegate;
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.test.messaging.tools.container.Server#deployQueue(java.lang.String, java.lang.String, boolean, boolean)
+    */
+   public void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      server.deployQueue(name, jndiName, clustered, keepMessage);
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.test.messaging.tools.container.Server#deployTopic(java.lang.String, java.lang.String, boolean, boolean)
+    */
+   public void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      server.deployTopic(name, jndiName, clustered, keepMessage);
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/Server.java	2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/Server.java	2009-10-28 16:06:30 UTC (rev 7879)
@@ -298,4 +298,8 @@
 
    ObjectName getPostOfficeObjectName() throws Exception;
 
+   void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception;
+
+   void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception;
+
 }




More information about the jboss-cvs-commits mailing list