[jboss-cvs] JBoss Messaging SVN: r5391 - in trunk: src/config and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 19 11:06:48 EST 2008
Author: jmesnil
Date: 2008-11-19 11:06:47 -0500 (Wed, 19 Nov 2008)
New Revision: 5391
Added:
trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java
Modified:
trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/message/Message.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/util/TypedProperties.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
Log:
JBMESSAGING-1441: Refactor management notifications
notification messages are not sent to a special address (defined in jbm-configuration.xml).
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -22,7 +22,10 @@
package org.jboss.messaging.example;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
+import java.util.Set;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -47,6 +50,7 @@
public static void main(final String[] args) throws Exception
{
SimpleString replytoQueue = new SimpleString("replyto.adminQueue");
+
ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory"));
final ClientSession clientSession = sessionFactory.createSession(false, true, true);
SimpleString queue = new SimpleString("queuejms.testQueue");
@@ -57,14 +61,6 @@
clientSession.addDestination(replytoQueue, false, true);
clientSession.createQueue(replytoQueue, replytoQueue, null, false, true, true);
- // create a management message to subscribe to notifications from the
- // server
- ClientProducer producer = clientSession.createProducer(DEFAULT_MANAGEMENT_ADDRESS);
- ClientMessage mngmntMessage = clientSession.createClientMessage(false);
- ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, true);
- producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
- System.out.println("send message to subscribe to notifications");
-
ClientConsumer mngmntConsumer = clientSession.createConsumer(replytoQueue);
mngmntConsumer.setMessageHandler(new MessageHandler()
{
@@ -72,12 +68,8 @@
public void onMessage(final ClientMessage message)
{
System.out.println("received management message");
- if (ManagementHelper.isNotification(message))
+ if (ManagementHelper.isOperationResult(message))
{
- System.out.println("\tnotification: " + ManagementHelper.getNotification(message));
- }
- else if (ManagementHelper.isOperationResult(message))
- {
System.out.println("\toperation succeeded:" + ManagementHelper.hasOperationSucceeded(message));
if (ManagementHelper.hasOperationSucceeded(message))
{
@@ -105,6 +97,33 @@
}
});
+
+ SimpleString notifQueue = new SimpleString("notifQueue");
+ clientSession.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueue, null, false, true, true);
+ ClientConsumer notifConsumer = clientSession.createConsumer(notifQueue);
+ notifConsumer.setMessageHandler(new MessageHandler()
+ {
+
+ public void onMessage(final ClientMessage message)
+ {
+ System.out.println("received notification" + message);
+ Set<SimpleString> propertyNames = message.getPropertyNames();
+
+ for (SimpleString key : propertyNames)
+ {
+ System.out.println(key + "=" + message.getProperty(key));
+ }
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ });
clientSession.start();
// add and remove a destination to receive two notifications from the
@@ -112,9 +131,11 @@
clientSession.addDestination(new SimpleString("anotherQueue"), false, true);
clientSession.removeDestination(new SimpleString("anotherQueue"), false);
+ ClientProducer producer = clientSession.createProducer(replytoQueue);
+
// to set a new value for an attribute, invoke the corresponding setter
// method
- mngmntMessage = clientSession.createClientMessage(false);
+ ClientMessage mngmntMessage = clientSession.createClientMessage(false);
ManagementHelper.putOperationInvocation(mngmntMessage,
replytoQueue,
ManagementServiceImpl.getMessagingServerObjectName(),
@@ -145,21 +166,16 @@
producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
System.out.println("sent management message to invoke operation");
- // create a message to unsubscribe from the notifications sent by the
- // server
- mngmntMessage = clientSession.createClientMessage(false);
- ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, false);
- producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
- System.out.println("send message to unsubscribe to notifications");
-
Thread.sleep(5000);
mngmntConsumer.close();
consumeMessages(clientSession, queue);
+ notifConsumer.close();
clientSession.removeDestination(replytoQueue, false);
clientSession.deleteQueue(replytoQueue);
+ clientSession.deleteQueue(notifQueue);
clientSession.close();
}
@@ -171,7 +187,10 @@
do
{
m = clientConsumer.receive(5000);
- m.acknowledge();
+ if (m != null)
+ {
+ m.acknowledge();
+ }
}
while (m != null);
clientSession.commit();
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/config/jbm-configuration.xml 2008-11-19 16:06:47 UTC (rev 5391)
@@ -41,11 +41,14 @@
<wild-card-routing-enabled>true</wild-card-routing-enabled>
- <!-- by default, message counter is disabled -->
- <message-counter-enabled>false</message-counter-enabled>
+ <management-address>admin.management</management-address>
+ <management-notification-address>admin.notification</management-notification-address>
<!-- true to expose JBoss Messaging resources through JMX -->
<jmx-management-enabled>true</jmx-management-enabled>
+
+ <!-- by default, message counter is disabled -->
+ <message-counter-enabled>false</message-counter-enabled>
<connection-scan-period>10000</connection-scan-period>
@@ -64,9 +67,7 @@
<backup>false</backup>
<queue-activation-timeout>30000</queue-activation-timeout>
-
- <management-address>admin.management</management-address>
-
+
<!--
<backup-connector>
<factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -27,7 +27,6 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import javax.management.Notification;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
@@ -59,22 +58,16 @@
public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("JBMJMXOperationException");
- public static final SimpleString HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS = new SimpleString("JBMJMXSubscribeToNotification");
+ public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("JBMNotifType");
- public static final SimpleString HDR_JMX_NOTIFICATION = new SimpleString("JBMJMXNotification");
+ public static final SimpleString HDR_NOTIFICATION_MESSAGE = new SimpleString("JBMNotifMessage");
+ public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("JBMNotifTimestamp");
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
- public static void putNotificationSubscription(final Message message,
- final SimpleString replyTo,
- final boolean subscribeToNotifications)
- {
- message.putStringProperty(HDR_JMX_REPLYTO, replyTo);
- message.putBooleanProperty(HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS, subscribeToNotifications);
- }
-
public static void putAttributes(final Message message,
final SimpleString replyTo,
final ObjectName objectName,
@@ -108,46 +101,6 @@
}
}
- public static void storeNotification(final Message message, final Notification notification)
- {
- message.putBooleanProperty(HDR_JMX_NOTIFICATION, true);
- message.putStringProperty(new SimpleString("message"), new SimpleString(notification.getMessage()));
- message.putStringProperty(new SimpleString("type"), new SimpleString(notification.getType()));
- message.putLongProperty(new SimpleString("sequenceNumber"), notification.getSequenceNumber());
- message.putLongProperty(new SimpleString("timestamp"), notification.getTimeStamp());
- if (notification.getSource() instanceof ObjectName)
- {
- message.putStringProperty(new SimpleString("source"), new SimpleString(notification.getSource().toString()));
- }
- }
-
- public static Notification getNotification(final Message message)
- {
- SimpleString sourceStr = (SimpleString)message.getProperty(new SimpleString("source"));
- Object source = null;
- if (sourceStr != null)
- {
- try
- {
- source = ObjectName.getInstance(sourceStr.toString());
- }
- catch (Exception e)
- {
- }
- }
- SimpleString type = (SimpleString)message.getProperty(new SimpleString("type"));
- long sequenceNumber = (Long)message.getProperty(new SimpleString("sequenceNumber"));
- long timestamp = (Long)message.getProperty(new SimpleString("timestamp"));
-
- Notification notif = new Notification(type.toString(), source, sequenceNumber, timestamp, message.toString());
- return notif;
- }
-
- public static boolean isNotification(final Message message)
- {
- return message.containsProperty(HDR_JMX_NOTIFICATION);
- }
-
public static boolean isOperationResult(final Message message)
{
return message.containsProperty(HDR_JMX_OPERATION_SUCCEEDED);
@@ -155,7 +108,7 @@
public static boolean isAttributesResult(final Message message)
{
- return !(isNotification(message) && isOperationResult(message));
+ return !(isOperationResult(message));
}
public static TabularData getTabularDataProperty(final Message message, final String key)
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -107,7 +107,11 @@
void setManagementAddress(SimpleString address);
+ SimpleString getManagementNotificationAddress();
+
+ void setManagementNotificationAddress(SimpleString address);
+
// Journal related attributes ------------------------------------------------------------
String getBindingsDirectory();
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -91,6 +91,8 @@
public static final SimpleString DEFAULT_MANAGEMENT_ADDRESS = new SimpleString("admin.management");
+ public static final SimpleString DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS = new SimpleString("admin.notification");
+
public static final long DEFAULT_BROADCAST_PERIOD = 5000;
public static final long DEFAULT_BROADCAST_REFRESH_TIMEOUT = 10000;
@@ -176,6 +178,8 @@
protected SimpleString managementAddress = DEFAULT_MANAGEMENT_ADDRESS;
+ protected SimpleString managementNotificationAddress = DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
+
public boolean isClustered()
{
return clustered;
@@ -529,6 +533,16 @@
{
this.managementAddress = address;
}
+
+ public SimpleString getManagementNotificationAddress()
+ {
+ return managementNotificationAddress ;
+ }
+
+ public void setManagementNotificationAddress(SimpleString address)
+ {
+ this.managementNotificationAddress = address;
+ }
@Override
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -22,9 +22,6 @@
package org.jboss.messaging.core.config.impl;
-import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_FORWARD_BATCH_SIZE;
-import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_FORWARD_BATCH_TIME;
-
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
@@ -102,6 +99,8 @@
managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString()));
+ managementNotificationAddress = new SimpleString(getString(e, "management-notification-address", managementNotificationAddress.toString()));
+
NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
ArrayList<String> interceptorList = new ArrayList<String>();
Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -22,9 +22,9 @@
package org.jboss.messaging.core.management;
-import javax.management.NotificationBroadcaster;
import javax.management.ObjectName;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -36,6 +36,7 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -43,7 +44,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public interface ManagementService extends NotificationBroadcaster
+public interface ManagementService
{
MessageCounterManager getMessageCounterManager();
@@ -72,5 +73,19 @@
void handleMessage(ServerMessage message);
+ void sendNotification(NotificationType type, String message) throws Exception;
+ /**
+ * the message corresponding to a notification will always contain the properties:
+ * <ul>
+ * <li><code>ManagementHelper.HDR_NOTIFICATION_TYPE</code> - the type of notification (SimpleString)</li>
+ * <li><code>ManagementHelper.HDR_NOTIFICATION_MESSAGE</code> - a message contextual to the notification (SimpleString)</li>
+ * <li><code>ManagementHelper.HDR_NOTIFICATION_TIMESTAMP</code> - the timestamp when the notification occured (long)</li>
+ * </ul>
+ *
+ * in addition to the properties defined in <code>props</code>
+ *
+ * @see ManagementHelper
+ */
+ void sendNotification(NotificationType type, String message, TypedProperties props) throws Exception;
}
Added: trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/NotificationType.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/NotificationType.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.management;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public enum NotificationType
+{
+ QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED;
+}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -28,6 +28,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -36,13 +37,8 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanNotificationInfo;
import javax.management.MBeanServer;
-import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
import javax.management.ObjectName;
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -51,21 +47,25 @@
import org.jboss.messaging.core.management.AddressControlMBean;
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.MessagingServerControlMBean;
+import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.management.QueueControlMBean;
-import org.jboss.messaging.core.management.impl.MessagingServerControl.NotificationType;
import org.jboss.messaging.core.messagecounter.MessageCounter;
import org.jboss.messaging.core.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.messagecounter.impl.MessageCounterManagerImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.security.Role;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
/*
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -90,10 +90,10 @@
private final NotificationBroadcasterSupport broadcaster;
- private AtomicLong notifSeq = new AtomicLong(0);
-
private PostOffice postOffice;
+ private StorageManager storageManager;
+
private HierarchicalRepository<Set<Role>> securityRepository;
private HierarchicalRepository<QueueSettings> queueSettingsRepository;
@@ -102,6 +102,9 @@
private final MessageCounterManager messageCounterManager = new MessageCounterManagerImpl(10000);
+ private SimpleString managementNotificationAddress;
+
+
// Static --------------------------------------------------------
public static ObjectName getMessagingServerObjectName() throws Exception
@@ -137,7 +140,6 @@
this.jmxManagementEnabled = jmxManagementEnabled;
registry = new HashMap<ObjectName, Object>();
broadcaster = new NotificationBroadcasterSupport();
- notifSeq = new AtomicLong(0);
}
// Public --------------------------------------------------------
@@ -158,7 +160,8 @@
{
this.postOffice = postOffice;
this.queueSettingsRepository = queueSettingsRepository;
-
+ this.storageManager = storageManager;
+ this.managementNotificationAddress = configuration.getManagementNotificationAddress();
managedServer = new MessagingServerControl(postOffice,
storageManager,
configuration,
@@ -303,25 +306,6 @@
return registry.get(objectName);
}
- // NotificatioBroadcaster implementation -----------------------------------
-
- public void addNotificationListener(final NotificationListener listener,
- final NotificationFilter filter,
- final Object handback) throws IllegalArgumentException
- {
- broadcaster.addNotificationListener(listener, filter, handback);
- }
-
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- return broadcaster.getNotificationInfo();
- }
-
- public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException
- {
- broadcaster.removeNotificationListener(listener);
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -370,15 +354,40 @@
}
}
- private void sendNotification(final MessagingServerControl.NotificationType type, final String message) throws Exception
+ public void sendNotification(final NotificationType type, final String message) throws Exception
{
+ sendNotification(type, message, null);
+ }
+
+ public void sendNotification(final NotificationType type, final String message, TypedProperties props) throws Exception
+ {
if (managedServer != null)
{
- Notification notif = new Notification(type.toString(),
- getMessagingServerObjectName(),
- notifSeq.incrementAndGet(),
- message);
- broadcaster.sendNotification(notif);
+ ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+ notificationMessage.setDestination(managementNotificationAddress);
+ notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+
+ TypedProperties notifProps;
+ if (props != null)
+ {
+ notifProps = props;
+ } else
+ {
+ notifProps = new TypedProperties();
+ }
+
+ notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
+ notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_MESSAGE, new SimpleString(message));
+ notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+
+ notificationMessage.putTypedProperties(notifProps);
+
+ List<MessageReference> refs = postOffice.route(notificationMessage);
+
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().addLast(ref);
+ }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -48,6 +48,7 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.management.MessagingServerControlMBean;
+import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
@@ -625,11 +626,4 @@
messageCounterManager.resetAllCounterHistories();
}
-
- // Inner classes -------------------------------------------------
-
- public static enum NotificationType
- {
- QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED;
- }
}
Modified: trunk/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/Message.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/message/Message.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
/**
* A message is a routable instance that has a payload.
@@ -114,7 +115,8 @@
void putStringProperty(SimpleString key, SimpleString value);
-
+ void putTypedProperties(TypedProperties properties);
+
// TODO - should have typed property getters and do conversions herein
Object getProperty(SimpleString key);
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -347,6 +347,11 @@
properties.putStringProperty(key, value);
}
+ public void putTypedProperties(TypedProperties otherProps)
+ {
+ properties.putTypedProperties(otherProps);
+ }
+
public Object getProperty(final SimpleString key)
{
return properties.getProperty(key);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -247,6 +247,11 @@
postOffice.start();
resourceManager.start();
+ if (!postOffice.containsDestination(configuration.getManagementNotificationAddress()))
+ {
+ postOffice.addDestination(configuration.getManagementNotificationAddress(), true);
+ }
+
TransportConfiguration backupConnector = configuration.getBackupConnectorConfiguration();
if (backupConnector != null)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -23,8 +23,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import javax.management.Notification;
-import javax.management.NotificationListener;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -106,7 +104,7 @@
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
*/
-public class ServerSessionImpl implements ServerSession, FailureListener, NotificationListener
+public class ServerSessionImpl implements ServerSession, FailureListener
{
// Constants -----------------------------------------------------------------------------
@@ -2317,39 +2315,11 @@
{
doSecurity(message);
- if (message.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
- {
- boolean subscribe = (Boolean)message.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
+ managementService.handleMessage(message);
- final SimpleString replyTo = (SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
+ message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
- if (subscribe)
- {
- if (log.isDebugEnabled())
- {
- log.debug("added notification listener " + this);
- }
-
- managementService.addNotificationListener(this, null, replyTo);
- }
- else
- {
- if (log.isDebugEnabled())
- {
- log.debug("removed notification listener " + this);
- }
-
- managementService.removeNotificationListener(this);
- }
- }
- else
- {
- managementService.handleMessage(message);
-
- message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
-
- send(message);
- }
+ send(message);
}
public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
@@ -2429,25 +2399,6 @@
}
}
- // NotificationListener implementation -------------------------------------
-
- public void handleNotification(final Notification notification, final Object replyTo)
- {
- // FIXME this won't work with replication
- ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
- notificationMessage.setDestination((SimpleString)replyTo);
- notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(2048)));
- ManagementHelper.storeNotification(notificationMessage, notification);
- try
- {
- send(notificationMessage);
- }
- catch (Exception e)
- {
- log.warn("problem while sending a notification message " + notification, e);
- }
- }
-
// Public
// ----------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TypedProperties.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/util/TypedProperties.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -47,6 +47,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.Map.Entry;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -139,6 +140,21 @@
doPutValue(key, new CharValue(value));
}
+ public void putTypedProperties(final TypedProperties otherProps)
+ {
+ if (otherProps == null || otherProps.properties == null)
+ {
+ return;
+ }
+
+ checkCreateProperties();
+ Set<Entry<SimpleString,PropertyValue>> otherEntries = otherProps.properties.entrySet();
+ for (Entry<SimpleString, PropertyValue> otherEntry : otherEntries)
+ {
+ doPutValue(otherEntry.getKey(), otherEntry.getValue());
+ }
+ }
+
public Object getProperty(final SimpleString key)
{
return doGetProperty(key);
Added: trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -0,0 +1,205 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.management;
+
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
+import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
+
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.management.NotificationType;
+import org.jboss.messaging.core.management.impl.MessagingServerControl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A NotificationTest
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class NotificationTest extends TestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private MessagingService service;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testNotification() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ // create a queue to receive the management notifications
+ SimpleString notifQueue = randomSimpleString();
+ session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueue, null, false, true, true);
+ ClientConsumer notifConsumer = session.createConsumer(notifQueue);
+ session.start();
+
+ // generate notifications
+ SimpleString destinationName = randomSimpleString();
+ session.addDestination(destinationName, false, true);
+ session.removeDestination(destinationName, false);
+
+ // we've generated at least 2 notifications
+ // but there is more in the queue (e.g. the notification when the notifQueue was created)
+ ClientMessage notifMessage = notifConsumer.receive(500);
+ assertNotNull(notifMessage);
+ Set<SimpleString> propertyNames = notifMessage.getPropertyNames();
+
+ for (SimpleString key : propertyNames)
+ {
+ System.out.println(key + "=" + notifMessage.getProperty(key));
+ }
+
+ notifMessage.acknowledge();
+
+
+ notifMessage = notifConsumer.receive(500);
+ assertNotNull(notifMessage);
+ propertyNames = notifMessage.getPropertyNames();
+ for (SimpleString key : propertyNames)
+ {
+ System.out.println(key + "=" + notifMessage.getProperty(key));
+ }
+ notifMessage.acknowledge();
+
+ notifConsumer.close();
+ session.deleteQueue(notifQueue);
+ session.close();
+ }
+
+ public void testNotificationWithFilter() throws Exception
+ {
+ SimpleString destinationName = randomSimpleString();
+ SimpleString unmatchedDestinationName = new SimpleString("this.destination.does.not.match.the.filter");
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ // create a queue to receive the management notifications only concerning the destination
+ SimpleString notifQueue = randomSimpleString();
+ SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_MESSAGE + " LIKE '%" + destinationName + "%'" );
+ System.out.println(filter);
+ session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueue, filter, false, true, true);
+ ClientConsumer notifConsumer = session.createConsumer(notifQueue);
+ session.start();
+
+ // generate notifications that do NOT match the filter
+ session.addDestination(unmatchedDestinationName, false, true);
+ session.removeDestination(unmatchedDestinationName, false);
+
+ assertNull(notifConsumer.receive(500));
+
+ // generate notifications that match the filter
+ session.addDestination(destinationName, false, true);
+ session.removeDestination(destinationName, false);
+
+ ClientMessage notifMessage = notifConsumer.receive(500);
+ assertNotNull(notifMessage);
+ assertEquals(NotificationType.ADDRESS_ADDED.toString(), notifMessage.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+ Set<SimpleString> propertyNames = notifMessage.getPropertyNames();
+
+ for (SimpleString key : propertyNames)
+ {
+ System.out.println(key + "=" + notifMessage.getProperty(key));
+ }
+
+ notifMessage.acknowledge();
+
+
+ notifMessage = notifConsumer.receive(500);
+ assertNotNull(notifMessage);
+ assertEquals(NotificationType.ADDRESS_REMOVED.toString(), notifMessage.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+ propertyNames = notifMessage.getPropertyNames();
+ for (SimpleString key : propertyNames)
+ {
+ System.out.println(key + "=" + notifMessage.getProperty(key));
+ }
+ notifMessage.acknowledge();
+
+ // no other notifications matching the filter
+ assertNull(notifConsumer.receive(500));
+
+ notifConsumer.close();
+ session.deleteQueue(notifQueue);
+ session.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+
+ Configuration conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
+ // the notifications are independent of JMX
+ conf.setJMXManagementEnabled(false);
+ conf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ service = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+ service.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ service.stop();
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java 2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java 2008-11-19 16:06:47 UTC (rev 5391)
@@ -55,6 +55,7 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.util.SimpleString;
/**
@@ -85,6 +86,7 @@
PostOffice postOffice = createMock(PostOffice.class);
StorageManager storageManager = createMock(StorageManager.class);
Configuration configuration = createMock(Configuration.class);
+ expect(configuration.getManagementNotificationAddress()).andReturn(randomSimpleString());
expect(configuration.isMessageCounterEnabled()).andReturn(false);
HierarchicalRepository<Set<Role>> securityRepository = createMock(HierarchicalRepository.class);
HierarchicalRepository<QueueSettings> queueSettingsRepository = createMock(HierarchicalRepository.class);
@@ -114,6 +116,7 @@
PostOffice postOffice = createMock(PostOffice.class);
StorageManager storageManager = createMock(StorageManager.class);
Configuration configuration = createMock(Configuration.class);
+ expect(configuration.getManagementNotificationAddress()).andReturn(randomSimpleString());
expect(configuration.isMessageCounterEnabled()).andReturn(false);
HierarchicalRepository<Set<Role>> securityRepository = createMock(HierarchicalRepository.class);
HierarchicalRepository<QueueSettings> queueSettingsRepository = createMock(HierarchicalRepository.class);
More information about the jboss-cvs-commits
mailing list