[jboss-cvs] JBoss Messaging SVN: r3777 - in trunk: src/main/org/jboss/jms/server and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Feb 23 08:51:35 EST 2008


Author: timfox
Date: 2008-02-23 08:51:34 -0500 (Sat, 23 Feb 2008)
New Revision: 3777

Removed:
   trunk/src/main/org/jboss/jms/server/MessagingTimeoutFactory.java
Modified:
   trunk/src/etc/server/default/deploy/jbm-configuration.xml
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/Configuration.java
   trunk/src/main/org/jboss/messaging/core/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
   trunk/tests/src/org/jboss/messaging/core/impl/test/timing/QueueTest.java
Log:
Some improvements to scheduled deliveries


Modified: trunk/src/etc/server/default/deploy/jbm-configuration.xml
===================================================================
--- trunk/src/etc/server/default/deploy/jbm-configuration.xml	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/etc/server/default/deploy/jbm-configuration.xml	2008-02-23 13:51:34 UTC (rev 3777)
@@ -36,6 +36,8 @@
       <post-office-name>JMS post office</post-office-name>
 
       <clustered>false</clustered>
+      
+      <scheduled-executor-max-pool-size>30</scheduled-executor-max-pool-size>
 
       <!-- All the remaining properties only have to be specified if the post office is clustered.
 You can safely comment them out if your post office is non clustered -->

Deleted: trunk/src/main/org/jboss/jms/server/MessagingTimeoutFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/MessagingTimeoutFactory.java	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/jms/server/MessagingTimeoutFactory.java	2008-02-23 13:51:34 UTC (rev 3777)
@@ -1,81 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.server;
-
-import org.jboss.util.threadpool.BasicThreadPool;
-import org.jboss.util.threadpool.ThreadPool;
-import org.jboss.util.timeout.TimeoutFactory;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class MessagingTimeoutFactory
-{
-   // Constants ------------------------------------------------------------------------------------
-
-   // Static ---------------------------------------------------------------------------------------
-
-   public static MessagingTimeoutFactory instance = new MessagingTimeoutFactory();
-
-   // Attributes -----------------------------------------------------------------------------------
-
-   private TimeoutFactory factory;
-
-   // Constructors ---------------------------------------------------------------------------------
-
-   private MessagingTimeoutFactory()
-   {
-      createFactory();
-   }
-
-   // Public ---------------------------------------------------------------------------------------
-
-   public TimeoutFactory getFactory()
-   {
-      return factory;
-   }
-
-   public synchronized void reset()
-   {
-      factory.cancel();
-      createFactory();
-   }
-
-   // Package protected ----------------------------------------------------------------------------
-
-   // Protected ------------------------------------------------------------------------------------
-
-   // Private --------------------------------------------------------------------------------------
-
-   private void createFactory()
-   {
-      ThreadPool threadPool = new BasicThreadPool("Messaging Timeout");
-      factory = new TimeoutFactory(threadPool);
-   }
-
-   // Inner classes --------------------------------------------------------------------------------
-
-}

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-23 13:51:34 UTC (rev 3777)
@@ -270,30 +270,6 @@
       }
    }
     
-//   void localClose() throws Exception
-//   {
-//      if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
-//
-//      messageQueue.removeConsumer(this);
-//      
-//      sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);     
-//      
-//      if (autoDeleteQueue)
-//      {
-//         if (messageQueue.getConsumerCount() == 0)
-//         {
-//            MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
-//            
-//            server.getPostOffice().removeBinding(messageQueue.getName());
-//            
-//            if (messageQueue.isDurable())
-//            {
-//               server.getPersistenceManager().deleteAllReferences(messageQueue);
-//            }
-//         }
-//      }
-//   }
-
    // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-23 13:51:34 UTC (rev 3777)
@@ -239,8 +239,7 @@
 
    synchronized void handleDelivery(MessageReference ref, ServerConsumerEndpoint consumer) throws Exception
    {
-      Delivery delivery = new DeliveryImpl(ref, consumer.getID(),
-            deliveryIDSequence++, sender);
+      Delivery delivery = new DeliveryImpl(ref, consumer.getID(), deliveryIDSequence++, sender);
 
       deliveries.add(delivery);
 

Modified: trunk/src/main/org/jboss/messaging/core/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Configuration.java	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/Configuration.java	2008-02-23 13:51:34 UTC (rev 3777)
@@ -68,6 +68,8 @@
    protected String _postOfficeName;
 
    protected Boolean _clustered = false;
+   
+   protected Integer _scheduledThreadPoolMaxSize = 30;
 
    protected Long _stateTimeout = (long) 5000;
 
@@ -213,6 +215,16 @@
    {
       return _clustered;
    }
+   
+   public Integer getScheduledThreadPoolMaxSize()
+   {
+   	return _scheduledThreadPoolMaxSize;
+   }
+   
+   public void setScheduledThreadPoolMaxSize(int size)
+   {
+   	this._scheduledThreadPoolMaxSize = size;
+   }
 
    public  void setClustered(Boolean clustered)
    {

Modified: trunk/src/main/org/jboss/messaging/core/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/FileConfiguration.java	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/FileConfiguration.java	2008-02-23 13:51:34 UTC (rev 3777)
@@ -61,6 +61,7 @@
       _strictTck = getBoolean(e, "strict-tck", _strictTck);
       _postOfficeName = getString(e, "post-office-name", _postOfficeName);
       _clustered = getBoolean(e, "clustered", _clustered);
+      _scheduledThreadPoolMaxSize = getInteger(e, "scheduled-executor-max-pool-size", _scheduledThreadPoolMaxSize);
       _stateTimeout = getLong(e, "state-timeout", _stateTimeout);
       _castTimeout = getLong(e, "cast-timeout", _castTimeout);
       _groupName = getString(e, "group-name", _groupName);

Modified: trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java	2008-02-23 13:51:34 UTC (rev 3777)
@@ -21,6 +21,8 @@
   */
 package org.jboss.messaging.core.impl;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.QueueFactory;
@@ -37,13 +39,22 @@
  */
 public class QueueFactoryImpl implements QueueFactory
 {
-   HierarchicalRepository<QueueSettings> queueSettingsRepository;
+   private HierarchicalRepository<QueueSettings> queueSettingsRepository;
+   
+   private ScheduledExecutorService scheduledExecutor;
 
-
+   public QueueFactoryImpl(ScheduledExecutorService scheduledExecutor)
+   {
+      this();
+      
+      this.scheduledExecutor = scheduledExecutor;
+   }
+   
    public QueueFactoryImpl()
    {
       queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
-      queueSettingsRepository.setDefault(new QueueSettings());
+      
+      queueSettingsRepository.setDefault(new QueueSettings());      
    }
 
    public Queue createQueue(long id, String name, Filter filter,
@@ -51,11 +62,16 @@
    {
       QueueSettings queueSettings = queueSettingsRepository.getMatch(name);
 
-      Queue queue =  new QueueImpl(id, name, filter, queueSettings.isClustered(), durable, temporary, queueSettings.getMaxSize());
+      Queue queue =  new QueueImpl(id, name, filter, queueSettings.isClustered(),
+      		                       durable, temporary, queueSettings.getMaxSize(),
+      		                       scheduledExecutor);
       
       queue.setMaxDeliveryAttempts(queueSettings.getMaxDeliveryAttempts());
+      
       queue.setMessageCounterHistoryDayLimit(queueSettings.getMessageCounterHistoryDayLimit());
+      
       queue.setRedeliveryDelay(queueSettings.getRedeliveryDelay());                           
+      
       queue.setDistributionPolicy(queueSettings.getDistributionPolicy());
 
       return queue;

Modified: trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java	2008-02-23 13:51:34 UTC (rev 3777)
@@ -27,9 +27,11 @@
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.jboss.jms.server.MessagingTimeoutFactory;
 import org.jboss.messaging.core.Consumer;
 import org.jboss.messaging.core.DistributionPolicy;
 import org.jboss.messaging.core.Filter;
@@ -38,8 +40,6 @@
 import org.jboss.messaging.core.PriorityLinkedList;
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.util.Logger;
-import org.jboss.util.timeout.Timeout;
-import org.jboss.util.timeout.TimeoutTarget;
 
 /**
  * 
@@ -74,7 +74,7 @@
    
    protected List<Consumer> consumers;
    
-   protected Set<Timeout> scheduledTimeouts;
+   protected Set<ScheduledDeliveryRunnable> scheduledRunnables;
    
    protected DistributionPolicy distributionPolicy;
    
@@ -87,6 +87,8 @@
    private AtomicInteger messagesAdded = new AtomicInteger(0);
    
    private AtomicInteger deliveringCount = new AtomicInteger(0);
+   
+   private ScheduledExecutorService scheduledExecutor;
          
    // ---------
    
@@ -100,6 +102,13 @@
    
    private int messageCounterHistoryDayLimit;
       
+   public QueueImpl(long id, String name, Filter filter, boolean clustered,
+                    boolean durable, boolean temporary, int maxSize, ScheduledExecutorService scheduledExecutor)
+   {
+   	this(id, name, filter, clustered, durable, temporary, maxSize);
+   	
+   	this.scheduledExecutor = scheduledExecutor;
+   }
    
    public QueueImpl(long id, String name, Filter filter, boolean clustered,
                     boolean durable, boolean temporary, int maxSize)
@@ -123,7 +132,7 @@
       
       consumers = new ArrayList<Consumer>();
       
-      scheduledTimeouts = new HashSet<Timeout>();
+      scheduledRunnables = new HashSet<ScheduledDeliveryRunnable>();
       
       distributionPolicy = new RoundRobinDistributionPolicy();
       
@@ -295,42 +304,46 @@
    {
       messageReferences.clear();
       
-      if (!this.scheduledTimeouts.isEmpty())
+      if (!scheduledRunnables.isEmpty())
       {
-         Set<Timeout> clone = new HashSet<Timeout>(scheduledTimeouts);
+         Set<ScheduledDeliveryRunnable> clone = new HashSet<ScheduledDeliveryRunnable>(scheduledRunnables);
          
-         for (Timeout timeout: clone)
+         for (ScheduledDeliveryRunnable runnable: clone)
          {
-            timeout.cancel();
+            runnable.cancel();
          }
          
-         scheduledTimeouts.clear();
+         scheduledRunnables.clear();
       }
    }
 
    public synchronized void removeReference(MessageReference messageReference)
    {
       messageReferences.remove(messageReference , messageReference.getMessage().getPriority());
-      if (!this.scheduledTimeouts.isEmpty())
+      
+      if (!scheduledRunnables.isEmpty())
       {
-         Set<Timeout> clone = new HashSet<Timeout>(scheduledTimeouts);
+         Set<ScheduledDeliveryRunnable> clone = new HashSet<ScheduledDeliveryRunnable>(scheduledRunnables);
 
-         for (Timeout timeout: clone)
+         for (ScheduledDeliveryRunnable runnable: clone)
          {
-            timeout.cancel();
+         	runnable.cancel();
          }
 
-         scheduledTimeouts.clear();
+         scheduledRunnables.clear();
       }
    }
 
+   //FIXME - probably better with an iterator
    public synchronized List<MessageReference> removeReferences(Filter filter)
    {
       List<MessageReference> allRefs = list(filter);
+      
       for (MessageReference messageReference : allRefs)
       {
          removeReference(messageReference);
       }
+      
       return allRefs;
    }
 
@@ -356,13 +369,12 @@
 
    public synchronized int getMessageCount()
    {
-     // log.info("mr: " + messageReferences.size() + " sc: " + getScheduledCount() + " dc: " + getDeliveringCount());
       return messageReferences.size() + getScheduledCount() + getDeliveringCount();
    }
    
    public synchronized int getScheduledCount()
    {
-      return scheduledTimeouts.size();
+      return scheduledRunnables.size();
    }
    
    public int getDeliveringCount()
@@ -382,7 +394,7 @@
 
    public synchronized void setMaxSize(int maxSize)
    {
-      int num = messageReferences.size() + scheduledTimeouts.size();
+      int num = messageReferences.size() + scheduledRunnables.size();
       
       if (maxSize < num)
       {
@@ -496,18 +508,22 @@
              
    private boolean checkAndSchedule(MessageReference ref)
    {
-      if (ref.getScheduledDeliveryTime() > System.currentTimeMillis())
+   	long now = System.currentTimeMillis();
+   	
+      if (scheduledExecutor != null && ref.getScheduledDeliveryTime() > now)
       {      
          if (trace) { log.trace("Scheduling delivery for " + ref + " to occur at " + ref.getScheduledDeliveryTime()); }
-         
-         // Schedule the cancel to actually occur at the specified time. 
+           
+         long delay = ref.getScheduledDeliveryTime() - now;
             
-         Timeout timeout =
-            MessagingTimeoutFactory.instance.getFactory().
-               schedule(ref.getScheduledDeliveryTime(), new DeliverRefTimeoutTarget(ref));
+         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
          
-         scheduledTimeouts.add(timeout);
-                       
+         scheduledRunnables.add(runnable);
+                  
+         Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+
+         runnable.setFuture(future);
+                  
          return true;
       }
       else
@@ -518,7 +534,7 @@
    
    private boolean checkFull()
    {
-      if (maxSize != -1 && (messageReferences.size() + scheduledTimeouts.size()) >= maxSize)
+      if (maxSize != -1 && (messageReferences.size() + scheduledRunnables.size()) >= maxSize)
       {
          if (trace) { log.trace(this + " queue is full, rejecting message"); }
          
@@ -565,7 +581,7 @@
          {
             throw new IllegalStateException("ClientConsumer.handle() should never return null");
          }
-         
+           
          if (status == HandleStatus.HANDLED)
          {
             deliveringCount.incrementAndGet();
@@ -597,29 +613,55 @@
    
    // Inner classes --------------------------------------------------------------------------
    
-   private class DeliverRefTimeoutTarget implements TimeoutTarget
+   private class ScheduledDeliveryRunnable implements Runnable
    {
       private MessageReference ref;
+      
+      private volatile Future<?> future;
+      
+      private boolean cancelled;
 
-      public DeliverRefTimeoutTarget(MessageReference ref)
+      public ScheduledDeliveryRunnable(MessageReference ref)
       {
          this.ref = ref;
       }
+      
+      public synchronized void setFuture(Future<?> future)
+      {
+      	if (cancelled)
+      	{
+      		future.cancel(false);
+      	}
+      	else
+      	{
+      		this.future = future;
+      	}
+      }
+      
+      public synchronized void cancel()
+      {
+      	if (future != null)
+      	{
+      		future.cancel(false);      		      		
+      	}
+      	
+      	cancelled = true;
+      }
 
-      public void timedOut(Timeout timeout)
+      public void run()
       {
          if (trace) { log.trace("Scheduled delivery timeout " + ref); }
          
-         synchronized (scheduledTimeouts)
+         synchronized (scheduledRunnables)
          {
-            boolean removed = scheduledTimeouts.remove(timeout);
+            boolean removed = scheduledRunnables.remove(this);
             
             if (!removed)
             {
-               throw new IllegalStateException("Failed to remove timeout " + timeout);
+            	log.warn("Failed to remove timeout " + this);
             }
          }
-              
+         
          ref.setScheduledDeliveryTime(0);
                   
          HandleStatus status = deliver(ref);

Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	2008-02-23 13:51:34 UTC (rev 3777)
@@ -25,10 +25,11 @@
 import java.beans.PropertyChangeListener;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.jboss.aop.microcontainer.aspects.jmx.JMX;
 import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.MessagingTimeoutFactory;
 import org.jboss.jms.server.SecurityStore;
 import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
 import org.jboss.jms.server.endpoint.MessagingServerPacketHandler;
@@ -36,7 +37,19 @@
 import org.jboss.jms.server.security.Role;
 import org.jboss.jms.server.security.SecurityMetadataStore;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.*;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Configuration;
+import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.MemoryManager;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.NullPersistenceManager;
+import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.QueueFactory;
+import org.jboss.messaging.core.QueueSettings;
+import org.jboss.messaging.core.ResourceManager;
 import org.jboss.messaging.core.impl.QueueFactoryImpl;
 import org.jboss.messaging.core.impl.ResourceManagerImpl;
 import org.jboss.messaging.core.impl.memory.SimpleMemoryManager;
@@ -54,8 +67,6 @@
 import org.jboss.messaging.util.Version;
 import org.jboss.security.AuthenticationManager;
 
-import javax.jms.Destination;
-
 /**
  * A Messaging Server
  *
@@ -105,8 +116,9 @@
    private Configuration configuration = new Configuration();
    private HierarchicalRepository<HashSet<Role>> securityRepository = new HierarchicalObjectRepository<HashSet<Role>>();
    private HierarchicalRepository<QueueSettings> queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
-   private QueueFactory queueFactory = new QueueFactoryImpl();
+   private QueueFactory queueFactory;
    private ResourceManager resourceManager = new ResourceManagerImpl(0);
+   private ScheduledExecutorService scheduledExecutor;
 
    // Constructors ---------------------------------------------------------------------------------
    /**
@@ -163,6 +175,8 @@
          queueSettingsDeployer = new QueueSettingsDeployer();
          queueSettingsRepository.setDefault(new QueueSettings());
          queueSettingsDeployer.setQueueSettingsRepository(queueSettingsRepository);
+         scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize());
+         queueFactory = new QueueFactoryImpl(scheduledExecutor);
          queueFactory.setQueueSettingsRepository(queueSettingsRepository);
          connectionManager = new SimpleConnectionManager();
          memoryManager = new SimpleMemoryManager();
@@ -241,12 +255,12 @@
          messageCounterManager = null;
          postOffice.stop();
          postOffice = null;
+         scheduledExecutor.shutdown();
+         scheduledExecutor = null;
          if (createTransport)
          {
             remotingService.stop();
          }
-         MessagingTimeoutFactory.instance.reset();
-
          log.info("JMS " + this + " stopped");
       }
       catch (Throwable t)

Modified: trunk/tests/src/org/jboss/messaging/core/impl/test/timing/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/impl/test/timing/QueueTest.java	2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/tests/src/org/jboss/messaging/core/impl/test/timing/QueueTest.java	2008-02-23 13:51:34 UTC (rev 3777)
@@ -23,12 +23,15 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.impl.QueueImpl;
 import org.jboss.messaging.core.impl.test.unit.fakes.FakeConsumer;
 import org.jboss.messaging.test.unit.UnitTestCase;
+import org.jboss.messaging.util.Logger;
 
 /**
  * 
@@ -43,6 +46,24 @@
 {
    private static final long TIMEOUT = 10000;
    
+   private static final Logger log = Logger.getLogger(QueueTest.class);
+   
+   private ScheduledExecutorService scheduledExecutor;
+   
+   public void setUp() throws Exception
+   {
+   	super.setUp();
+   	
+   	scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+   }
+   
+   public void tearDown() throws Exception
+   {
+   	super.tearDown();
+   	
+   	scheduledExecutor.shutdownNow();
+   }
+   
    // The tests ----------------------------------------------------------------
 
    public void testScheduledDirect()
@@ -57,7 +78,7 @@
    
    public void testScheduledNoConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
            
       //Send one scheduled
       
@@ -123,7 +144,7 @@
    
    private void testScheduled(boolean direct)
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       FakeConsumer consumer = null;
       
@@ -190,7 +211,6 @@
       refs.clear();
       consumer.getReferences().clear();
                   
-      
       MessageReference ref = consumer.waitForNextReference(TIMEOUT);
       assertEquals(ref7, ref);
       long now2 = System.currentTimeMillis();




More information about the jboss-cvs-commits mailing list