[jboss-cvs] JBoss Messaging SVN: r5263 - in trunk: src/main/org/jboss/messaging/core/config and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 4 13:52:42 EST 2008


Author: timfox
Date: 2008-11-04 13:52:42 -0500 (Tue, 04 Nov 2008)
New Revision: 5263

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ActivationTimeoutTest.java
Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/tests/config/ConfigurationTest-config.xml
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest2.java
Log:
Queue activation timeout


Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/config/jbm-configuration.xml	2008-11-04 18:52:42 UTC (rev 5263)
@@ -28,6 +28,8 @@
       
       <backup>false</backup>
       
+      <queue-activation-timeout>30000</queue-activation-timeout>
+      
       <!--
       <backup-connector>
          <factory-class>org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -47,6 +47,10 @@
    boolean isBackup();
 
    void setBackup(boolean backup);
+   
+   long getQueueActivationTimeout();
+   
+   void setQueueActivationTimeout(long timeout);
 
    int getScheduledThreadPoolMaxSize();
 

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -34,6 +34,8 @@
    public static final boolean DEFAULT_CLUSTERED = false;
 
    public static final boolean DEFAULT_BACKUP = false;
+   
+   public static final long DEFAULT_QUEUE_ACTIVATION_TIMEOUT = 30000;
 
    public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 30;
 
@@ -78,6 +80,8 @@
    protected boolean clustered = DEFAULT_CLUSTERED;
 
    protected boolean backup = DEFAULT_BACKUP;
+      
+   protected long queueActivationTimeout = DEFAULT_QUEUE_ACTIVATION_TIMEOUT;
 
    protected int scheduledThreadPoolMaxSize = DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
 
@@ -148,6 +152,16 @@
    {
       this.backup = backup;
    }
+   
+   public long getQueueActivationTimeout()
+   {
+      return queueActivationTimeout;
+   }
+   
+   public void setQueueActivationTimeout(long timeout)
+   {
+      this.queueActivationTimeout = timeout;
+   }
 
    public int getScheduledThreadPoolMaxSize()
    {

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -71,6 +71,8 @@
       clustered = getBoolean(e, "clustered", clustered);
       
       backup = getBoolean(e, "backup", backup);
+      
+      queueActivationTimeout = getLong(e, "queue-activation-timeout", queueActivationTimeout);
 
       //NOTE! All the defaults come from the super class
       

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -54,9 +54,6 @@
 {
    private static final Logger log = Logger.getLogger(NullStorageManager.class);
 
-   
-   //FIXME - these need to use id generators from 1.4 null storage manager since is not unique across
-   //cluster
 	private final IDGenerator idGenerator = new TimeAndCounterIDGenerator();
 
 	private volatile boolean started;

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -30,6 +30,7 @@
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.SendLock;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.util.SimpleString;
@@ -49,9 +50,6 @@
  * The PostOffice also maintains a set of "allowable addresses". These are the addresses that it is legal to
  * route to.
  * 
- * Finally, a PostOffice maintains a set of FlowControllers - one for each unique address. These are used, where
- * appropriate to control the flow of messages sent to a particular address
- *  
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
@@ -79,7 +77,7 @@
 
    Set<SimpleString> listAllDestinations();
    
-   void activate();
+   List<Queue> activate();
    
    PagingManager getPagingManager();
    

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -110,7 +110,7 @@
          addressManager = new SimpleAddressManager();
       }
 
-      this.backup = backup;
+      this.backup = backup;    
    }
 
    // MessagingComponent implementation ---------------------------------------
@@ -123,7 +123,7 @@
 
          pagingManager.start();
       }
-
+      
       // Injecting the postoffice (itself) on queueFactory for paging-control
       queueFactory.setPostOffice(this);
 
@@ -316,18 +316,29 @@
       return addressManager.getMappings();
    }
 
-   public synchronized void activate()
+   public List<Queue> activate()
    {
       this.backup = false;
 
       Map<SimpleString, Binding> nameMap = addressManager.getBindings();
 
+      List<Queue> queues = new ArrayList<Queue>();
+      
       for (Binding binding : nameMap.values())
       {
-         binding.getQueue().activate();
+         Queue queue = binding.getQueue();
+         
+         boolean activated = queue.activate();
+   
+         if (!activated)
+         { 
+            queues.add(queue);
+         }
       }
+      
+      return queues;
    }
-   
+         
    public synchronized SendLock getAddressLock(final SimpleString address)
    {
       SendLock lock = addressLocks.get(address);

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -130,8 +130,10 @@
 
    void setBackup();
    
-   void activate();
+   boolean activate();
    
+   void activateNow(Executor executor);
+   
    boolean isBackup();
    
    MessageReference removeFirst();
@@ -140,6 +142,4 @@
    
    //Only used in testing
    void deliverNow();
-   
-  // void dumpRefs();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -13,6 +13,7 @@
 package org.jboss.messaging.core.server.impl;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
 import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -273,7 +275,7 @@
       {
          return;
       }
-      
+
       asyncDeliveryPool.shutdown();
 
       try
@@ -407,8 +409,15 @@
       {
          freezeAllBackupConnections();
 
-         postOffice.activate();
+         List<Queue> toActivate = postOffice.activate();
 
+         for (Queue queue : toActivate)
+         {
+            scheduledExecutor.schedule(new ActivateRunner(queue),
+                                       configuration.getQueueActivationTimeout(),
+                                       TimeUnit.MILLISECONDS);
+         }
+
          configuration.setBackup(false);
 
          remotingService.setBackup(false);
@@ -497,7 +506,7 @@
                                                      final boolean autoCommitSends,
                                                      final boolean autoCommitAcks,
                                                      final boolean xa,
-                                                     final int sendWindowSize)throws Exception
+                                                     final int sendWindowSize) throws Exception
    {
       checkActivate(connection);
 
@@ -640,4 +649,19 @@
 
    // Inner classes
    // --------------------------------------------------------------------------------
+
+   private class ActivateRunner implements Runnable
+   {
+      private Queue queue;
+
+      ActivateRunner(final Queue queue)
+      {
+         this.queue = queue;
+      }
+
+      public void run()
+      {
+         queue.activateNow(asyncDeliveryPool);
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -148,7 +148,7 @@
    }
 
    public HandleStatus addLast(final MessageReference ref)
-   {      
+   {
       HandleStatus status = add(ref, false);
 
       return status;
@@ -166,7 +166,7 @@
       while (iter.hasPrevious())
       {
          MessageReference ref = iter.previous();
-         
+
          ServerMessage msg = ref.getMessage();
 
          if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
@@ -187,13 +187,13 @@
          executor.execute(deliverRunner);
       }
    }
-   
+
    // Only used in testing - do not call directly!
    public synchronized void deliverNow()
    {
       deliver();
    }
-   
+
    public void addConsumer(final Consumer consumer)
    {
       distributionPolicy.addConsumer(consumer);
@@ -207,7 +207,7 @@
       {
          promptDelivery = false;
       }
-      
+
       return removed;
    }
 
@@ -294,7 +294,7 @@
    }
 
    public synchronized int getMessageCount()
-   {    
+   {
       return messageReferences.size() + getScheduledCount() + getDeliveringCount();
    }
 
@@ -345,7 +345,6 @@
       return messagesAdded.get();
    }
 
-
    public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
    {
       Transaction tx = new TransactionImpl(storageManager, postOffice);
@@ -366,9 +365,9 @@
       List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
       for (MessageReference messageReference : cancelled)
       {
-          deliveringCount.incrementAndGet();
+         deliveringCount.incrementAndGet();
 
-          tx.addAcknowledgement(messageReference);
+         tx.addAcknowledgement(messageReference);
       }
 
       tx.commit();
@@ -492,7 +491,7 @@
    }
 
    public synchronized void setBackup()
-   {    
+   {
       this.backup = true;
 
       this.direct = false;
@@ -503,26 +502,45 @@
       return messageReferences.removeFirst();
    }
 
-   public synchronized void activate()
-   {      
+   public synchronized boolean activate()
+   {
       consumersToFailover = distributionPolicy.getConsumerCount();
-      
+
       if (consumersToFailover == 0)
       {
          backup = false;
+
+         return true;
       }
+      else
+      {
+         return false;
+      }
    }
 
+   public synchronized void activateNow(final Executor executor)
+   {
+      if (backup)
+      {
+         log.info("Timed out waiting for all consumers to reconnect to queue " + name +
+                  " so queue will be activated now");
+
+         backup = false;
+
+         deliverAsync(executor);
+      }
+   }
+
    public synchronized boolean consumerFailedOver()
    {
       consumersToFailover--;
-      
+
       if (consumersToFailover == 0)
       {
          // All consumers for the queue have failed over, can re-activate it now
 
          backup = false;
-         
+
          scheduledDeliveryHandler.reSchedule();
 
          return true;
@@ -570,7 +588,7 @@
       {
          return;
       }
-      
+
       MessageReference reference;
 
       Iterator<MessageReference> iterator = null;
@@ -616,7 +634,7 @@
             else
             {
                iterator.remove();
-            }            
+            }
          }
          else if (status == HandleStatus.BUSY)
          {
@@ -630,9 +648,9 @@
             // through the queue
             iterator = messageReferences.iterator();
          }
-      }     
+      }
    }
-   
+
    private synchronized HandleStatus add(final MessageReference ref, final boolean first)
    {
       if (!first)
@@ -707,7 +725,7 @@
    private HandleStatus deliver(final MessageReference reference)
    {
       HandleStatus status = distributionPolicy.distribute(reference);
-      
+
       if (status == HandleStatus.HANDLED)
       {
          deliveringCount.incrementAndGet();
@@ -717,7 +735,7 @@
       {
          promptDelivery = true;
       }
-      
+
       return status;
    }
 
@@ -732,8 +750,8 @@
          waitingToDeliver.set(false);
 
          synchronized (QueueImpl.this)
-         {          
-            deliver();            
+         {
+            deliver();
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -401,7 +401,7 @@
    }
    
    private HandleStatus doHandle(final MessageReference ref) throws Exception
-   {
+   {      
       if (availableCredits != null && availableCredits.get() <= 0)
       {
          return HandleStatus.BUSY;

Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/tests/config/ConfigurationTest-config.xml	2008-11-04 18:52:42 UTC (rev 5263)
@@ -2,11 +2,12 @@
    <configuration>
       <clustered>true</clustered>
       <backup>true</backup>
+      <queue-activation-timeout>12456</queue-activation-timeout>
       <scheduled-max-pool-size>12345</scheduled-max-pool-size>        
       <require-destinations>false</require-destinations>
       <security-enabled>false</security-enabled>
       <security-invalidation-interval>5423</security-invalidation-interval>
-       <wild-card-routing-enabled>true</wild-card-routing-enabled>
+      <wild-card-routing-enabled>true</wild-card-routing-enabled>
       <call-timeout>7654</call-timeout>    
       <packet-confirmation-batch-size>543</packet-confirmation-batch-size>
       <connection-scan-period>6543</connection-scan-period>

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ActivationTimeoutTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ActivationTimeoutTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ActivationTimeoutTest.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -0,0 +1,213 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A ActivationTimeoutTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 4 Nov 2008 16:54:50
+ *
+ *
+ */
+public class ActivationTimeoutTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(SimpleAutomaticFailoverTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final long ACTIVATION_TIMEOUT = 5000;
+   
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testActivationTimeout() throws Exception
+   {            
+      ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                      new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                 backupParams));
+      
+      ClientSessionFactoryInternal sf2 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                      new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                 backupParams));
+
+      sf1.setSendWindowSize(32 * 1024);
+      sf2.setSendWindowSize(32 * 1024);
+
+      ClientSession session1 = sf1.createSession(false, true, true, false);
+
+      session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session1.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session1.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+      log.info("Sent messages");
+      
+      ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
+      
+      ClientSession session2 = sf2.createSession(false, true, true, false);
+      
+      //Create another consumer so we have two consumers on the queue
+      ClientConsumer consumer2 = session2.createConsumer(ADDRESS);
+      
+      long start = System.currentTimeMillis();
+
+      RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
+
+      // Now we fail ONLY the connections on sf1, not on sf2      
+      conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      session1.start();
+
+      //The messages should not be delivered until after activationTimeout ms, since
+      //session 2 didn't reattach
+        
+      long now = System.currentTimeMillis();
+      
+      ClientMessage message = consumer1.receive(ACTIVATION_TIMEOUT - (now - start));
+      
+      assertNull(message);        
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         message = consumer1.receive();
+         
+         assertEquals("aardvarks", message.getBody().getString());
+
+         assertEquals(i, message.getProperty(new SimpleString("count")));
+
+         message.acknowledge();
+      }
+      
+      message = consumer1.receive(1000);
+      
+      assertNull(message);
+      
+      session1.close();
+      
+      RemotingConnection conn2 = ((ClientSessionImpl)session2).getConnection();
+     
+      conn2.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+      
+      session2.close();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setSecurityEnabled(false);
+      backupConf.setQueueActivationTimeout(ACTIVATION_TIMEOUT);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                          backupParams));
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+      liveService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      backupService.stop();
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -128,8 +128,9 @@
       return null;
    }
 
-   public void activate()
+   public List<Queue> activate()
    {
+      return null;
    }
 
    public PagingManager getPagingManager()

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -54,6 +54,7 @@
    {      
       assertEquals(ConfigurationImpl.DEFAULT_CLUSTERED, conf.isClustered());
       assertEquals(ConfigurationImpl.DEFAULT_BACKUP, conf.isBackup());
+      assertEquals(ConfigurationImpl.DEFAULT_QUEUE_ACTIVATION_TIMEOUT, conf.getQueueActivationTimeout());
       assertEquals(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, conf.getScheduledThreadPoolMaxSize());
       assertEquals(ConfigurationImpl.DEFAULT_SECURITY_INVALIDATION_INTERVAL, conf.getSecurityInvalidationInterval());
       assertEquals(ConfigurationImpl.DEFAULT_REQUIRE_DESTINATIONS, conf.isRequireDestinations());
@@ -84,11 +85,15 @@
          conf.setBackup(b);
          assertEquals(b, conf.isBackup());
          
+         long l = randomLong();
+         conf.setQueueActivationTimeout(l);
+         assertEquals(l, conf.getQueueActivationTimeout());
+         
          int i = randomInt();
          conf.setScheduledThreadPoolMaxSize(i);
          assertEquals(i, conf.getScheduledThreadPoolMaxSize());
                   
-         long l = randomLong();
+         l = randomLong();
          conf.setSecurityInvalidationInterval(l);
          assertEquals(l, conf.getSecurityInvalidationInterval());
          
@@ -149,8 +154,6 @@
    
    public void testGetSetInterceptors()
    {
-      List<String> interceptors = conf.getInterceptorClassNames();
-      
       final String name1 = "uqwyuqywuy";
       final String name2 = "yugyugyguyg";
       
@@ -167,10 +170,17 @@
       boolean b = randomBoolean();
       conf.setClustered(b);
       
+      b = randomBoolean();
+      conf.setBackup(b);
+      
+      long l = randomLong();
+      conf.setQueueActivationTimeout(l);
+      
+      
       int i = randomInt();
       conf.setScheduledThreadPoolMaxSize(i);
          
-      long l = randomLong();
+      l = randomLong();
       conf.setSecurityInvalidationInterval(l);
 
       b = randomBoolean();

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -39,6 +39,7 @@
       //Check they match the values from the test file
       assertEquals(true, conf.isClustered());
       assertEquals(true, conf.isBackup());
+      assertEquals(12456, conf.getQueueActivationTimeout());
       assertEquals(12345, conf.getScheduledThreadPoolMaxSize());    
       assertEquals(5423, conf.getSecurityInvalidationInterval());
       assertEquals(false, conf.isRequireDestinations());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest2.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest2.java	2008-11-04 18:13:40 UTC (rev 5262)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest2.java	2008-11-04 18:52:42 UTC (rev 5263)
@@ -39,6 +39,7 @@
    {
       assertEquals(ConfigurationImpl.DEFAULT_CLUSTERED, conf.isClustered());
       assertEquals(ConfigurationImpl.DEFAULT_BACKUP, conf.isBackup());
+      assertEquals(ConfigurationImpl.DEFAULT_QUEUE_ACTIVATION_TIMEOUT, conf.getQueueActivationTimeout());
       assertEquals(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, conf.getScheduledThreadPoolMaxSize());
       assertEquals(ConfigurationImpl.DEFAULT_SECURITY_INVALIDATION_INTERVAL, conf.getSecurityInvalidationInterval());
       assertEquals(ConfigurationImpl.DEFAULT_REQUIRE_DESTINATIONS, conf.isRequireDestinations());




More information about the jboss-cvs-commits mailing list