[jboss-cvs] JBoss Messaging SVN: r3777 - in trunk: src/main/org/jboss/jms/server and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Feb 23 08:51:35 EST 2008
Author: timfox
Date: 2008-02-23 08:51:34 -0500 (Sat, 23 Feb 2008)
New Revision: 3777
Removed:
trunk/src/main/org/jboss/jms/server/MessagingTimeoutFactory.java
Modified:
trunk/src/etc/server/default/deploy/jbm-configuration.xml
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/Configuration.java
trunk/src/main/org/jboss/messaging/core/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
trunk/tests/src/org/jboss/messaging/core/impl/test/timing/QueueTest.java
Log:
Some improvements to scheduled deliveries
Modified: trunk/src/etc/server/default/deploy/jbm-configuration.xml
===================================================================
--- trunk/src/etc/server/default/deploy/jbm-configuration.xml 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/etc/server/default/deploy/jbm-configuration.xml 2008-02-23 13:51:34 UTC (rev 3777)
@@ -36,6 +36,8 @@
<post-office-name>JMS post office</post-office-name>
<clustered>false</clustered>
+
+ <scheduled-executor-max-pool-size>30</scheduled-executor-max-pool-size>
<!-- All the remaining properties only have to be specified if the post office is clustered.
You can safely comment them out if your post office is non clustered -->
Deleted: trunk/src/main/org/jboss/jms/server/MessagingTimeoutFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/MessagingTimeoutFactory.java 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/jms/server/MessagingTimeoutFactory.java 2008-02-23 13:51:34 UTC (rev 3777)
@@ -1,81 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.server;
-
-import org.jboss.util.threadpool.BasicThreadPool;
-import org.jboss.util.threadpool.ThreadPool;
-import org.jboss.util.timeout.TimeoutFactory;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class MessagingTimeoutFactory
-{
- // Constants ------------------------------------------------------------------------------------
-
- // Static ---------------------------------------------------------------------------------------
-
- public static MessagingTimeoutFactory instance = new MessagingTimeoutFactory();
-
- // Attributes -----------------------------------------------------------------------------------
-
- private TimeoutFactory factory;
-
- // Constructors ---------------------------------------------------------------------------------
-
- private MessagingTimeoutFactory()
- {
- createFactory();
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public TimeoutFactory getFactory()
- {
- return factory;
- }
-
- public synchronized void reset()
- {
- factory.cancel();
- createFactory();
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- private void createFactory()
- {
- ThreadPool threadPool = new BasicThreadPool("Messaging Timeout");
- factory = new TimeoutFactory(threadPool);
- }
-
- // Inner classes --------------------------------------------------------------------------------
-
-}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-23 13:51:34 UTC (rev 3777)
@@ -270,30 +270,6 @@
}
}
-// void localClose() throws Exception
-// {
-// if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
-//
-// messageQueue.removeConsumer(this);
-//
-// sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
-//
-// if (autoDeleteQueue)
-// {
-// if (messageQueue.getConsumerCount() == 0)
-// {
-// MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
-//
-// server.getPostOffice().removeBinding(messageQueue.getName());
-//
-// if (messageQueue.isDurable())
-// {
-// server.getPersistenceManager().deleteAllReferences(messageQueue);
-// }
-// }
-// }
-// }
-
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-23 13:51:34 UTC (rev 3777)
@@ -239,8 +239,7 @@
synchronized void handleDelivery(MessageReference ref, ServerConsumerEndpoint consumer) throws Exception
{
- Delivery delivery = new DeliveryImpl(ref, consumer.getID(),
- deliveryIDSequence++, sender);
+ Delivery delivery = new DeliveryImpl(ref, consumer.getID(), deliveryIDSequence++, sender);
deliveries.add(delivery);
Modified: trunk/src/main/org/jboss/messaging/core/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Configuration.java 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/Configuration.java 2008-02-23 13:51:34 UTC (rev 3777)
@@ -68,6 +68,8 @@
protected String _postOfficeName;
protected Boolean _clustered = false;
+
+ protected Integer _scheduledThreadPoolMaxSize = 30;
protected Long _stateTimeout = (long) 5000;
@@ -213,6 +215,16 @@
{
return _clustered;
}
+
+ public Integer getScheduledThreadPoolMaxSize()
+ {
+ return _scheduledThreadPoolMaxSize;
+ }
+
+ public void setScheduledThreadPoolMaxSize(int size)
+ {
+ this._scheduledThreadPoolMaxSize = size;
+ }
public void setClustered(Boolean clustered)
{
Modified: trunk/src/main/org/jboss/messaging/core/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/FileConfiguration.java 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/FileConfiguration.java 2008-02-23 13:51:34 UTC (rev 3777)
@@ -61,6 +61,7 @@
_strictTck = getBoolean(e, "strict-tck", _strictTck);
_postOfficeName = getString(e, "post-office-name", _postOfficeName);
_clustered = getBoolean(e, "clustered", _clustered);
+ _scheduledThreadPoolMaxSize = getInteger(e, "scheduled-executor-max-pool-size", _scheduledThreadPoolMaxSize);
_stateTimeout = getLong(e, "state-timeout", _stateTimeout);
_castTimeout = getLong(e, "cast-timeout", _castTimeout);
_groupName = getString(e, "group-name", _groupName);
Modified: trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java 2008-02-23 13:51:34 UTC (rev 3777)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.core.impl;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.QueueFactory;
@@ -37,13 +39,22 @@
*/
public class QueueFactoryImpl implements QueueFactory
{
- HierarchicalRepository<QueueSettings> queueSettingsRepository;
+ private HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+ private ScheduledExecutorService scheduledExecutor;
-
+ public QueueFactoryImpl(ScheduledExecutorService scheduledExecutor)
+ {
+ this();
+
+ this.scheduledExecutor = scheduledExecutor;
+ }
+
public QueueFactoryImpl()
{
queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
- queueSettingsRepository.setDefault(new QueueSettings());
+
+ queueSettingsRepository.setDefault(new QueueSettings());
}
public Queue createQueue(long id, String name, Filter filter,
@@ -51,11 +62,16 @@
{
QueueSettings queueSettings = queueSettingsRepository.getMatch(name);
- Queue queue = new QueueImpl(id, name, filter, queueSettings.isClustered(), durable, temporary, queueSettings.getMaxSize());
+ Queue queue = new QueueImpl(id, name, filter, queueSettings.isClustered(),
+ durable, temporary, queueSettings.getMaxSize(),
+ scheduledExecutor);
queue.setMaxDeliveryAttempts(queueSettings.getMaxDeliveryAttempts());
+
queue.setMessageCounterHistoryDayLimit(queueSettings.getMessageCounterHistoryDayLimit());
+
queue.setRedeliveryDelay(queueSettings.getRedeliveryDelay());
+
queue.setDistributionPolicy(queueSettings.getDistributionPolicy());
return queue;
Modified: trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java 2008-02-23 13:51:34 UTC (rev 3777)
@@ -27,9 +27,11 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.jboss.jms.server.MessagingTimeoutFactory;
import org.jboss.messaging.core.Consumer;
import org.jboss.messaging.core.DistributionPolicy;
import org.jboss.messaging.core.Filter;
@@ -38,8 +40,6 @@
import org.jboss.messaging.core.PriorityLinkedList;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.util.Logger;
-import org.jboss.util.timeout.Timeout;
-import org.jboss.util.timeout.TimeoutTarget;
/**
*
@@ -74,7 +74,7 @@
protected List<Consumer> consumers;
- protected Set<Timeout> scheduledTimeouts;
+ protected Set<ScheduledDeliveryRunnable> scheduledRunnables;
protected DistributionPolicy distributionPolicy;
@@ -87,6 +87,8 @@
private AtomicInteger messagesAdded = new AtomicInteger(0);
private AtomicInteger deliveringCount = new AtomicInteger(0);
+
+ private ScheduledExecutorService scheduledExecutor;
// ---------
@@ -100,6 +102,13 @@
private int messageCounterHistoryDayLimit;
+ public QueueImpl(long id, String name, Filter filter, boolean clustered,
+ boolean durable, boolean temporary, int maxSize, ScheduledExecutorService scheduledExecutor)
+ {
+ this(id, name, filter, clustered, durable, temporary, maxSize);
+
+ this.scheduledExecutor = scheduledExecutor;
+ }
public QueueImpl(long id, String name, Filter filter, boolean clustered,
boolean durable, boolean temporary, int maxSize)
@@ -123,7 +132,7 @@
consumers = new ArrayList<Consumer>();
- scheduledTimeouts = new HashSet<Timeout>();
+ scheduledRunnables = new HashSet<ScheduledDeliveryRunnable>();
distributionPolicy = new RoundRobinDistributionPolicy();
@@ -295,42 +304,46 @@
{
messageReferences.clear();
- if (!this.scheduledTimeouts.isEmpty())
+ if (!scheduledRunnables.isEmpty())
{
- Set<Timeout> clone = new HashSet<Timeout>(scheduledTimeouts);
+ Set<ScheduledDeliveryRunnable> clone = new HashSet<ScheduledDeliveryRunnable>(scheduledRunnables);
- for (Timeout timeout: clone)
+ for (ScheduledDeliveryRunnable runnable: clone)
{
- timeout.cancel();
+ runnable.cancel();
}
- scheduledTimeouts.clear();
+ scheduledRunnables.clear();
}
}
public synchronized void removeReference(MessageReference messageReference)
{
messageReferences.remove(messageReference , messageReference.getMessage().getPriority());
- if (!this.scheduledTimeouts.isEmpty())
+
+ if (!scheduledRunnables.isEmpty())
{
- Set<Timeout> clone = new HashSet<Timeout>(scheduledTimeouts);
+ Set<ScheduledDeliveryRunnable> clone = new HashSet<ScheduledDeliveryRunnable>(scheduledRunnables);
- for (Timeout timeout: clone)
+ for (ScheduledDeliveryRunnable runnable: clone)
{
- timeout.cancel();
+ runnable.cancel();
}
- scheduledTimeouts.clear();
+ scheduledRunnables.clear();
}
}
+ //FIXME - probably better with an iterator
public synchronized List<MessageReference> removeReferences(Filter filter)
{
List<MessageReference> allRefs = list(filter);
+
for (MessageReference messageReference : allRefs)
{
removeReference(messageReference);
}
+
return allRefs;
}
@@ -356,13 +369,12 @@
public synchronized int getMessageCount()
{
- // log.info("mr: " + messageReferences.size() + " sc: " + getScheduledCount() + " dc: " + getDeliveringCount());
return messageReferences.size() + getScheduledCount() + getDeliveringCount();
}
public synchronized int getScheduledCount()
{
- return scheduledTimeouts.size();
+ return scheduledRunnables.size();
}
public int getDeliveringCount()
@@ -382,7 +394,7 @@
public synchronized void setMaxSize(int maxSize)
{
- int num = messageReferences.size() + scheduledTimeouts.size();
+ int num = messageReferences.size() + scheduledRunnables.size();
if (maxSize < num)
{
@@ -496,18 +508,22 @@
private boolean checkAndSchedule(MessageReference ref)
{
- if (ref.getScheduledDeliveryTime() > System.currentTimeMillis())
+ long now = System.currentTimeMillis();
+
+ if (scheduledExecutor != null && ref.getScheduledDeliveryTime() > now)
{
if (trace) { log.trace("Scheduling delivery for " + ref + " to occur at " + ref.getScheduledDeliveryTime()); }
-
- // Schedule the cancel to actually occur at the specified time.
+
+ long delay = ref.getScheduledDeliveryTime() - now;
- Timeout timeout =
- MessagingTimeoutFactory.instance.getFactory().
- schedule(ref.getScheduledDeliveryTime(), new DeliverRefTimeoutTarget(ref));
+ ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
- scheduledTimeouts.add(timeout);
-
+ scheduledRunnables.add(runnable);
+
+ Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+
+ runnable.setFuture(future);
+
return true;
}
else
@@ -518,7 +534,7 @@
private boolean checkFull()
{
- if (maxSize != -1 && (messageReferences.size() + scheduledTimeouts.size()) >= maxSize)
+ if (maxSize != -1 && (messageReferences.size() + scheduledRunnables.size()) >= maxSize)
{
if (trace) { log.trace(this + " queue is full, rejecting message"); }
@@ -565,7 +581,7 @@
{
throw new IllegalStateException("ClientConsumer.handle() should never return null");
}
-
+
if (status == HandleStatus.HANDLED)
{
deliveringCount.incrementAndGet();
@@ -597,29 +613,55 @@
// Inner classes --------------------------------------------------------------------------
- private class DeliverRefTimeoutTarget implements TimeoutTarget
+ private class ScheduledDeliveryRunnable implements Runnable
{
private MessageReference ref;
+
+ private volatile Future<?> future;
+
+ private boolean cancelled;
- public DeliverRefTimeoutTarget(MessageReference ref)
+ public ScheduledDeliveryRunnable(MessageReference ref)
{
this.ref = ref;
}
+
+ public synchronized void setFuture(Future<?> future)
+ {
+ if (cancelled)
+ {
+ future.cancel(false);
+ }
+ else
+ {
+ this.future = future;
+ }
+ }
+
+ public synchronized void cancel()
+ {
+ if (future != null)
+ {
+ future.cancel(false);
+ }
+
+ cancelled = true;
+ }
- public void timedOut(Timeout timeout)
+ public void run()
{
if (trace) { log.trace("Scheduled delivery timeout " + ref); }
- synchronized (scheduledTimeouts)
+ synchronized (scheduledRunnables)
{
- boolean removed = scheduledTimeouts.remove(timeout);
+ boolean removed = scheduledRunnables.remove(this);
if (!removed)
{
- throw new IllegalStateException("Failed to remove timeout " + timeout);
+ log.warn("Failed to remove timeout " + this);
}
}
-
+
ref.setScheduledDeliveryTime(0);
HandleStatus status = deliver(ref);
Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-02-23 13:51:34 UTC (rev 3777)
@@ -25,10 +25,11 @@
import java.beans.PropertyChangeListener;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.MessagingTimeoutFactory;
import org.jboss.jms.server.SecurityStore;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
import org.jboss.jms.server.endpoint.MessagingServerPacketHandler;
@@ -36,7 +37,19 @@
import org.jboss.jms.server.security.Role;
import org.jboss.jms.server.security.SecurityMetadataStore;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.*;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Configuration;
+import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.MemoryManager;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.NullPersistenceManager;
+import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.QueueFactory;
+import org.jboss.messaging.core.QueueSettings;
+import org.jboss.messaging.core.ResourceManager;
import org.jboss.messaging.core.impl.QueueFactoryImpl;
import org.jboss.messaging.core.impl.ResourceManagerImpl;
import org.jboss.messaging.core.impl.memory.SimpleMemoryManager;
@@ -54,8 +67,6 @@
import org.jboss.messaging.util.Version;
import org.jboss.security.AuthenticationManager;
-import javax.jms.Destination;
-
/**
* A Messaging Server
*
@@ -105,8 +116,9 @@
private Configuration configuration = new Configuration();
private HierarchicalRepository<HashSet<Role>> securityRepository = new HierarchicalObjectRepository<HashSet<Role>>();
private HierarchicalRepository<QueueSettings> queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
- private QueueFactory queueFactory = new QueueFactoryImpl();
+ private QueueFactory queueFactory;
private ResourceManager resourceManager = new ResourceManagerImpl(0);
+ private ScheduledExecutorService scheduledExecutor;
// Constructors ---------------------------------------------------------------------------------
/**
@@ -163,6 +175,8 @@
queueSettingsDeployer = new QueueSettingsDeployer();
queueSettingsRepository.setDefault(new QueueSettings());
queueSettingsDeployer.setQueueSettingsRepository(queueSettingsRepository);
+ scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize());
+ queueFactory = new QueueFactoryImpl(scheduledExecutor);
queueFactory.setQueueSettingsRepository(queueSettingsRepository);
connectionManager = new SimpleConnectionManager();
memoryManager = new SimpleMemoryManager();
@@ -241,12 +255,12 @@
messageCounterManager = null;
postOffice.stop();
postOffice = null;
+ scheduledExecutor.shutdown();
+ scheduledExecutor = null;
if (createTransport)
{
remotingService.stop();
}
- MessagingTimeoutFactory.instance.reset();
-
log.info("JMS " + this + " stopped");
}
catch (Throwable t)
Modified: trunk/tests/src/org/jboss/messaging/core/impl/test/timing/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/impl/test/timing/QueueTest.java 2008-02-23 01:17:46 UTC (rev 3776)
+++ trunk/tests/src/org/jboss/messaging/core/impl/test/timing/QueueTest.java 2008-02-23 13:51:34 UTC (rev 3777)
@@ -23,12 +23,15 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.impl.QueueImpl;
import org.jboss.messaging.core.impl.test.unit.fakes.FakeConsumer;
import org.jboss.messaging.test.unit.UnitTestCase;
+import org.jboss.messaging.util.Logger;
/**
*
@@ -43,6 +46,24 @@
{
private static final long TIMEOUT = 10000;
+ private static final Logger log = Logger.getLogger(QueueTest.class);
+
+ private ScheduledExecutorService scheduledExecutor;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ scheduledExecutor.shutdownNow();
+ }
+
// The tests ----------------------------------------------------------------
public void testScheduledDirect()
@@ -57,7 +78,7 @@
public void testScheduledNoConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
//Send one scheduled
@@ -123,7 +144,7 @@
private void testScheduled(boolean direct)
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
FakeConsumer consumer = null;
@@ -190,7 +211,6 @@
refs.clear();
consumer.getReferences().clear();
-
MessageReference ref = consumer.waitForNextReference(TIMEOUT);
assertEquals(ref7, ref);
long now2 = System.currentTimeMillis();
More information about the jboss-cvs-commits mailing list