Author: timfox
Date: 2009-11-02 12:46:47 -0500 (Mon, 02 Nov 2009)
New Revision: 8186
Modified:
trunk/src/main/org/hornetq/core/exception/HornetQException.java
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
Various tweaks, including fixing the bridge reconnect test.
Modified: trunk/src/main/org/hornetq/core/exception/HornetQException.java
===================================================================
--- trunk/src/main/org/hornetq/core/exception/HornetQException.java 2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/src/main/org/hornetq/core/exception/HornetQException.java 2009-11-02 17:46:47 UTC (rev 8186)
@@ -61,6 +61,8 @@
public static final int LARGE_MESSAGE_ERROR_BODY = 110;
public static final int TRANSACTION_ROLLED_BACK = 111;
+
+ public static final int SESSION_CREATION_REJECTED = 112;
// Native Error codes ----------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-02 17:46:47 UTC (rev 8186)
@@ -695,7 +695,7 @@
}
// start sending notification *messages* only when the *remoting service*
if started
- if (messagingServer == null || !messagingServer.isStarted() ||
+ if (messagingServer == null ||
!messagingServer.getRemotingService().isStarted())
{
return;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-11-02 17:46:47 UTC (rev 8186)
@@ -49,10 +49,8 @@
private final Channel channel1;
private final RemotingConnection connection;
-
- public HornetQPacketHandler(final HornetQServer server,
- final Channel channel1,
- final RemotingConnection connection)
+
+ public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final
RemotingConnection connection)
{
this.server = server;
@@ -64,11 +62,11 @@
public void handlePacket(final Packet packet)
{
byte type = packet.getType();
-
+
// All these operations need to be idempotent since they are outside of the
session
// reliability replay functionality
switch (type)
- {
+ {
case CREATESESSION:
{
CreateSessionMessage request = (CreateSessionMessage)packet;
@@ -76,7 +74,7 @@
handleCreateSession(request);
break;
- }
+ }
case REATTACH_SESSION:
{
ReattachSessionMessage request = (ReattachSessionMessage)packet;
@@ -90,21 +88,21 @@
// Create queue can also be fielded here in the case of a replicated store
and forward queue creation
CreateQueueMessage request = (CreateQueueMessage)packet;
-
+
handleCreateQueue(request);
break;
- }
+ }
case CREATE_REPLICATION:
{
// Create queue can also be fielded here in the case of a replicated store
and forward queue creation
CreateReplicationSessionMessage request =
(CreateReplicationSessionMessage)packet;
-
+
handleCreateReplication(request);
break;
- }
+ }
default:
{
log.error("Invalid packet " + packet);
@@ -118,7 +116,7 @@
try
{
response = server.createSession(request.getName(),
- request.getSessionChannelID(),
+ request.getSessionChannelID(),
request.getUsername(),
request.getPassword(),
request.getMinLargeMessageSize(),
@@ -129,11 +127,6 @@
request.isPreAcknowledge(),
request.isXA(),
request.getWindowSize());
-
- if (response == null)
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS));
- }
}
catch (Exception e)
{
@@ -148,10 +141,10 @@
response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
}
}
-
- channel1.send(response);
+
+ channel1.send(response);
}
-
+
private void handleReattachSession(final ReattachSessionMessage request)
{
Packet response;
@@ -181,14 +174,18 @@
{
try
{
- server.createQueue(request.getAddress(), request.getQueueName(),
request.getFilterString(), request.isDurable(), request.isTemporary());
+ server.createQueue(request.getAddress(),
+ request.getQueueName(),
+ request.getFilterString(),
+ request.isDurable(),
+ request.isTemporary());
}
catch (Exception e)
{
log.error("Failed to handle create queue", e);
}
}
-
+
private void handleCreateReplication(final CreateReplicationSessionMessage request)
{
Packet response;
@@ -196,17 +193,17 @@
try
{
Channel channel = connection.getChannel(request.getSessionChannelID(),
request.getWindowSize());
-
+
ReplicationEndpoint endpoint = server.createReplicationEndpoint(channel);
-
+
channel.setHandler(endpoint);
-
+
response = new NullResponseMessage();
}
- catch (Exception e)
+ catch (Exception e)
{
log.warn(e.getMessage(), e);
-
+
if (e instanceof HornetQException)
{
response = new HornetQExceptionMessage((HornetQException)e);
@@ -219,9 +216,5 @@
channel1.send(response);
}
-
-
-
-
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-02 17:46:47 UTC (rev 8186)
@@ -135,9 +135,9 @@
// Attributes
//
-----------------------------------------------------------------------------------
- private SimpleString nodeID;
+ private volatile SimpleString nodeID;
- private UUID uuid;
+ private volatile UUID uuid;
private final Version version;
@@ -149,39 +149,39 @@
private volatile boolean started;
- private SecurityStore securityStore;
+ private volatile SecurityStore securityStore;
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
- private QueueFactory queueFactory;
+ private volatile QueueFactory queueFactory;
- private PagingManager pagingManager;
+ private volatile PagingManager pagingManager;
- private PostOffice postOffice;
+ private volatile PostOffice postOffice;
- private ExecutorService threadPool;
+ private volatile ExecutorService threadPool;
- private ScheduledExecutorService scheduledPool;
+ private volatile ScheduledExecutorService scheduledPool;
- private ExecutorFactory executorFactory;
+ private volatile ExecutorFactory executorFactory;
- private HierarchicalRepository<Set<Role>> securityRepository;
+ private volatile HierarchicalRepository<Set<Role>> securityRepository;
- private ResourceManager resourceManager;
+ private volatile ResourceManager resourceManager;
- private HornetQServerControlImpl messagingServerControl;
+ private volatile HornetQServerControlImpl messagingServerControl;
- private ClusterManager clusterManager;
+ private volatile ClusterManager clusterManager;
- private StorageManager storageManager;
+ private volatile StorageManager storageManager;
- private RemotingService remotingService;
+ private volatile RemotingService remotingService;
- private ManagementService managementService;
+ private volatile ManagementService managementService;
private MemoryManager memoryManager;
- private DeploymentManager deploymentManager;
+ private volatile DeploymentManager deploymentManager;
private Deployer basicUserCredentialsDeployer;
@@ -205,7 +205,7 @@
private final Set<ActivateCallback> activateCallbacks = new
HashSet<ActivateCallback>();
- private GroupingHandler groupingHandler;
+ private volatile GroupingHandler groupingHandler;
// Constructors
// ---------------------------------------------------------------------------------
@@ -258,8 +258,6 @@
this.addressSettingsRepository = new
HierarchicalObjectRepository<AddressSettings>();
addressSettingsRepository.setDefault(new AddressSettings());
-
- // this.managementConnectorID = managementConnectorSequence.decrementAndGet();
}
// lifecycle methods
@@ -317,92 +315,130 @@
super.finalize();
}
- public synchronized void stop() throws Exception
+ public void stop() throws Exception
{
- if (!started)
+ synchronized (this)
{
- return;
- }
+ if (!started)
+ {
+ return;
+ }
- if (clusterManager != null)
- {
- clusterManager.stop();
- }
+ if (clusterManager != null)
+ {
+ clusterManager.stop();
+ }
- if (groupingHandler != null)
- {
- managementService.removeNotificationListener(groupingHandler);
- groupingHandler = null;
- }
- // Need to flush all sessions to make sure all confirmations get sent back to
client
+ if (groupingHandler != null)
+ {
+ managementService.removeNotificationListener(groupingHandler);
+ groupingHandler = null;
+ }
+ // Need to flush all sessions to make sure all confirmations get sent back to
client
- for (ServerSession session : sessions.values())
- {
- session.getChannel().flushConfirmations();
- }
+ for (ServerSession session : sessions.values())
+ {
+ session.getChannel().flushConfirmations();
+ }
- remotingService.stop();
+ remotingService.stop();
- // Stop the deployers
- if (configuration.isFileDeploymentEnabled())
- {
- basicUserCredentialsDeployer.stop();
+ // Stop the deployers
+ if (configuration.isFileDeploymentEnabled())
+ {
+ basicUserCredentialsDeployer.stop();
- addressSettingsDeployer.stop();
+ addressSettingsDeployer.stop();
- if (queueDeployer != null)
+ if (queueDeployer != null)
+ {
+ queueDeployer.stop();
+ }
+
+ if (securityDeployer != null)
+ {
+ securityDeployer.stop();
+ }
+
+ deploymentManager.stop();
+ }
+
+ managementService.unregisterServer();
+
+ managementService.stop();
+
+ if (storageManager != null)
{
- queueDeployer.stop();
+ storageManager.stop();
}
- if (securityDeployer != null)
+ if (replicationEndpoint != null)
{
- securityDeployer.stop();
+ replicationEndpoint.stop();
+ replicationEndpoint = null;
}
- deploymentManager.stop();
- }
+ if (securityManager != null)
+ {
+ securityManager.stop();
+ }
- managementService.unregisterServer();
+ if (resourceManager != null)
+ {
+ resourceManager.stop();
+ }
- managementService.stop();
+ if (postOffice != null)
+ {
+ postOffice.stop();
+ }
- if (storageManager != null)
- {
- storageManager.stop();
- }
+ // Need to shutdown pools before shutting down paging manager to make sure
everything is written ok
- if (replicationEndpoint != null)
- {
- replicationEndpoint.stop();
- replicationEndpoint = null;
- }
+ List<Runnable> tasks = scheduledPool.shutdownNow();
- if (securityManager != null)
- {
- securityManager.stop();
- }
+ for (Runnable task : tasks)
+ {
+ log.debug("Waiting for " + task);
+ }
- if (resourceManager != null)
- {
- resourceManager.stop();
- }
+ threadPool.shutdown();
- if (postOffice != null)
- {
- postOffice.stop();
- }
+ scheduledPool = null;
- // Need to shutdown pools before shutting down paging manager to make sure
everything is written ok
+ if (pagingManager != null)
+ {
+ pagingManager.stop();
+ }
- List<Runnable> tasks = scheduledPool.shutdownNow();
+ if (memoryManager != null)
+ {
+ memoryManager.stop();
+ }
- for (Runnable task : tasks)
- {
- log.debug("Waiting for " + task);
+ pagingManager = null;
+ securityStore = null;
+ resourceManager = null;
+ postOffice = null;
+ securityRepository = null;
+ securityStore = null;
+ queueFactory = null;
+ resourceManager = null;
+ messagingServerControl = null;
+ memoryManager = null;
+
+ sessions.clear();
+
+ started = false;
+ initialised = false;
+ uuid = null;
+ nodeID = null;
+
+ log.info("HornetQ Server version " + getVersion().getFullVersion() +
" stopped");
+
+ Logger.reset();
}
- threadPool.shutdown();
try
{
if (!threadPool.awaitTermination(30000, TimeUnit.MILLISECONDS))
@@ -414,41 +450,7 @@
{
// Ignore
}
-
- scheduledPool = null;
threadPool = null;
-
- if (pagingManager != null)
- {
- pagingManager.stop();
- }
-
- if (memoryManager != null)
- {
- memoryManager.stop();
- }
-
- pagingManager = null;
- securityStore = null;
- resourceManager = null;
- postOffice = null;
- securityRepository = null;
- securityStore = null;
- queueFactory = null;
- resourceManager = null;
- messagingServerControl = null;
- memoryManager = null;
-
- sessions.clear();
-
- started = false;
- initialised = false;
- uuid = null;
- nodeID = null;
-
- log.info("HornetQ Server version " + getVersion().getFullVersion() +
" stopped");
-
- Logger.reset();
}
// HornetQServer implementation
@@ -509,7 +511,7 @@
return version;
}
- public boolean isStarted()
+ public synchronized boolean isStarted()
{
return started;
}
@@ -520,9 +522,14 @@
}
public ReattachSessionResponseMessage reattachSession(final RemotingConnection
connection,
- final String name,
- final int lastReceivedCommandID)
throws Exception
+ final String name,
+ final int
lastReceivedCommandID) throws Exception
{
+ if (!started)
+ {
+ return null;
+ }
+
ServerSession session = sessions.get(name);
if (!checkActivate())
@@ -552,7 +559,7 @@
}
sessions.remove(name);
-
+
return new ReattachSessionResponseMessage(-1, false);
}
else
@@ -566,18 +573,23 @@
}
public CreateSessionResponseMessage createSession(final String name,
- final long channelID,
- final String username,
- final String password,
- final int minLargeMessageSize,
- final int incrementingVersion,
- final RemotingConnection
connection,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final boolean xa,
- final int sendWindowSize) throws
Exception
+ final long channelID,
+ final String username,
+ final String password,
+ final int
minLargeMessageSize,
+ final int
incrementingVersion,
+ final
RemotingConnection connection,
+ final boolean
autoCommitSends,
+ final boolean
autoCommitAcks,
+ final boolean
preAcknowledge,
+ final boolean xa,
+ final int
sendWindowSize) throws Exception
{
+ if (!started)
+ {
+ throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED,
"Server not started");
+ }
+
if (version.getIncrementingVersion() != incrementingVersion)
{
log.warn("Client with version " + incrementingVersion +
@@ -588,14 +600,14 @@
". " +
"Please ensure all clients and servers are upgraded to the same
version for them to " +
"interoperate properly");
- return null;
+ throw new HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
"Server and client versions incompatible");
}
if (!checkActivate())
{
// Backup server is not ready to accept connections
- return new CreateSessionResponseMessage(version.getIncrementingVersion());
+ throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED,
"Server will not accept create session requests");
}
if (securityStore != null)
@@ -673,7 +685,7 @@
return sessions.get(name);
}
- public List<ServerSession> getSessions(final String connectionID)
+ public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries =
sessions.entrySet();
List<ServerSession> matchingSessions = new ArrayList<ServerSession>();
@@ -688,11 +700,12 @@
return matchingSessions;
}
- public Set<ServerSession> getSessions()
+ public synchronized Set<ServerSession> getSessions()
{
return new HashSet<ServerSession>(sessions.values());
}
+ //TODO - should this really be here?? It's only used in tests
public boolean isInitialised()
{
synchronized (initialiseLock)
@@ -727,19 +740,19 @@
}
public Queue createQueue(final SimpleString address,
- final SimpleString queueName,
- final SimpleString filterString,
- final boolean durable,
- final boolean temporary) throws Exception
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final boolean durable,
+ final boolean temporary) throws Exception
{
return createQueue(address, queueName, filterString, durable, temporary, false);
}
public Queue deployQueue(final SimpleString address,
- final SimpleString queueName,
- final SimpleString filterString,
- final boolean durable,
- final boolean temporary) throws Exception
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final boolean durable,
+ final boolean temporary) throws Exception
{
return createQueue(address, queueName, filterString, durable, temporary, true);
}
@@ -793,12 +806,12 @@
activateCallbacks.remove(callback);
}
- public ExecutorFactory getExecutorFactory()
+ public synchronized ExecutorFactory getExecutorFactory()
{
return executorFactory;
}
- public void setGroupingHandler(GroupingHandler groupingHandler)
+ public void setGroupingHandler(final GroupingHandler groupingHandler)
{
this.groupingHandler = groupingHandler;
}
@@ -900,7 +913,7 @@
return true;
}
- private synchronized void callActivateCallbacks()
+ private void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2009-11-02 16:10:49 UTC (rev 8185)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2009-11-02 17:46:47 UTC (rev 8186)
@@ -35,16 +35,16 @@
{
super.setUp();
- start();
+ start();
}
-
+
private void start() throws Exception
{
setupServers();
-
- setRedistributionDelay(0);
+
+ setRedistributionDelay(0);
}
-
+
private void stop() throws Exception
{
stopServers();
@@ -71,7 +71,7 @@
public void testRedistributionWhenConsumerIsClosed() throws Exception
{
setupCluster(false);
-
+
log.info("Doing test");
startServers(0, 1, 2);
@@ -105,7 +105,7 @@
removeConsumer(1);
verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
+
log.info("Test done");
}
@@ -358,100 +358,98 @@
verifyReceiveAll(20, 1);
verifyNotReceive(1);
}
-
+
public void testBackAndForth() throws Exception
{
for (int i = 0; i < 10; i++)
{
setupCluster(false);
-
+
startServers(0, 1, 2);
-
+
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
-
+
final String ADDRESS = "queues.testaddress";
final String QUEUE = "queue0";
-
-
+
createQueue(0, ADDRESS, QUEUE, null, false);
createQueue(1, ADDRESS, QUEUE, null, false);
createQueue(2, ADDRESS, QUEUE, null, false);
-
+
addConsumer(0, 0, QUEUE, null);
-
+
waitForBindings(0, ADDRESS, 1, 1, true);
waitForBindings(1, ADDRESS, 1, 0, true);
waitForBindings(2, ADDRESS, 1, 0, true);
-
+
waitForBindings(0, ADDRESS, 2, 0, false);
waitForBindings(1, ADDRESS, 2, 1, false);
waitForBindings(2, ADDRESS, 2, 1, false);
-
+
send(0, ADDRESS, 20, false, null);
-
+
waitForMessages(0, ADDRESS, 20);
-
+
removeConsumer(0);
-
+
waitForBindings(0, ADDRESS, 1, 0, true);
waitForBindings(1, ADDRESS, 1, 0, true);
waitForBindings(2, ADDRESS, 1, 0, true);
-
+
waitForBindings(0, ADDRESS, 2, 0, false);
waitForBindings(1, ADDRESS, 2, 0, false);
waitForBindings(2, ADDRESS, 2, 0, false);
-
+
addConsumer(1, 1, QUEUE, null);
-
+
waitForBindings(0, ADDRESS, 1, 0, true);
waitForBindings(1, ADDRESS, 1, 1, true);
waitForBindings(2, ADDRESS, 1, 0, true);
-
+
waitForMessages(1, ADDRESS, 20);
waitForMessages(0, ADDRESS, 0);
-
-
+
waitForBindings(0, ADDRESS, 2, 1, false);
waitForBindings(1, ADDRESS, 2, 0, false);
waitForBindings(2, ADDRESS, 2, 1, false);
-
+
removeConsumer(1);
-
+
waitForBindings(0, ADDRESS, 1, 0, true);
waitForBindings(1, ADDRESS, 1, 0, true);
waitForBindings(2, ADDRESS, 1, 0, true);
-
+
waitForBindings(0, ADDRESS, 2, 0, false);
waitForBindings(1, ADDRESS, 2, 0, false);
waitForBindings(2, ADDRESS, 2, 0, false);
-
+
addConsumer(0, 0, QUEUE, null);
-
+
waitForBindings(0, ADDRESS, 1, 1, true);
waitForBindings(1, ADDRESS, 1, 0, true);
waitForBindings(2, ADDRESS, 1, 0, true);
-
+
waitForBindings(0, ADDRESS, 2, 0, false);
waitForBindings(1, ADDRESS, 2, 1, false);
waitForBindings(2, ADDRESS, 2, 1, false);
-
+
waitForMessages(0, ADDRESS, 20);
-
+
verifyReceiveAll(20, 0);
verifyNotReceive(0);
-
+
addConsumer(1, 1, QUEUE, null);
verifyNotReceive(1);
removeConsumer(1);
-
+
stop();
start();
}
-
+
}
-
+
public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception
{
setupCluster(false);
@@ -469,7 +467,7 @@
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
- addConsumer(0, 0, "queue0", null);
+ addConsumer(0, 0, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
@@ -483,18 +481,18 @@
sendInRange(0, "queues.testaddress", 10, 20, false, filter2);
removeConsumer(0);
- addConsumer(1, 1, "queue0", filter1);
- addConsumer(2, 2, "queue0", filter2);
+ addConsumer(1, 1, "queue0", filter1);
+ addConsumer(2, 2, "queue0", filter2);
verifyReceiveAllInRange(0, 10, 1);
verifyReceiveAllInRange(10, 20, 2);
}
-
+
public void testDelayedRedistribution() throws Exception
{
final long delay = 1000;
setRedistributionDelay(delay);
-
+
setupCluster(false);
startServers(0, 1, 2);
@@ -507,7 +505,7 @@
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
- addConsumer(0, 0, "queue0", null);
+ addConsumer(0, 0, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
@@ -520,20 +518,20 @@
send(0, "queues.testaddress", 20, false, null);
long start = System.currentTimeMillis();
-
+
removeConsumer(0);
- addConsumer(1, 1, "queue0", null);
-
+ addConsumer(1, 1, "queue0", null);
+
long minReceiveTime = start + delay;
-
+
verifyReceiveAllNotBefore(minReceiveTime, 20, 1);
}
-
+
public void testDelayedRedistributionCancelled() throws Exception
{
final long delay = 1000;
setRedistributionDelay(delay);
-
+
setupCluster(false);
startServers(0, 1, 2);
@@ -546,7 +544,7 @@
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
- addConsumer(0, 0, "queue0", null);
+ addConsumer(0, 0, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
@@ -559,18 +557,18 @@
send(0, "queues.testaddress", 20, false, null);
removeConsumer(0);
- addConsumer(1, 1, "queue0", null);
-
+ addConsumer(1, 1, "queue0", null);
+
Thread.sleep(delay / 2);
-
- //Add it back on the local queue - this should stop any redistributionm
- addConsumer(0, 0, "queue0", null);
-
+
+ // Add it back on the local queue - this should stop any redistributionm
+ addConsumer(0, 0, "queue0", null);
+
Thread.sleep(delay);
-
+
verifyReceiveAll(20, 0);
}
-
+
public void testRedistributionNumberOfMessagesGreaterThanBatchSize() throws Exception
{
setupCluster(false);
@@ -585,7 +583,7 @@
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
- addConsumer(0, 0, "queue0", null);
+ addConsumer(0, 0, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
@@ -598,8 +596,8 @@
send(0, "queues.testaddress", QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2,
false, null);
removeConsumer(0);
- addConsumer(1, 1, "queue0", null);
-
+ addConsumer(1, 1, "queue0", null);
+
verifyReceiveAll(QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, 1);
}
@@ -636,7 +634,7 @@
closeAllSessionFactories();
stopServers(0, 1, 2);
-
+
clearServer(0, 1, 2);
}