[jboss-cvs] JBoss Messaging SVN: r6569 - in trunk: src/main/org/jboss/messaging/core/cluster/impl and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Apr 26 08:42:53 EDT 2009
Author: timfox
Date: 2009-04-26 08:42:52 -0400 (Sun, 26 Apr 2009)
New Revision: 6569
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
more fixes
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -506,7 +506,7 @@
boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.SERVER_DISCONNECTED);
boolean done = false;
-
+
if (attemptFailover || reconnectAttempts != 0)
{
lockAllChannel1s();
@@ -709,7 +709,7 @@
while (true)
{
if (closed)
- {
+ {
return null;
}
Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -368,7 +368,7 @@
Map.Entry<String, DiscoveryEntry> entry = iter.next();
if (entry.getValue().getLastUpdate() + timeout <= now)
- {
+ {
iter.remove();
changed = true;
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -60,6 +60,8 @@
long generateUniqueID();
long getCurrentUniqueID();
+
+ void setUniqueIDSequence(long id);
void storeMessage(ServerMessage message) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -139,7 +139,7 @@
private volatile boolean started;
private final ExecutorService executor;
-
+
public JournalStorageManager(final Configuration config)
{
this.executor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-journal-storage-manager"));
@@ -244,7 +244,9 @@
public long generateUniqueID()
{
- return idGenerator.generateID();
+ long id = idGenerator.generateID();
+
+ return id;
}
public long getCurrentUniqueID()
@@ -252,6 +254,11 @@
return idGenerator.getCurrentID();
}
+ public void setUniqueIDSequence(final long id)
+ {
+ idGenerator.setID(id);
+ }
+
public LargeServerMessage createLargeMessage()
{
return new JournalLargeServerMessage(this);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -55,7 +55,7 @@
{
private static final Logger log = Logger.getLogger(NullStorageManager.class);
- private final AtomicLong idGenerator = new AtomicLong(0);
+ private final AtomicLong idSequence = new AtomicLong(0);
private UUID id;
@@ -184,15 +184,20 @@
public long generateUniqueID()
{
- long id = idGenerator.getAndIncrement();
+ long id = idSequence.getAndIncrement();
return id;
}
public long getCurrentUniqueID()
{
- return idGenerator.get();
+ return idSequence.get();
}
+
+ public void setUniqueIDSequence(final long id)
+ {
+ idSequence.set(id);
+ }
public synchronized void start() throws Exception
{
@@ -213,7 +218,7 @@
id = null;
- idGenerator.set(0);
+ idSequence.set(0);
started = false;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -43,7 +43,7 @@
void activate(Queue queue) throws Exception;
- void reset() throws Exception;
+ //void reset() throws Exception;
void close() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -709,7 +709,7 @@
}
catch (Exception e)
{
- log.warn("Unable to connect. JMSBridge is now disabled.", e);
+ log.warn("Unable to connect. Bridge is now disabled.", e);
return false;
}
@@ -767,7 +767,7 @@
{
try
{
- flowRecord.reset();
+ // flowRecord.reset();
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -256,7 +256,13 @@
for (MessageFlowRecord record : records.values())
{
- record.close();
+ try
+ {
+ record.close();
+ }
+ catch (Exception ignore)
+ {
+ }
}
started = false;
@@ -347,7 +353,7 @@
// have messages - this is up to the administrator to do this
entry.getValue().close();
-
+
iter.remove();
}
}
@@ -511,13 +517,13 @@
this.bridge = bridge;
}
- public synchronized void reset() throws Exception
- {
- clearBindings();
+// public synchronized void reset() throws Exception
+// {
+// clearBindings();
+//
+// firstReset = false;
+// }
- firstReset = false;
- }
-
public synchronized void onMessage(final ClientMessage message)
{
try
@@ -666,8 +672,7 @@
routingName,
queueID,
filterString,
- queue,
- // useDuplicateDetection,
+ queue,
bridge.getName(),
distance + 1);
@@ -679,14 +684,20 @@
// hops is too high
// or there are multiple cluster connections for the same address
- log.warn("Remoting queue binding " + clusterName +
+ log.warn("Remote queue binding " + clusterName +
" has already been bound in the post office. Most likely cause for this is you have a loop " +
"in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
return;
}
- postOffice.addBinding(binding);
+ try
+ {
+ postOffice.addBinding(binding);
+ }
+ catch (Exception ignore)
+ {
+ }
Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -134,7 +134,7 @@
private final Configuration configuration;
private final MBeanServer mbeanServer;
-
+
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
private volatile boolean started;
@@ -216,7 +216,7 @@
{
throw new NullPointerException("Must inject SecurityManager into MessagingServer constructor");
}
-
+
// We need to hard code the version information into a source file
version = VersionLoader.getVersion();
@@ -235,7 +235,6 @@
// lifecycle methods
// ----------------------------------------------------------------
-
public synchronized void start() throws Exception
{
if (started)
@@ -410,7 +409,7 @@
public ClusterManager getClusterManager()
{
return clusterManager;
- }
+ }
public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
final String name,
@@ -536,7 +535,7 @@
}
}
- public void initialiseBackup(final UUID theUUID, final long currentMessageID) throws Exception
+ public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
{
if (theUUID == null)
{
@@ -556,19 +555,34 @@
initialisePart2();
- if (currentMessageID != this.storageManager.getCurrentUniqueID())
+ // In a replicated environment the live and backup unique ids can differ slightly,
+ // with the live id being the higher of the two.
+ // When the live server stops, its cluster connections are stopped, which removes
+ // the bindings for all the flow records; each removal sends a notification, and an id
+ // is generated for every notification.
+ // When the backup shuts down its cluster connections are not active, so no bindings are
+ // removed on close and no extra ids are generated.
+
+ long backupID = storageManager.getCurrentUniqueID();
+
+ if (liveUniqueID != backupID)
{
- initialised = false;
-
- throw new IllegalStateException("Backup node current id sequence != live node current id sequence " + this.storageManager.getCurrentUniqueID() +
- ", " +
- currentMessageID);
+ if (liveUniqueID > backupID)
+ {
+ storageManager.setUniqueIDSequence(liveUniqueID);
+ }
+ else
+ {
+ initialised = false;
+
+ throw new IllegalStateException("Live and backup unique ids different. Probably trying to restart a live backup pair after a crash");
+ }
}
log.info("Backup server is now operational");
}
}
-
+
public Channel getReplicatingChannel()
{
synchronized (replicatingChannelLock)
@@ -708,7 +722,7 @@
public void destroyQueue(final SimpleString queueName, final ServerSession session) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
-
+
if (binding == null)
{
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST, "No such queue " + queueName);
@@ -743,7 +757,7 @@
postOffice.removeBinding(queueName);
}
-
+
public synchronized void registerActivateCallback(final ActivateCallback callback)
{
activateCallbacks.add(callback);
@@ -754,7 +768,6 @@
activateCallbacks.remove(callback);
}
-
// Public
// ---------------------------------------------------------------------------------------
@@ -781,12 +794,12 @@
private synchronized void callActivateCallbacks()
{
- for (ActivateCallback callback: activateCallbacks)
+ for (ActivateCallback callback : activateCallbacks)
{
callback.activated();
}
}
-
+
private void checkActivate(final RemotingConnection connection)
{
if (configuration.isBackup())
@@ -850,7 +863,7 @@
replConnection.freeze();
}
}
-
+
private void initialisePart1() throws Exception
{
managementService = new ManagementServiceImpl(mbeanServer, configuration);
@@ -965,7 +978,7 @@
// Deploy any queues in the Configuration class - if there's no file deployment we still need
// to load those
deployQueuesFromConfiguration();
-
+
// Deploy any predefined queues - on backup we don't start queue deployer - instead deployments
// are replicated from live
@@ -975,20 +988,20 @@
queueDeployer.start();
}
-
+
// We need to call this here, this gives any dependent server a chance to deploy its own destinations
// this needs to be done before clustering is initialised, and in the same order on live and backup
callActivateCallbacks();
// Deply any pre-defined diverts
deployDiverts();
-
- // Set-up the replicating connection
+
+ // Set-up the replicating connection
if (!setupReplicatingConnection())
{
return;
}
-
+
if (configuration.isClustered())
{
// This can't be created until node id is set
@@ -1004,12 +1017,12 @@
clusterManager.start();
}
-
+
if (deploymentManager != null)
{
deploymentManager.start();
}
-
+
pagingManager.startGlobalDepage();
initialised = true;
@@ -1161,14 +1174,13 @@
}
}
-
private Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean ignoreIfExists) throws Exception
- {
+ {
Binding binding = postOffice.getBinding(queueName);
if (binding != null)
@@ -1189,7 +1201,7 @@
{
filter = new FilterImpl(filterString);
}
-
+
final Queue queue = queueFactory.createQueue(-1, address, queueName, filter, durable, temporary);
binding = new LocalQueueBinding(address, queue, nodeID);
@@ -1200,7 +1212,7 @@
}
postOffice.addBinding(binding);
-
+
return queue;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -115,19 +115,6 @@
protected void failNode(TransportConfiguration conf)
{
- // MessagingServer server = this.services[node];
- //
- // if (server == null)
- // {
- // throw new IllegalArgumentException("No server at " + node);
- // }
- //
- // RemotingConnection conn = ((ClientSessionInternal)this.consumers[node].session).getConnection();
- //
- // conn.fail(new MessagingException(MessagingException.INTERNAL_ERROR, "blah"));
- //
- // //Also fail any cluster connections
-
ConnectionManagerImpl.failAllConnectionsForConnector(conf);
}
@@ -926,7 +913,7 @@
boolean netty,
boolean backup)
{
- this.setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, backup, -1);
+ setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, backup, -1);
}
protected void setupServerWithDiscovery(int node,
@@ -936,7 +923,7 @@
boolean netty,
int backupNode)
{
- this.setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, false, backupNode);
+ setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, false, backupNode);
}
protected void setupServerWithDiscovery(int node,
@@ -1029,7 +1016,7 @@
configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", groupAddress, port, 500);
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", groupAddress, port, 5000);
configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
@@ -1128,72 +1115,9 @@
maxHops,
pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
-
- // clusterConfs.add(clusterConf);
-
- // serviceFrom.getConfiguration().setClusterConfigurations(clusterConfs);
}
- // protected void setupClusterConnection(String name,
- // int nodeFrom,
- // int nodeTo,
- // String address,
- // boolean forwardWhenNoConsumers,
- // int maxHops,
- // boolean netty)
- // {
- // MessagingServer serviceFrom = servers[nodeFrom];
- //
- // if (serviceFrom == null)
- // {
- // throw new IllegalStateException("No server at node " + nodeFrom);
- // }
- //
- // Map<String, TransportConfiguration> connectors = serviceFrom
- // .getConfiguration()
- // .getConnectorConfigurations();
- //
- // Map<String, Object> params = generateParams(nodeTo, netty);
- //
- // TransportConfiguration serverTotc;
- //
- // if (netty)
- // {
- // serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
- // }
- // else
- // {
- // serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
- // }
- //
- // connectors.put(serverTotc.getName(), serverTotc);
- //
- // serviceFrom.getConfiguration().setConnectorConfigurations(connectors);
- //
- // Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
- //
- // List<Pair<String, String>> pairs = new ArrayList<Pair<String, String>>();
- // pairs.add(connectorPair);
- //
- // ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- // address,
- // 100,
- // 1d,
- // -1,
- // -1,
- // true,
- // forwardWhenNoConsumers,
- // maxHops,
- // pairs);
- // List<ClusterConnectionConfiguration> clusterConfs = serviceFrom
- // .getConfiguration()
- // .getClusterConfigurations();
- //
- // clusterConfs.add(clusterConf);
- //
- // serviceFrom.getConfiguration().setClusterConfigurations(clusterConfs);
- // }
-
+
protected void setupClusterConnection(String name,
String address,
boolean forwardWhenNoConsumers,
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -41,20 +41,5 @@
{
return true;
}
-
-// public void testStart() throws Exception
-// {
-// try
-// {
-// setupCluster();
-//
-// startServers();
-//
-// // Thread.sleep(100000);
-// }
-// catch (Exception e)
-// {
-// e.printStackTrace();
-// }
-// }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -1297,7 +1297,7 @@
setupCluster();
startServers();
-
+
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -1409,14 +1409,14 @@
removeConsumer(18);
removeConsumer(21);
removeConsumer(26);
-
+
closeSessionFactory(0);
closeSessionFactory(3);
-
+
stopServers(0, 3);
-
+
startServers(3, 0);
-
+
setupSessionFactory(0, isNetty());
setupSessionFactory(3, isNetty());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -367,8 +367,12 @@
closeSessionFactory(0);
closeSessionFactory(3);
+ log.info("*** stopping servers");
+
stopServers(0, 3, 5, 8);
+ log.info("**** rstarting servers");
+
startServers(5, 8, 0, 3);
setupSessionFactory(0, isNetty());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -81,5 +81,199 @@
setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), false);
}
+ /*
+ * This is like testStopStartServers, but we pause for longer than the discovery group timeout
+ * (5 seconds) before restarting the servers.
+ */
+ public void testStartStopServersWithPauseBeforeRestarting() throws Exception
+ {
+ setupCluster();
+ startServers();
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+ addConsumer(17, 2, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ removeConsumer(0);
+ removeConsumer(5);
+ removeConsumer(10);
+ removeConsumer(15);
+ removeConsumer(23);
+ removeConsumer(3);
+ removeConsumer(8);
+ removeConsumer(13);
+ removeConsumer(18);
+ removeConsumer(21);
+ removeConsumer(26);
+
+ closeSessionFactory(0);
+ closeSessionFactory(3);
+
+ stopServers(0, 3);
+
+ Thread.sleep(10000);
+
+ startServers(3, 0);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(3, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(3, 3, "queue3", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(8, 3, "queue8", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(13, 3, "queue13", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+
+ addConsumer(21, 3, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-04-26 02:50:44 UTC (rev 6568)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-04-26 12:42:52 UTC (rev 6569)
@@ -911,6 +911,12 @@
class FakeStorageManager implements StorageManager
{
+ public void setUniqueIDSequence(long id)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
/* (non-Javadoc)
* @see org.jboss.messaging.core.persistence.StorageManager#addQueueBinding(org.jboss.messaging.core.postoffice.Binding)
*/
More information about the jboss-cvs-commits
mailing list