[jboss-cvs] JBoss Messaging SVN: r8384 - in branches/Branch_1_4: src/main/org/jboss/messaging/core/contract and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 14 05:31:26 EDT 2011


Author: gaohoward
Date: 2011-07-14 05:31:25 -0400 (Thu, 14 Jul 2011)
New Revision: 8384

Modified:
   branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/postoffice/PostOfficeTest.java
Log:
JBMESSAGING-1887


Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java	2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java	2011-07-14 09:31:25 UTC (rev 8384)
@@ -136,6 +136,8 @@
             queue.activate();
          }
          
+         po.removeInactiveCondition(new JMSCondition(true, destination.getName()));
+         
          ((ManagedQueue)destination).setQueue(queue);
          
          String counterName = QUEUE_MESSAGECOUNTER_PREFIX + destination.getName();
@@ -191,6 +193,9 @@
    {
       try
       {
+         PostOffice po = serverPeer.getPostOfficeInstance();
+         po.addInactiveCondition(new JMSCondition(true, destination.getName()));
+         
       	serverPeer.getDestinationManager().unregisterDestination(destination);
       	
          Queue queue = ((ManagedQueue)destination).getQueue();

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java	2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java	2011-07-14 09:31:25 UTC (rev 8384)
@@ -68,8 +68,10 @@
          PostOffice po = serverPeer.getPostOfficeInstance();
                 
          // We deploy any queues corresponding to pre-existing durable subscriptions
+         
+         JMSCondition condition = new JMSCondition(false, destination.getName());
 
-         Collection queues = po.getQueuesForCondition(new JMSCondition(false, destination.getName()), true);
+         Collection queues = po.getQueuesForCondition(condition, true);
       	
          Iterator iter = queues.iterator();
 
@@ -152,6 +154,8 @@
          
          started = true;
          
+         po.removeInactiveCondition(condition);
+         
          log.info(this + " started, fullSize=" + destination.getFullSize() + ", pageSize=" + destination.getPageSize() + ", downCacheSize=" + destination.getDownCacheSize());
          
          
@@ -177,8 +181,12 @@
          
          PostOffice po = serverPeer.getPostOfficeInstance();
                   
-         Collection queues = serverPeer.getPostOfficeInstance().getQueuesForCondition(new JMSCondition(false, destination.getName()), true);
+         JMSCondition condition = new JMSCondition(false, destination.getName());
+         
+         Collection queues = po.getQueuesForCondition(condition, true);
       	
+         po.addInactiveCondition(condition);
+         
          Iterator iter = queues.iterator();
          
          while (iter.hasNext())            

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java	2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java	2011-07-14 09:31:25 UTC (rev 8384)
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.jboss.jms.server.JMSCondition;
 import org.jboss.jms.server.destination.ManagedDestination;
 import org.jboss.messaging.core.impl.tx.Transaction;
 
@@ -161,5 +162,9 @@
     * change the destination's clustered state.
     */
    Queue convertDestination(ManagedDestination destination, String queueName) throws Throwable;
+   /** Marks a condition inactive; while marked, MessagingPostOffice rejects routing to it with an IllegalStateException. */
+   void addInactiveCondition(Condition condition);
+   /** Removes the inactive mark previously set by addInactiveCondition for this condition. */
+   void removeInactiveCondition(Condition condition);
 }
 

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-07-14 09:31:25 UTC (rev 8384)
@@ -275,6 +275,8 @@
    //https://issues.jboss.org/browse/JBMESSAGING-1864
    private Object failoverLock = new Object();
    
+   private Set<Condition> inactiveConditions = new HashSet<Condition>();
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public boolean isFailoverOnNodeLeave()
@@ -3004,6 +3006,12 @@
          }
       }
       
+      //check inactive condition
+      if (inactiveConditions.contains(condition))
+      {
+         throw new IllegalStateException("Destination " + condition + " not active!");
+      }
+      
       try
       {
          List queues = (List)mappings.get(condition);
@@ -4522,5 +4530,59 @@
          return true;
       }
    }
+   /** Marks {@code condition} inactive. Takes the write lock because the backing HashSet is not thread-safe. */
+   public void addInactiveCondition(Condition condition)
+   {
+      boolean intr = Thread.interrupted(); // clear and remember a pending interrupt so acquire() can complete
+      for (;;)
+      {
+         try
+         {
+            lock.writeLock().acquire();
+            break;
+         }
+         catch (InterruptedException ex)
+         {
+            intr = true; // keep retrying; the interrupt status is restored in the finally block below
+         }
+      }
+      // Mutate only under the exclusive lock: a read lock may be held by several threads at once.
+      try
+      {
+         this.inactiveConditions.add(condition);
+      }
+      finally
+      {
+         lock.writeLock().release();
+         if (intr) Thread.currentThread().interrupt();
+      }
+   }
+   /** Clears the inactive mark for {@code condition}. Takes the write lock because the backing HashSet is not thread-safe. */
+   public void removeInactiveCondition(Condition condition)
+   {
+      boolean intr = Thread.interrupted(); // clear and remember a pending interrupt so acquire() can complete
+      for (;;)
+      {
+         try
+         {
+            lock.writeLock().acquire();
+            break;
+         }
+         catch (InterruptedException ex)
+         {
+            intr = true; // keep retrying; the interrupt status is restored in the finally block below
+         }
+      }
+      // Mutate only under the exclusive lock: a read lock may be held by several threads at once.
+      try
+      {
+         this.inactiveConditions.remove(condition);
+      }
+      finally
+      {
+         lock.writeLock().release();
+         if (intr) Thread.currentThread().interrupt();
+      }
+   }
    
 }

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/postoffice/PostOfficeTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/postoffice/PostOfficeTest.java	2011-07-12 02:13:49 UTC (rev 8383)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/postoffice/PostOfficeTest.java	2011-07-14 09:31:25 UTC (rev 8384)
@@ -685,7 +685,124 @@
          
       }
    }
+   /** Checks that route() throws for a condition marked inactive and delivers again once the mark is removed. */
+   public final void testRouteInactive2() throws Throwable
+   {
+      PostOffice postOffice = null;
+      
+      try
+      {      
+         postOffice = createNonClusteredPostOffice();
+         // Bind three activated queues to the same condition.
+         Condition condition1 = new SimpleCondition("topic1");
+         
+         MessagingQueue queue1 =  new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false);
+         queue1.activate();
+
+         postOffice.addBinding(new Binding(condition1, queue1, false), false);
+         
+         MessagingQueue queue2 =  new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false);
+         queue2.activate();
+         
+         postOffice.addBinding(new Binding(condition1, queue2, false), false);
+         
+         MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false);
+         queue3.activate();
+         
+         postOffice.addBinding(new Binding(condition1, queue3, false), false);
+         // Attach one ACCEPTING receiver per queue so deliveries can be inspected.
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue1.getLocalDistributor().add(receiver1);
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue2.getLocalDistributor().add(receiver2);
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue3.getLocalDistributor().add(receiver3);
+         // Baseline: with the condition active, routing succeeds and each receiver gets one message.
+         Message msg1 = CoreMessageFactory.createCoreMessage(1);      
+         MessageReference ref1 = ms.reference(msg1);
+         
+         boolean routed = postOffice.route(ref1, condition1, null);      
+         assertTrue(routed);
+         
+         List msgs = receiver1.getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         
+         msgs = receiver2.getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         
+         msgs = receiver3.getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         
+         receiver1.clear();
+         receiver2.clear();
+         receiver3.clear();
+         // Mark the condition inactive; route() is now expected to throw IllegalStateException.
+         postOffice.addInactiveCondition(condition1);
+         
+         msg1 = CoreMessageFactory.createCoreMessage(1);      
+         ref1 = ms.reference(msg1);
+         
+         try
+         {
+            postOffice.route(ref1, condition1, null);      
+            fail("Didn't get expected exception");
+         }
+         catch (IllegalStateException e)
+         {
+            //expected: the condition is marked inactive
+         }
+         
+         //none received: the rejected message must not have reached any receiver
+         msgs = receiver1.getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         
+         msgs = receiver2.getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         
+         msgs = receiver3.getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         // Remove the inactive mark; routing to the condition should work again.
+         postOffice.removeInactiveCondition(condition1);
+         
+         msg1 = CoreMessageFactory.createCoreMessage(1);      
+         ref1 = ms.reference(msg1);
+
+         //all received
+         routed = postOffice.route(ref1, condition1, null);      
+         assertTrue(routed);
+         
+         msgs = receiver1.getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         
+         msgs = receiver2.getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         
+         msgs = receiver3.getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         
+         receiver1.clear();
+         receiver2.clear();
+         receiver3.clear();
+      }
+      finally
+      {
+         if (postOffice != null)
+         {
+            postOffice.stop();
+         }
+         
+      }
    
+   }   
    
    
    // Package protected ---------------------------------------------



More information about the jboss-cvs-commits mailing list