[jboss-cvs] JBoss Messaging SVN: r8384 - in branches/Branch_1_4: src/main/org/jboss/messaging/core/contract and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 14 05:31:26 EDT 2011
Author: gaohoward
Date: 2011-07-14 05:31:25 -0400 (Thu, 14 Jul 2011)
New Revision: 8384
Modified:
branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/postoffice/PostOfficeTest.java
Log:
JBMESSAGING-1887
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java 2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java 2011-07-14 09:31:25 UTC (rev 8384)
@@ -136,6 +136,8 @@
queue.activate();
}
+ po.removeInactiveCondition(new JMSCondition(true, destination.getName()));
+
((ManagedQueue)destination).setQueue(queue);
String counterName = QUEUE_MESSAGECOUNTER_PREFIX + destination.getName();
@@ -191,6 +193,9 @@
{
try
{
+ PostOffice po = serverPeer.getPostOfficeInstance();
+ po.addInactiveCondition(new JMSCondition(true, destination.getName()));
+
serverPeer.getDestinationManager().unregisterDestination(destination);
Queue queue = ((ManagedQueue)destination).getQueue();
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java 2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java 2011-07-14 09:31:25 UTC (rev 8384)
@@ -68,8 +68,10 @@
PostOffice po = serverPeer.getPostOfficeInstance();
// We deploy any queues corresponding to pre-existing durable subscriptions
+
+ JMSCondition condition = new JMSCondition(false, destination.getName());
- Collection queues = po.getQueuesForCondition(new JMSCondition(false, destination.getName()), true);
+ Collection queues = po.getQueuesForCondition(condition, true);
Iterator iter = queues.iterator();
@@ -152,6 +154,8 @@
started = true;
+ po.removeInactiveCondition(condition);
+
log.info(this + " started, fullSize=" + destination.getFullSize() + ", pageSize=" + destination.getPageSize() + ", downCacheSize=" + destination.getDownCacheSize());
@@ -177,8 +181,12 @@
PostOffice po = serverPeer.getPostOfficeInstance();
- Collection queues = serverPeer.getPostOfficeInstance().getQueuesForCondition(new JMSCondition(false, destination.getName()), true);
+ JMSCondition condition = new JMSCondition(false, destination.getName());
+
+ Collection queues = po.getQueuesForCondition(condition, true);
+ po.addInactiveCondition(condition);
+
Iterator iter = queues.iterator();
while (iter.hasNext())
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java 2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java 2011-07-14 09:31:25 UTC (rev 8384)
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
+import org.jboss.jms.server.JMSCondition;
import org.jboss.jms.server.destination.ManagedDestination;
import org.jboss.messaging.core.impl.tx.Transaction;
@@ -161,5 +162,9 @@
* change the destination's clustered state.
*/
Queue convertDestination(ManagedDestination destination, String queueName) throws Throwable;
+ /** Marks the condition as inactive; MessagingPostOffice then rejects routing to it with IllegalStateException. */
+ void addInactiveCondition(Condition condition);
+ /** Clears the inactive mark previously set for the condition by addInactiveCondition. */
+ void removeInactiveCondition(Condition condition);
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-07-14 09:31:25 UTC (rev 8384)
@@ -275,6 +275,8 @@
//https://issues.jboss.org/browse/JBMESSAGING-1864
private Object failoverLock = new Object();
+ private Set<Condition> inactiveConditions = new HashSet<Condition>();
+
// Constructors ---------------------------------------------------------------------------------
public boolean isFailoverOnNodeLeave()
@@ -3004,6 +3006,12 @@
}
}
+ //check inactive condition
+ if (inactiveConditions.contains(condition))
+ {
+ throw new IllegalStateException("Destination " + condition + " not active!");
+ }
+
try
{
List queues = (List)mappings.get(condition);
@@ -4522,5 +4530,59 @@
return true;
}
}
+
+ /**
+ * Marks a condition as inactive. While a condition is in the inactive set, the
+ * routing check in this class throws IllegalStateException for it.
+ *
+ * The inactive set is a plain HashSet, so it is mutated under the WRITE lock:
+ * the read lock can be held by several threads at once and therefore does not
+ * protect a mutation against concurrent readers or other mutators.
+ * NOTE(review): assumes 'lock' is a util.concurrent ReadWriteLock whose
+ * writeLock() is not already held for reading by the caller - confirm.
+ *
+ * An interrupt that arrives while waiting for the lock is not lost: the wait is
+ * retried and the interrupt status is restored before returning.
+ *
+ * @param condition the condition to mark inactive
+ */
+ public void addInactiveCondition(Condition condition)
+ {
+ // Clear and remember a pending interrupt so that acquire() can block normally.
+ boolean intr = Thread.interrupted();
+ for (;;)
+ {
+ try
+ {
+ lock.writeLock().acquire();
+ break;
+ }
+ catch (InterruptedException ex)
+ {
+ // Keep retrying; the interrupt is re-asserted in the finally block below.
+ intr = true;
+ }
+ }
+
+ try
+ {
+ this.inactiveConditions.add(condition);
+ }
+ finally
+ {
+ lock.writeLock().release();
+ if (intr) Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Removes a condition from the inactive set, so the routing check in this class
+ * no longer throws IllegalStateException for it.
+ *
+ * The inactive set is a plain HashSet, so it is mutated under the WRITE lock:
+ * the read lock can be held by several threads at once and therefore does not
+ * protect a mutation against concurrent readers or other mutators.
+ * NOTE(review): assumes 'lock' is a util.concurrent ReadWriteLock whose
+ * writeLock() is not already held for reading by the caller - confirm.
+ *
+ * An interrupt that arrives while waiting for the lock is not lost: the wait is
+ * retried and the interrupt status is restored before returning.
+ *
+ * @param condition the condition to mark active again
+ */
+ public void removeInactiveCondition(Condition condition)
+ {
+ // Clear and remember a pending interrupt so that acquire() can block normally.
+ boolean intr = Thread.interrupted();
+ for (;;)
+ {
+ try
+ {
+ lock.writeLock().acquire();
+ break;
+ }
+ catch (InterruptedException ex)
+ {
+ // Keep retrying; the interrupt is re-asserted in the finally block below.
+ intr = true;
+ }
+ }
+
+ try
+ {
+ this.inactiveConditions.remove(condition);
+ }
+ finally
+ {
+ lock.writeLock().release();
+ if (intr) Thread.currentThread().interrupt();
+ }
+ }
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/postoffice/PostOfficeTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/postoffice/PostOfficeTest.java 2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/postoffice/PostOfficeTest.java 2011-07-14 09:31:25 UTC (rev 8384)
@@ -685,7 +685,124 @@
}
}
+ // Checks that route() throws IllegalStateException for an inactive condition and delivers again once the mark is removed.
+ public final void testRouteInactive2() throws Throwable
+ {
+ PostOffice postOffice = null;
+
+ try
+ {
+ postOffice = createNonClusteredPostOffice();
+ // Bind three activated queues to the same condition.
+ Condition condition1 = new SimpleCondition("topic1");
+
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false);
+ queue1.activate();
+
+ postOffice.addBinding(new Binding(condition1, queue1, false), false);
+
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false);
+ queue2.activate();
+
+ postOffice.addBinding(new Binding(condition1, queue2, false), false);
+
+ MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false);
+ queue3.activate();
+
+ postOffice.addBinding(new Binding(condition1, queue3, false), false);
+ // Attach one accepting receiver to each queue.
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.getLocalDistributor().add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.getLocalDistributor().add(receiver2);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.getLocalDistributor().add(receiver3);
+ // Baseline: with the condition active, every receiver gets the message.
+ Message msg1 = CoreMessageFactory.createCoreMessage(1);
+ MessageReference ref1 = ms.reference(msg1);
+
+ boolean routed = postOffice.route(ref1, condition1, null);
+ assertTrue(routed);
+
+ List msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+
+ msgs = receiver3.getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+
+ receiver1.clear();
+ receiver2.clear();
+ receiver3.clear();
+ // Mark the condition inactive; the next route() call must fail.
+ postOffice.addInactiveCondition(condition1);
+
+ msg1 = CoreMessageFactory.createCoreMessage(1);
+ ref1 = ms.reference(msg1);
+
+ try
+ {
+ postOffice.route(ref1, condition1, null);
+ fail("Didn't get expected exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //expected: the condition is inactive, so routing is rejected
+ }
+
+ //none received
+ msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver3.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ // Remove the inactive mark; routing must work again.
+ postOffice.removeInactiveCondition(condition1);
+
+ msg1 = CoreMessageFactory.createCoreMessage(1);
+ ref1 = ms.reference(msg1);
+
+ //all received
+ routed = postOffice.route(ref1, condition1, null);
+ assertTrue(routed);
+
+ msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+
+ msgs = receiver3.getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+
+ receiver1.clear();
+ receiver2.clear();
+ receiver3.clear();
+ }
+ finally
+ {
+ if (postOffice != null)
+ {
+ postOffice.stop();
+ }
+
+ }
+ }
// Package protected ---------------------------------------------
More information about the jboss-cvs-commits
mailing list