[jboss-cvs] JBoss Messaging SVN: r5391 - in trunk: src/config and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 19 11:06:48 EST 2008


Author: jmesnil
Date: 2008-11-19 11:06:47 -0500 (Wed, 19 Nov 2008)
New Revision: 5391

Added:
   trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
   trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java
Modified:
   trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/message/Message.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/util/TypedProperties.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
Log:
JBMESSAGING-1441: Refactor management notifications

notification messages are not sent to a special address (defined in jbm-configuration.xml).

Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -22,7 +22,10 @@
 package org.jboss.messaging.example;
 
 import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
 
+import java.util.Set;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -47,6 +50,7 @@
    public static void main(final String[] args) throws Exception
    {
       SimpleString replytoQueue = new SimpleString("replyto.adminQueue");
+
       ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory"));
       final ClientSession clientSession = sessionFactory.createSession(false, true, true);
       SimpleString queue = new SimpleString("queuejms.testQueue");
@@ -57,14 +61,6 @@
       clientSession.addDestination(replytoQueue, false, true);
       clientSession.createQueue(replytoQueue, replytoQueue, null, false, true, true);
 
-      // create a management message to subscribe to notifications from the
-      // server
-      ClientProducer producer = clientSession.createProducer(DEFAULT_MANAGEMENT_ADDRESS);
-      ClientMessage mngmntMessage = clientSession.createClientMessage(false);
-      ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, true);
-      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
-      System.out.println("send message to subscribe to notifications");
-
       ClientConsumer mngmntConsumer = clientSession.createConsumer(replytoQueue);
       mngmntConsumer.setMessageHandler(new MessageHandler()
       {
@@ -72,12 +68,8 @@
          public void onMessage(final ClientMessage message)
          {
             System.out.println("received management message");
-            if (ManagementHelper.isNotification(message))
+            if (ManagementHelper.isOperationResult(message))
             {
-               System.out.println("\tnotification: " + ManagementHelper.getNotification(message));
-            }
-            else if (ManagementHelper.isOperationResult(message))
-            {
                System.out.println("\toperation succeeded:" + ManagementHelper.hasOperationSucceeded(message));
                if (ManagementHelper.hasOperationSucceeded(message))
                {
@@ -105,6 +97,33 @@
          }
 
       });
+
+      SimpleString notifQueue = new SimpleString("notifQueue");
+      clientSession.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueue, null, false, true, true);
+      ClientConsumer notifConsumer = clientSession.createConsumer(notifQueue);
+      notifConsumer.setMessageHandler(new MessageHandler()
+      {
+
+         public void onMessage(final ClientMessage message)
+         {
+            System.out.println("received notification" + message);
+            Set<SimpleString> propertyNames = message.getPropertyNames();
+
+            for (SimpleString key : propertyNames)
+            {
+               System.out.println(key + "=" + message.getProperty(key));
+            }
+            try
+            {
+               message.acknowledge();
+            }
+            catch (MessagingException e)
+            {
+               e.printStackTrace();
+            }
+         }
+
+      });
       clientSession.start();
 
       // add and remove a destination to receive two notifications from the
@@ -112,9 +131,11 @@
       clientSession.addDestination(new SimpleString("anotherQueue"), false, true);
       clientSession.removeDestination(new SimpleString("anotherQueue"), false);
 
+      ClientProducer producer = clientSession.createProducer(replytoQueue);
+
       // to set a new value for an attribute, invoke the corresponding setter
       // method
-      mngmntMessage = clientSession.createClientMessage(false);
+      ClientMessage mngmntMessage = clientSession.createClientMessage(false);
       ManagementHelper.putOperationInvocation(mngmntMessage,
                                               replytoQueue,
                                               ManagementServiceImpl.getMessagingServerObjectName(),
@@ -145,21 +166,16 @@
       producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
       System.out.println("sent management message to invoke operation");
 
-      // create a message to unsubscribe from the notifications sent by the
-      // server
-      mngmntMessage = clientSession.createClientMessage(false);
-      ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, false);
-      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
-      System.out.println("send message to unsubscribe to notifications");
-
       Thread.sleep(5000);
 
       mngmntConsumer.close();
 
       consumeMessages(clientSession, queue);
 
+      notifConsumer.close();
       clientSession.removeDestination(replytoQueue, false);
       clientSession.deleteQueue(replytoQueue);
+      clientSession.deleteQueue(notifQueue);
 
       clientSession.close();
    }
@@ -171,7 +187,10 @@
       do
       {
          m = clientConsumer.receive(5000);
-         m.acknowledge();
+         if (m != null)
+         {
+            m.acknowledge();
+         }
       }
       while (m != null);
       clientSession.commit();

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/config/jbm-configuration.xml	2008-11-19 16:06:47 UTC (rev 5391)
@@ -41,11 +41,14 @@
 
       <wild-card-routing-enabled>true</wild-card-routing-enabled>
 
-      <!-- by default, message counter is disabled -->
-      <message-counter-enabled>false</message-counter-enabled>
+      <management-address>admin.management</management-address>
+      <management-notification-address>admin.notification</management-notification-address>      
 
       <!-- true to expose JBoss Messaging resources through JMX -->
       <jmx-management-enabled>true</jmx-management-enabled>
+
+      <!-- by default, message counter is disabled -->
+      <message-counter-enabled>false</message-counter-enabled>
     
       <connection-scan-period>10000</connection-scan-period>
 
@@ -64,9 +67,7 @@
       <backup>false</backup>
       
       <queue-activation-timeout>30000</queue-activation-timeout>
-      
-      <management-address>admin.management</management-address>
-      
+
       <!--
       <backup-connector>
          <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>

Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -27,7 +27,6 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-import javax.management.Notification;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
@@ -59,22 +58,16 @@
 
    public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("JBMJMXOperationException");
 
-   public static final SimpleString HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS = new SimpleString("JBMJMXSubscribeToNotification");
+   public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("JBMNotifType");
 
-   public static final SimpleString HDR_JMX_NOTIFICATION = new SimpleString("JBMJMXNotification");
+   public static final SimpleString HDR_NOTIFICATION_MESSAGE = new SimpleString("JBMNotifMessage");
 
+   public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("JBMNotifTimestamp");
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
 
-   public static void putNotificationSubscription(final Message message,
-                                                  final SimpleString replyTo,
-                                                  final boolean subscribeToNotifications)
-   {
-      message.putStringProperty(HDR_JMX_REPLYTO, replyTo);
-      message.putBooleanProperty(HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS, subscribeToNotifications);
-   }
-
    public static void putAttributes(final Message message,
                                     final SimpleString replyTo,
                                     final ObjectName objectName,
@@ -108,46 +101,6 @@
       }
    }
 
-   public static void storeNotification(final Message message, final Notification notification)
-   {
-      message.putBooleanProperty(HDR_JMX_NOTIFICATION, true);
-      message.putStringProperty(new SimpleString("message"), new SimpleString(notification.getMessage()));
-      message.putStringProperty(new SimpleString("type"), new SimpleString(notification.getType()));
-      message.putLongProperty(new SimpleString("sequenceNumber"), notification.getSequenceNumber());
-      message.putLongProperty(new SimpleString("timestamp"), notification.getTimeStamp());
-      if (notification.getSource() instanceof ObjectName)
-      {
-         message.putStringProperty(new SimpleString("source"), new SimpleString(notification.getSource().toString()));
-      }
-   }
-
-   public static Notification getNotification(final Message message)
-   {
-      SimpleString sourceStr = (SimpleString)message.getProperty(new SimpleString("source"));
-      Object source = null;
-      if (sourceStr != null)
-      {
-         try
-         {
-            source = ObjectName.getInstance(sourceStr.toString());
-         }
-         catch (Exception e)
-         {
-         }
-      }
-      SimpleString type = (SimpleString)message.getProperty(new SimpleString("type"));
-      long sequenceNumber = (Long)message.getProperty(new SimpleString("sequenceNumber"));
-      long timestamp = (Long)message.getProperty(new SimpleString("timestamp"));
-
-      Notification notif = new Notification(type.toString(), source, sequenceNumber, timestamp, message.toString());
-      return notif;
-   }
-
-   public static boolean isNotification(final Message message)
-   {
-      return message.containsProperty(HDR_JMX_NOTIFICATION);
-   }
-
    public static boolean isOperationResult(final Message message)
    {
       return message.containsProperty(HDR_JMX_OPERATION_SUCCEEDED);
@@ -155,7 +108,7 @@
 
    public static boolean isAttributesResult(final Message message)
    {
-      return !(isNotification(message) && isOperationResult(message));
+      return !(isOperationResult(message));
    }
 
    public static TabularData getTabularDataProperty(final Message message, final String key)

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -107,7 +107,11 @@
    
    void setManagementAddress(SimpleString address);
 
+   SimpleString getManagementNotificationAddress();
+   
+   void setManagementNotificationAddress(SimpleString address);
 
+   
    // Journal related attributes ------------------------------------------------------------
 
    String getBindingsDirectory();

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -91,6 +91,8 @@
    
    public static final SimpleString DEFAULT_MANAGEMENT_ADDRESS = new SimpleString("admin.management");
    
+   public static final SimpleString DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS = new SimpleString("admin.notification");
+
    public static final long DEFAULT_BROADCAST_PERIOD = 5000;
    
    public static final long DEFAULT_BROADCAST_REFRESH_TIMEOUT = 10000;
@@ -176,6 +178,8 @@
    
    protected SimpleString managementAddress = DEFAULT_MANAGEMENT_ADDRESS;
 
+   protected SimpleString managementNotificationAddress = DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
+
    public boolean isClustered()
    {
       return clustered;
@@ -529,6 +533,16 @@
    {
       this.managementAddress = address;
    }
+   
+   public SimpleString getManagementNotificationAddress()
+   {
+      return managementNotificationAddress ;
+   }
+   
+   public void setManagementNotificationAddress(SimpleString address)
+   {
+      this.managementNotificationAddress = address;
+   }
       
 
    @Override

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -22,9 +22,6 @@
 
 package org.jboss.messaging.core.config.impl;
 
-import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_FORWARD_BATCH_SIZE;
-import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MAX_FORWARD_BATCH_TIME;
-
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.net.URL;
@@ -102,6 +99,8 @@
 
       managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString()));
 
+      managementNotificationAddress = new SimpleString(getString(e, "management-notification-address", managementNotificationAddress.toString()));
+
       NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
 
       ArrayList<String> interceptorList = new ArrayList<String>();

Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -22,9 +22,9 @@
 
 package org.jboss.messaging.core.management;
 
-import javax.management.NotificationBroadcaster;
 import javax.management.ObjectName;
 
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.messagecounter.MessageCounterManager;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -36,6 +36,7 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -43,7 +44,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public interface ManagementService extends NotificationBroadcaster
+public interface ManagementService
 {
    MessageCounterManager getMessageCounterManager();
 
@@ -72,5 +73,19 @@
 
    void handleMessage(ServerMessage message);
 
+   void sendNotification(NotificationType type, String message) throws Exception;
 
+   /** 
+    * the message corresponding to a notification will always contain the properties:
+    * <ul>
+    *   <li><code>ManagementHelper.HDR_NOTIFICATION_TYPE</code> - the type of notification (SimpleString)</li>
+    *   <li><code>ManagementHelper.HDR_NOTIFICATION_MESSAGE</code> - a message contextual to the notification (SimpleString)</li>
+    *   <li><code>ManagementHelper.HDR_NOTIFICATION_TIMESTAMP</code> - the timestamp when the notification occured (long)</li>
+    * </ul>
+    * 
+    * in addition to the properties defined in <code>props</code>
+    * 
+    * @see ManagementHelper
+    */
+   void sendNotification(NotificationType type, String message, TypedProperties props) throws Exception;
 }

Added: trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/NotificationType.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/NotificationType.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.management;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public enum NotificationType
+{
+   QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED;
+}
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -28,6 +28,7 @@
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,13 +37,8 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanNotificationInfo;
 import javax.management.MBeanServer;
-import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
 import javax.management.ObjectName;
 
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -51,21 +47,25 @@
 import org.jboss.messaging.core.management.AddressControlMBean;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
+import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.management.QueueControlMBean;
-import org.jboss.messaging.core.management.impl.MessagingServerControl.NotificationType;
 import org.jboss.messaging.core.messagecounter.MessageCounter;
 import org.jboss.messaging.core.messagecounter.MessageCounterManager;
 import org.jboss.messaging.core.messagecounter.impl.MessageCounterManagerImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.security.Role;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
 
 /*
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -90,10 +90,10 @@
 
    private final NotificationBroadcasterSupport broadcaster;
 
-   private AtomicLong notifSeq = new AtomicLong(0);
-
    private PostOffice postOffice;
 
+   private StorageManager storageManager;
+
    private HierarchicalRepository<Set<Role>> securityRepository;
 
    private HierarchicalRepository<QueueSettings> queueSettingsRepository;
@@ -102,6 +102,9 @@
 
    private final MessageCounterManager messageCounterManager = new MessageCounterManagerImpl(10000);
 
+   private SimpleString managementNotificationAddress;
+
+
    // Static --------------------------------------------------------
 
    public static ObjectName getMessagingServerObjectName() throws Exception
@@ -137,7 +140,6 @@
       this.jmxManagementEnabled = jmxManagementEnabled;
       registry = new HashMap<ObjectName, Object>();
       broadcaster = new NotificationBroadcasterSupport();
-      notifSeq = new AtomicLong(0);
    }
 
    // Public --------------------------------------------------------
@@ -158,7 +160,8 @@
    {
       this.postOffice = postOffice;
       this.queueSettingsRepository = queueSettingsRepository;
-      
+      this.storageManager = storageManager;
+      this.managementNotificationAddress = configuration.getManagementNotificationAddress();
       managedServer = new MessagingServerControl(postOffice,
                                                  storageManager,
                                                  configuration,                                                
@@ -303,25 +306,6 @@
       return registry.get(objectName);
    }
 
-   // NotificatioBroadcaster implementation -----------------------------------
-
-   public void addNotificationListener(final NotificationListener listener,
-                                       final NotificationFilter filter,
-                                       final Object handback) throws IllegalArgumentException
-   {
-      broadcaster.addNotificationListener(listener, filter, handback);
-   }
-
-   public MBeanNotificationInfo[] getNotificationInfo()
-   {
-      return broadcaster.getNotificationInfo();
-   }
-
-   public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException
-   {
-      broadcaster.removeNotificationListener(listener);
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -370,15 +354,40 @@
       }
    }
 
-   private void sendNotification(final MessagingServerControl.NotificationType type, final String message) throws Exception
+   public void sendNotification(final NotificationType type, final String message) throws Exception
    {
+     sendNotification(type, message, null);
+   }
+   
+   public void sendNotification(final NotificationType type, final String message, TypedProperties props) throws Exception
+   {
       if (managedServer != null)
       {
-         Notification notif = new Notification(type.toString(),
-                                               getMessagingServerObjectName(),
-                                               notifSeq.incrementAndGet(),
-                                               message);
-         broadcaster.sendNotification(notif);
+         ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+         notificationMessage.setDestination(managementNotificationAddress);
+         notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+         
+         TypedProperties notifProps;
+         if (props != null)
+         {
+            notifProps = props;
+         } else
+         {
+            notifProps = new TypedProperties();
+         }
+         
+         notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
+         notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_MESSAGE, new SimpleString(message)); 
+         notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis()); 
+         
+         notificationMessage.putTypedProperties(notifProps);
+
+         List<MessageReference> refs = postOffice.route(notificationMessage);
+         
+         for (MessageReference ref : refs)
+         {
+            ref.getQueue().addLast(ref);
+         }
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -48,6 +48,7 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
+import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.messagecounter.MessageCounterManager;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -625,11 +626,4 @@
 
       messageCounterManager.resetAllCounterHistories();
    }
-
-   // Inner classes -------------------------------------------------
-
-   public static enum NotificationType
-   {
-      QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED;
-   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/Message.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/message/Message.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -26,6 +26,7 @@
 
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
 
 /**
  * A message is a routable instance that has a payload.
@@ -114,7 +115,8 @@
    
    void putStringProperty(SimpleString key, SimpleString value);
    
-   
+   void putTypedProperties(TypedProperties properties);
+
    // TODO - should have typed property getters and do conversions herein
    
    Object getProperty(SimpleString key);

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -347,6 +347,11 @@
       properties.putStringProperty(key, value);
    }
 
+   public void putTypedProperties(TypedProperties otherProps)
+   {
+      properties.putTypedProperties(otherProps);
+   }
+
    public Object getProperty(final SimpleString key)
    {
       return properties.getProperty(key);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -247,6 +247,11 @@
       postOffice.start();
       resourceManager.start();
 
+      if (!postOffice.containsDestination(configuration.getManagementNotificationAddress()))
+      {
+         postOffice.addDestination(configuration.getManagementNotificationAddress(), true);
+      }
+
       TransportConfiguration backupConnector = configuration.getBackupConnectorConfiguration();
 
       if (backupConnector != null)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -23,8 +23,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
-import javax.management.Notification;
-import javax.management.NotificationListener;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
@@ -106,7 +104,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  */
-public class ServerSessionImpl implements ServerSession, FailureListener, NotificationListener
+public class ServerSessionImpl implements ServerSession, FailureListener
 {
    // Constants -----------------------------------------------------------------------------
 
@@ -2317,39 +2315,11 @@
    {
       doSecurity(message);
 
-      if (message.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
-      {
-         boolean subscribe = (Boolean)message.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
+      managementService.handleMessage(message);
 
-         final SimpleString replyTo = (SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
+      message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
 
-         if (subscribe)
-         {
-            if (log.isDebugEnabled())
-            {
-               log.debug("added notification listener " + this);
-            }
-
-            managementService.addNotificationListener(this, null, replyTo);
-         }
-         else
-         {
-            if (log.isDebugEnabled())
-            {
-               log.debug("removed notification listener " + this);
-            }
-
-            managementService.removeNotificationListener(this);
-         }
-      }
-      else
-      {
-         managementService.handleMessage(message);
-
-         message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
-
-         send(message);
-      }
+      send(message);
    }
 
    public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
@@ -2429,25 +2399,6 @@
       }
    }
 
-   // NotificationListener implementation -------------------------------------
-
-   public void handleNotification(final Notification notification, final Object replyTo)
-   {
-      // FIXME this won't work with replication
-      ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
-      notificationMessage.setDestination((SimpleString)replyTo);
-      notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(2048)));
-      ManagementHelper.storeNotification(notificationMessage, notification);
-      try
-      {
-         send(notificationMessage);
-      }
-      catch (Exception e)
-      {
-         log.warn("problem while sending a notification message " + notification, e);
-      }
-   }
-
    // Public
    // ----------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TypedProperties.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/src/main/org/jboss/messaging/util/TypedProperties.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -47,6 +47,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -139,6 +140,21 @@
       doPutValue(key, new CharValue(value));
    }
 
+   public void putTypedProperties(final TypedProperties otherProps)
+   {
+      if (otherProps == null || otherProps.properties == null)
+      {
+         return;
+      }
+      
+      checkCreateProperties();
+      Set<Entry<SimpleString,PropertyValue>> otherEntries = otherProps.properties.entrySet();
+      for (Entry<SimpleString, PropertyValue> otherEntry : otherEntries)
+      {
+         doPutValue(otherEntry.getKey(), otherEntry.getValue());
+      }
+   }
+   
    public Object getProperty(final SimpleString key)
    {
       return doGetProperty(key);

Added: trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -0,0 +1,205 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.management;
+
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
+import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
+
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.management.NotificationType;
+import org.jboss.messaging.core.management.impl.MessagingServerControl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A NotificationTest
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class NotificationTest extends TestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private MessagingService service;
+
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testNotification() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      
+      ClientSession session = sf.createSession(false, true, true);
+
+      // create a queue to receive the management notifications
+      SimpleString notifQueue = randomSimpleString();
+      session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueue, null, false, true, true);
+      ClientConsumer notifConsumer = session.createConsumer(notifQueue);
+      session.start();
+      
+      // generate notifications
+      SimpleString destinationName = randomSimpleString();
+      session.addDestination(destinationName, false, true);
+      session.removeDestination(destinationName, false);
+
+      // we've generated at least 2 notifications
+      // but there is more in the queue (e.g. the notification when the notifQueue was created)      
+      ClientMessage notifMessage = notifConsumer.receive(500);
+      assertNotNull(notifMessage);
+      Set<SimpleString> propertyNames = notifMessage.getPropertyNames();
+
+      for (SimpleString key : propertyNames)
+      {
+         System.out.println(key + "=" + notifMessage.getProperty(key));
+      }
+
+      notifMessage.acknowledge();
+      
+
+      notifMessage = notifConsumer.receive(500);
+      assertNotNull(notifMessage);
+      propertyNames = notifMessage.getPropertyNames();
+      for (SimpleString key : propertyNames)
+      {
+         System.out.println(key + "=" + notifMessage.getProperty(key));
+      }
+      notifMessage.acknowledge();
+      
+      notifConsumer.close();
+      session.deleteQueue(notifQueue);
+      session.close();
+   }
+   
+   public void testNotificationWithFilter() throws Exception
+   {
+      SimpleString destinationName = randomSimpleString();
+      SimpleString unmatchedDestinationName = new SimpleString("this.destination.does.not.match.the.filter");
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      
+      ClientSession session = sf.createSession(false, true, true);
+
+      // create a queue to receive the management notifications only concerning the destination
+      SimpleString notifQueue = randomSimpleString();
+      SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_MESSAGE + " LIKE '%" + destinationName + "%'" );
+      System.out.println(filter);
+      session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueue, filter, false, true, true);
+      ClientConsumer notifConsumer = session.createConsumer(notifQueue);
+      session.start();
+
+      // generate notifications that do NOT match the filter
+      session.addDestination(unmatchedDestinationName, false, true);
+      session.removeDestination(unmatchedDestinationName, false);
+      
+      assertNull(notifConsumer.receive(500));
+      
+      // generate notifications that match the filter
+      session.addDestination(destinationName, false, true);
+      session.removeDestination(destinationName, false);
+
+      ClientMessage notifMessage = notifConsumer.receive(500);
+      assertNotNull(notifMessage);
+      assertEquals(NotificationType.ADDRESS_ADDED.toString(), notifMessage.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      Set<SimpleString> propertyNames = notifMessage.getPropertyNames();
+
+      for (SimpleString key : propertyNames)
+      {
+         System.out.println(key + "=" + notifMessage.getProperty(key));
+      }
+
+      notifMessage.acknowledge();
+      
+
+      notifMessage = notifConsumer.receive(500);
+      assertNotNull(notifMessage);
+      assertEquals(NotificationType.ADDRESS_REMOVED.toString(), notifMessage.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      propertyNames = notifMessage.getPropertyNames();
+      for (SimpleString key : propertyNames)
+      {
+         System.out.println(key + "=" + notifMessage.getProperty(key));
+      }
+      notifMessage.acknowledge();
+      
+      // no other notifications matching the filter
+      assertNull(notifConsumer.receive(500));
+      
+      notifConsumer.close();
+      session.deleteQueue(notifQueue);
+      session.close();
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      // the notifications are independent of JMX
+      conf.setJMXManagementEnabled(false);
+      conf.getAcceptorConfigurations()
+          .add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      service = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+      service.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      service.stop();
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java	2008-11-19 13:51:57 UTC (rev 5390)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java	2008-11-19 16:06:47 UTC (rev 5391)
@@ -55,6 +55,7 @@
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -85,6 +86,7 @@
       PostOffice postOffice = createMock(PostOffice.class);
       StorageManager storageManager = createMock(StorageManager.class);
       Configuration configuration = createMock(Configuration.class);
+      expect(configuration.getManagementNotificationAddress()).andReturn(randomSimpleString());
       expect(configuration.isMessageCounterEnabled()).andReturn(false);
       HierarchicalRepository<Set<Role>> securityRepository = createMock(HierarchicalRepository.class);
       HierarchicalRepository<QueueSettings> queueSettingsRepository = createMock(HierarchicalRepository.class);
@@ -114,6 +116,7 @@
       PostOffice postOffice = createMock(PostOffice.class);
       StorageManager storageManager = createMock(StorageManager.class);
       Configuration configuration = createMock(Configuration.class);
+      expect(configuration.getManagementNotificationAddress()).andReturn(randomSimpleString());
       expect(configuration.isMessageCounterEnabled()).andReturn(false);
       HierarchicalRepository<Set<Role>> securityRepository = createMock(HierarchicalRepository.class);
       HierarchicalRepository<QueueSettings> queueSettingsRepository = createMock(HierarchicalRepository.class);




More information about the jboss-cvs-commits mailing list