[jboss-cvs] JBoss Messaging SVN: r6575 - in trunk: src/main/org/jboss/messaging/core/postoffice and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Apr 27 08:29:17 EDT 2009
Author: timfox
Date: 2009-04-27 08:29:17 -0400 (Mon, 27 Apr 2009)
New Revision: 6575
Modified:
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
some fixes
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-04-27 12:29:17 UTC (rev 6575)
@@ -586,53 +586,60 @@
// This needs to be synchronized since we need to ensure notifications are processed in strict sequence
synchronized (this)
{
- // First send to any local listeners
- for (NotificationListener listener : listeners)
+ // We also need to synchronize on the post office notification lock;
+ // otherwise notifications can arrive in the wrong order or go missing
+ // if a notification occurs at the same time as sendQueueInfoToQueue is processed.
+ synchronized (postOffice.getNotificationLock())
{
- try
+
+ // First send to any local listeners
+ for (NotificationListener listener : listeners)
{
- listener.onNotification(notification);
+ try
+ {
+ listener.onNotification(notification);
+ }
+ catch (Exception e)
+ {
+ // Exception thrown from one listener should not stop execution of others
+ log.error("Failed to call listener", e);
+ }
}
- catch (Exception e)
+
+ // Now send message
+
+ ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+
+ notificationMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
+ // Notification messages are always durable so the user can choose whether to add a durable queue to consume
+ // them in
+ notificationMessage.setDurable(true);
+ notificationMessage.setDestination(managementNotificationAddress);
+
+ TypedProperties notifProps;
+ if (notification.getProperties() != null)
{
- // Exception thrown from one listener should not stop execution of others
- log.error("Failed to call listener", e);
+ notifProps = new TypedProperties(notification.getProperties());
}
+ else
+ {
+ notifProps = new TypedProperties();
+ }
+
+ notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
+ new SimpleString(notification.getType().toString()));
+
+ notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+
+ if (notification.getUID() != null)
+ {
+ notifProps.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));
+ }
+
+ notificationMessage.putTypedProperties(notifProps);
+
+ postOffice.route(notificationMessage, null);
}
-
- // Now send message
-
- ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
-
- notificationMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
- // Notification messages are always durable so the user can choose whether to add a durable queue to consume
- // them in
- notificationMessage.setDurable(true);
- notificationMessage.setDestination(managementNotificationAddress);
-
- TypedProperties notifProps;
- if (notification.getProperties() != null)
- {
- notifProps = new TypedProperties(notification.getProperties());
- }
- else
- {
- notifProps = new TypedProperties();
- }
-
- notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
- new SimpleString(notification.getType().toString()));
-
- notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-
- if (notification.getUID() != null)
- {
- notifProps.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));
- }
-
- notificationMessage.putTypedProperties(notifProps);
-
- postOffice.route(notificationMessage, null);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-04-27 12:29:17 UTC (rev 6575)
@@ -74,4 +74,6 @@
DuplicateIDCache getDuplicateIDCache(SimpleString address);
void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
+
+ Object getNotificationLock();
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-04-27 12:29:17 UTC (rev 6575)
@@ -161,7 +161,7 @@
}
public void willRoute(final ServerMessage message)
- {
+ {
}
public boolean isQueueBinding()
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-27 12:29:17 UTC (rev 6575)
@@ -736,6 +736,11 @@
return cache;
}
+
+ public Object getNotificationLock()
+ {
+ return notificationLock;
+ }
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
{
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-04-27 12:29:17 UTC (rev 6575)
@@ -195,7 +195,7 @@
}
public void willRoute(final ServerMessage message)
- {
+ {
//We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
//TODO - this can be optimised
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-04-27 12:29:17 UTC (rev 6575)
@@ -149,7 +149,7 @@
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
- System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
+ //System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
", expecting = " +
@@ -162,7 +162,7 @@
final int consumerCount,
final boolean local) throws Exception
{
-// log.info("waiting for bindings on node " + node +
+// log.info("waiting for bindings on node " + node +
// " address " +
// address +
// " count " +
@@ -206,7 +206,7 @@
}
}
- // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+ //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
if (bindingCount == count && totConsumers == consumerCount)
{
@@ -218,11 +218,15 @@
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
- System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
+ // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
- throw new IllegalStateException("Timed out waiting for bindings (bindingCount = " + bindingCount +
- ", totConsumers = " +
- totConsumers);
+ String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount +
+ ", totConsumers = " +
+ totConsumers;
+
+ log.error(msg);
+
+ throw new IllegalStateException(msg);
}
protected void createQueue(int node, String address, String queueName, String filterVal, boolean durable) throws Exception
@@ -399,18 +403,22 @@
{
sendInRange(node, address, 0, numMessages, durable, filterVal);
}
-
+
protected void verifyReceiveAllInRange(boolean ack, int msgStart, int msgEnd, int... consumerIDs) throws Exception
{
verifyReceiveAllInRangeNotBefore(ack, -1, msgStart, msgEnd, consumerIDs);
}
-
+
protected void verifyReceiveAllInRange(int msgStart, int msgEnd, int... consumerIDs) throws Exception
{
verifyReceiveAllInRangeNotBefore(false, -1, msgStart, msgEnd, consumerIDs);
}
- protected void verifyReceiveAllInRangeNotBefore(boolean ack, long firstReceiveTime, int msgStart, int msgEnd, int... consumerIDs) throws Exception
+ protected void verifyReceiveAllInRangeNotBefore(boolean ack,
+ long firstReceiveTime,
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
{
boolean outOfOrder = false;
for (int i = 0; i < consumerIDs.length; i++)
@@ -426,13 +434,20 @@
{
ClientMessage message = holder.consumer.receive(2000);
- assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
+ if (message == null)
+ {
+ log.info("*** dumping consumers:");
+ dumpConsumers();
+
+ assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
+ }
+
if (ack)
{
message.acknowledge();
}
-
+
if (firstReceiveTime != -1)
{
assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
@@ -448,7 +463,20 @@
assertFalse("Messages were consumed out of order, look at System.out for more information", outOfOrder);
}
-
+
+ private void dumpConsumers() throws Exception
+ {
+ for (int i = 0; i < consumers.length; i++)
+ {
+ if (consumers[i] != null)
+ {
+ log.info("Dumping consumer " + i);
+
+ checkReceive(i);
+ }
+ }
+ }
+
protected void verifyReceiveAll(boolean ack, int numMessages, int... consumerIDs) throws Exception
{
verifyReceiveAllInRange(ack, 0, numMessages, consumerIDs);
@@ -574,12 +602,12 @@
{
message.acknowledge();
}
-
- //log.info("consumer " + consumerIDs[i] +" returns " + count);
+
+ // log.info("consumer " + consumerIDs[i] +" returns " + count);
}
else
{
- // log.info("consumer " + consumerIDs[i] +" returns null");
+ // log.info("consumer " + consumerIDs[i] +" returns null");
}
}
while (message != null);
@@ -954,7 +982,7 @@
TransportConfiguration nettyBackuptc = null;
TransportConfiguration invmBackuptc = null;
-
+
if (backupNode != -1)
{
Map<String, Object> backupParams = generateParams(backupNode, netty);
@@ -1000,11 +1028,13 @@
TransportConfiguration nettytc_c = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
- connectorPairs.add(new Pair<String, String>(nettytc_c.getName(), nettyBackuptc == null ? null : nettyBackuptc.getName()));
+ connectorPairs.add(new Pair<String, String>(nettytc_c.getName(),
+ nettyBackuptc == null ? null : nettyBackuptc.getName()));
}
else
{
- connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null : invmBackuptc.getName()));
+ connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null
+ : invmBackuptc.getName()));
}
BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
@@ -1059,7 +1089,7 @@
servers[nodes[i]] = null;
}
}
-
+
protected void clearAllServers()
{
for (int i = 0; i < servers.length; i++)
@@ -1117,7 +1147,6 @@
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
-
protected void setupClusterConnection(String name,
String address,
boolean forwardWhenNoConsumers,
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-04-27 12:29:17 UTC (rev 6575)
@@ -1245,6 +1245,7 @@
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 3, 3, false);
waitForBindings(2, "queues.testaddress", 3, 3, false);
waitForBindings(3, "queues.testaddress", 3, 3, false);
@@ -1252,7 +1253,7 @@
send(0, "queues.testaddress", 10, false, null);
- verifyReceiveAll(10, 1, 2, 3, 4);
+ verifyReceiveAll(10, 1, 2, 3, 4);
}
public void testNoLocalQueueLoadBalancedQueues() throws Exception
@@ -1282,6 +1283,7 @@
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 3, 3, false);
waitForBindings(2, "queues.testaddress", 3, 3, false);
waitForBindings(3, "queues.testaddress", 3, 3, false);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2009-04-27 12:29:17 UTC (rev 6575)
@@ -251,6 +251,8 @@
setupCluster();
startServers();
+
+ log.info("*** started servers");
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-04-27 12:29:17 UTC (rev 6575)
@@ -1175,6 +1175,12 @@
class FakePostOffice implements PostOffice
{
+ public Object getNotificationLock()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
public Bindings getMatchingBindings(SimpleString address)
{
// TODO Auto-generated method stub
More information about the jboss-cvs-commits mailing list