[hornetq-commits] JBoss hornetq SVN: r10465 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Apr 8 00:47:49 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-08 00:47:49 -0400 (Fri, 08 Apr 2011)
New Revision: 10465

Added:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
https://issues.jboss.org/browse/JBPAPP-6273 - fix for ack of prepared scheduled messages

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-08 04:39:58 UTC (rev 10464)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-08 04:47:49 UTC (rev 10465)
@@ -1142,6 +1142,8 @@
 
             continue;
          }
+         
+         queue.pause();
 
          Collection<AddMessageRecord> valueRecords = queueRecords.values();
          
@@ -1225,6 +1227,11 @@
       {
          messageJournal.perfBlast(perfBlastPages);
       }
+      
+      for (Queue queue : queues.values())
+      {
+         queue.resume();
+      }
 
       if (System.getProperty("org.hornetq.opt.directblast") != null)
       {

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java	2011-04-08 04:47:49 UTC (rev 10465)
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.settings.impl.AddressSettings;
+
+/**
+ * A MessageRedistributionWithDiscoveryTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase
+{
+   private static final Logger log = Logger.getLogger(SymmetricClusterWithDiscoveryTest.class);
+
+   protected static final String groupAddress = getUDPDiscoveryAddress();
+
+   protected static final int groupPort = getUDPDiscoveryPort();
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      setupCluster();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      for (int i = 0; i < servers.length; i++)
+      {
+         if (servers[i] != null)
+         {
+            servers[i].stop();
+            servers[i] = null;
+         }
+      }
+      super.tearDown();
+   }
+
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      for (int i = 0; i < 5; i++)
+      {
+         setServer(forwardWhenNoConsumers, i);
+      }
+   }
+
+   /**
+    * @param forwardWhenNoConsumers
+    */
+   protected void setServer(final boolean forwardWhenNoConsumers, int server)
+   {
+      setupLiveServerWithDiscovery(server,
+                                   SymmetricClusterWithDiscoveryTest.groupAddress,
+                                   SymmetricClusterWithDiscoveryTest.groupPort,
+                                   isFileStorage(),
+                                   isNetty(),
+                                   false);
+
+      AddressSettings setting = new AddressSettings();
+      setting.setRedeliveryDelay(0);
+      setting.setRedistributionDelay(0);
+
+      servers[server].getAddressSettingsRepository().addMatch("#", setting);
+
+      setupDiscoveryClusterConnection("cluster" + server, server, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+   }
+
+   public void testRedistributeWithPreparedAndRestart() throws Exception
+   {
+      startServers(0);
+
+      setupSessionFactory(0, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+
+      ClientSession session0 = sfs[0].createSession(false, false, false);
+
+      ClientProducer prod0 = session0.createProducer("queues.testaddress");
+
+      for (int i = 0; i < 100; i++)
+      {
+         ClientMessage msg = session0.createMessage(true);
+
+         msg.putIntProperty("key", i);
+
+         prod0.send(msg);
+
+         session0.commit();
+      }
+
+      session0.close();
+
+      session0 = sfs[0].createSession(true, false, false);
+
+      ClientConsumer consumer0 = session0.createConsumer("queue0");
+
+      session0.start();
+
+      ArrayList<Xid> xids = new ArrayList<Xid>();
+
+      for (int i = 0; i < 100; i++)
+      {
+         Xid xid = newXID();
+
+         session0.start(xid, XAResource.TMNOFLAGS);
+
+         ClientMessage msg = consumer0.receive(5000);
+
+         msg.acknowledge();
+
+         session0.end(xid, XAResource.TMSUCCESS);
+
+         session0.prepare(xid);
+
+         xids.add(xid);
+      }
+
+      session0.close();
+
+      sfs[0].close();
+      sfs[0] = null;
+
+      servers[0].stop();
+      servers[0] = null;
+
+      setServer(false, 0);
+
+      startServers(1, 2);
+
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+
+      ClientSession session1 = sfs[1].createSession(false, false);
+
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+      createQueue(2, "queues.testaddress", "queue0", null, true);
+
+      session1.start();
+      ClientConsumer consumer1 = session1.createConsumer("queue0");
+
+      startServers(0);
+
+      setupSessionFactory(0, isNetty());
+
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 1, false);
+
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+      waitForBindings(1, "queues.testaddress", 2, 0, false);
+      waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+      session0 = sfs[0].createSession(true, false, false);
+
+      for (Xid xid : xids)
+      {
+         session0.rollback(xid);
+      }
+
+      for (int i = 0; i < 100; i++)
+      {
+         ClientMessage msg = consumer1.receive(15000);
+         assertNotNull(msg);
+         msg.acknowledge();
+      }
+
+      session1.commit();
+
+   }
+
+}



More information about the hornetq-commits mailing list