[jboss-cvs] JBoss Messaging SVN: r5461 - in trunk: src/main/org/jboss/messaging/core/client/management/impl and 14 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Dec 4 10:28:56 EST 2008


Author: jmesnil
Date: 2008-12-04 10:28:55 -0500 (Thu, 04 Dec 2008)
New Revision: 5461

Added:
   trunk/src/main/org/jboss/messaging/core/management/jmx/
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareAddressControlWrapper.java
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareStandardMBeanWrapper.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareAddressControlWrapperTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareQueueControlWrapperTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareTestBase.java
Modified:
   trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
   trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
   trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
   trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverManagementTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/management/ManagementServiceImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
Log:
refactored management code to support replication

JMX MBeans are now wrappers which use JBM core messages to invoke the management operations.
This way, management operations are properly invoked on both the live and backup nodes so that their state is correctly replicated.
Management attributes and operations without side effects are called directly on the wrapped resource control.

Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientRequestor;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.MessageHandler;
@@ -60,43 +61,6 @@
       clientSession.addDestination(replytoQueue, false, true);
       clientSession.createQueue(replytoQueue, replytoQueue, null, false, true, true);
 
-      ClientConsumer mngmntConsumer = clientSession.createConsumer(replytoQueue);
-      mngmntConsumer.setMessageHandler(new MessageHandler()
-      {
-
-         public void onMessage(final ClientMessage message)
-         {
-            System.out.println("received management message");
-            if (ManagementHelper.isOperationResult(message))
-            {
-               System.out.println("\toperation succeeded:" + ManagementHelper.hasOperationSucceeded(message));
-               if (ManagementHelper.hasOperationSucceeded(message))
-               {
-                  System.out.println("\t- result=" + message.getProperty(new SimpleString("sendMessageToDeadLetterAddress")));
-               }
-               else
-               {
-                  System.out.println("\t- exception=" + ManagementHelper.getOperationExceptionMessage(message));
-               }
-            }
-            else if (ManagementHelper.isAttributesResult(message))
-            {
-               System.out.println("\tattributes:");
-               System.out.println("\t- MessageCount=" + message.getProperty(new SimpleString("MessageCount")));
-               System.out.println("\t- Durable=" + message.getProperty(new SimpleString("Durable")));
-            }
-            try
-            {
-               message.acknowledge();
-            }
-            catch (MessagingException e)
-            {
-               e.printStackTrace();
-            }
-         }
-
-      });
-
       SimpleString notifQueue = new SimpleString("notifQueue");
       clientSession.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueue, null, false, true, true);
       ClientConsumer notifConsumer = clientSession.createConsumer(notifQueue);
@@ -130,45 +94,71 @@
       clientSession.addDestination(new SimpleString("anotherQueue"), false, true);
       clientSession.removeDestination(new SimpleString("anotherQueue"), false);
 
-      ClientProducer producer = clientSession.createProducer(replytoQueue);
+      ClientRequestor requestor = new ClientRequestor(clientSession, DEFAULT_MANAGEMENT_ADDRESS);
 
       // to set a new value for an attribute, invoke the corresponding setter
       // method
       ClientMessage mngmntMessage = clientSession.createClientMessage(false);
       ManagementHelper.putOperationInvocation(mngmntMessage,
-                                              replytoQueue,
                                               ManagementServiceImpl.getMessagingServerObjectName(),
                                               "setMessageCounterSamplePeriod",
                                               (long)30000);
-      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
+      ClientMessage reply = requestor.request(mngmntMessage);
       System.out.println("sent management message to set an attribute");
+      if (reply != null)
+      {
+         if (ManagementHelper.isOperationResult(reply))
+         {
+            System.out.println("\toperation succeeded:" + ManagementHelper.hasOperationSucceeded(reply));
+            if (!ManagementHelper.hasOperationSucceeded(reply))
+            {
+               System.out.println("\t- exception=" + ManagementHelper.getOperationExceptionMessage(reply));
+            }
+         }
+      }
 
       // create a message to retrieve one or many attributes
       mngmntMessage = clientSession.createClientMessage(false);
       ManagementHelper.putAttributes(mngmntMessage,
-                                     replytoQueue,
                                      ManagementServiceImpl.getQueueObjectName(queue, queue),
                                      "MessageCount",
                                      "Durable");
-
-      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
+      reply = requestor.request(mngmntMessage);
       System.out.println("sent management message to retrieve attributes");
+      if (reply != null)
+      {
+         System.out.println("\tattributes:");
+         System.out.println("\t- MessageCount=" + reply.getProperty(new SimpleString("MessageCount")));
+         System.out.println("\t- Durable=" + reply.getProperty(new SimpleString("Durable")));
+      }
 
       // create a message to invoke the operation sendMessageToDeadLetterAddress(long) on the
       // queue
       mngmntMessage = clientSession.createClientMessage(false);
       ManagementHelper.putOperationInvocation(mngmntMessage,
-                                              replytoQueue,
                                               ManagementServiceImpl.getQueueObjectName(queue, queue),
-                                              "sendMessageToDeadLetterAddress",
+                                              "sendMessageToDLQ",
                                               (long)6161);
-      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
-      System.out.println("sent management message to invoke operation");
+      reply = requestor.request(mngmntMessage);
+      System.out.println("sent management message to retrieve attributes");
+      if (reply != null)
+      {
+         if (ManagementHelper.isOperationResult(reply))
+         {
+            System.out.println("\toperation succeeded:" + ManagementHelper.hasOperationSucceeded(reply));
+            if (ManagementHelper.hasOperationSucceeded(reply))
+            {
+               System.out.println("\t- result=" + reply.getProperty(new SimpleString("sendMessageToDLQ")));
+            }
+            else
+            {
+               System.out.println("\t- exception=" + ManagementHelper.getOperationExceptionMessage(reply));
+            }
+         }
+      }
 
       Thread.sleep(5000);
 
-      mngmntConsumer.close();
-
       consumeMessages(clientSession, queue);
 
       notifConsumer.close();

Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -46,8 +46,6 @@
 
    public static final SimpleString HDR_JMX_OBJECTNAME = new SimpleString("JBMJMXObjectName");
 
-   public static final SimpleString HDR_JMX_REPLYTO = new SimpleString("JBMJMXReplyTo");
-
    public static final SimpleString HDR_JMX_ATTRIBUTE_PREFIX = new SimpleString("JBMJMXAttribute.");
 
    public static final SimpleString HDR_JMX_OPERATION_PREFIX = new SimpleString("JBMJMXOperation.");
@@ -69,12 +67,10 @@
    // Static --------------------------------------------------------
 
    public static void putAttributes(final Message message,
-                                    final SimpleString replyTo,
                                     final ObjectName objectName,
                                     final String... attributes)
    {
       message.putStringProperty(HDR_JMX_OBJECTNAME, new SimpleString(objectName.toString()));
-      message.putStringProperty(HDR_JMX_REPLYTO, replyTo);
       for (int i = 0; i < attributes.length; i++)
       {
          message.putStringProperty(new SimpleString(HDR_JMX_ATTRIBUTE_PREFIX + Integer.toString(i)),
@@ -83,14 +79,12 @@
    }
 
    public static void putOperationInvocation(final Message message,
-                                             final SimpleString replyTo,
                                              final ObjectName objectName,
                                              final String operationName,
                                              final Object... parameters)
    {
       // store the name of the operation...
       message.putStringProperty(HDR_JMX_OBJECTNAME, new SimpleString(objectName.toString()));
-      message.putStringProperty(HDR_JMX_REPLYTO, replyTo);
       message.putStringProperty(HDR_JMX_OPERATION_NAME, new SimpleString(operationName));
       // ... and all the parameters (preserving their types)
       for (int i = 0; i < parameters.length; i++)
@@ -120,6 +114,16 @@
       }
       throw new IllegalArgumentException(key + " property is not a valid TabularData");
    }
+   
+   public static CompositeData getCompositeDataProperty(final Message message, final String key)
+   {
+      Object object = message.getProperty(new SimpleString(key));
+      if (object instanceof byte[])
+      {
+         return (CompositeData)from((byte[])object);
+      }
+      throw new IllegalArgumentException(key + " property is not a valid CompositeData");
+   }
 
    public static boolean hasOperationSucceeded(final Message message)
    {
@@ -192,17 +196,7 @@
       }
    }
 
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   private static Object from(final byte[] bytes)
+   public static Object from(final byte[] bytes)
    {
       ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
       ObjectInputStream ois;
@@ -217,6 +211,16 @@
       }
    }
 
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
    private static void storePropertyAsBytes(final Message message, final SimpleString key, final Object property)
    {
       ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.management;
 
+import java.util.Set;
+
 import javax.management.ObjectName;
 
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -30,6 +32,7 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.security.Role;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -53,6 +56,7 @@
                                               StorageManager storageManager,
                                               Configuration configuration,                                            
                                               HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                                              HierarchicalRepository<Set<Role>> securityRepository,
                                               ResourceManager resourceManager,
                                               RemotingService remotingService,
                                               MessagingServer messagingServer) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -85,11 +85,11 @@
 
    int getMessageCounterMaxDayCount();
 
-   void setMessageCounterMaxDayCount(int count);
+   void setMessageCounterMaxDayCount(int count) throws Exception;
 
    long getMessageCounterSamplePeriod();
 
-   void setMessageCounterSamplePeriod(long newPeriod);
+   void setMessageCounterSamplePeriod(long newPeriod) throws Exception;
    
    public boolean isBackup();
 
@@ -134,13 +134,13 @@
          @Parameter(name = "address", desc = "The address to remove") String address)
          throws Exception;
 
-   void enableMessageCounters();
+   void enableMessageCounters() throws Exception;
 
-   void disableMessageCounters();
+   void disableMessageCounters() throws Exception;
 
-   void resetAllMessageCounters();
+   void resetAllMessageCounters() throws Exception;
 
-   void resetAllMessageCounterHistories();
+   void resetAllMessageCounterHistories() throws Exception;
    
    @Operation(desc = "List all the prepared transaction, sorted by date, oldest first")
    public String[] listPreparedTransactions();
@@ -158,7 +158,7 @@
    String[] listRemoteAddresses(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
 
    @Operation(desc = "Closes all the connections for the given IP Address", impact = INFO)
-   boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
+   boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress) throws Exception;
 
    @Operation(desc = "List all the connection IDs", impact = INFO)
    String[] listConnectionIDs();

Modified: trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -51,7 +51,7 @@
    String getFilter();
 
    long getSizeBytes();
-
+   
    int getMessageCount();
 
    long getScheduledCount();
@@ -121,8 +121,8 @@
          @Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName)
          throws Exception;
 
-   @Operation(desc = "Send the message corresponding to the given messageID to this queue's Dead Letter Queue", impact = ACTION)
-   boolean sendMessageToDLQ(
+   @Operation(desc = "Send the message corresponding to the given messageID to this queue's Dead Letter Address", impact = ACTION)
+   boolean sendMessageToDeadLetterAddress(
          @Parameter(name = "messageID", desc = "A message ID") long messageID)
          throws Exception;
    
@@ -132,11 +132,11 @@
          @Parameter(name = "newPriority", desc = "the new priority (between 0 and 9)") int newPriority)
          throws Exception;
 
-   CompositeData listMessageCounter();
+   CompositeData listMessageCounter() throws Exception;
 
-   String listMessageCounterAsHTML();
+   String listMessageCounterAsHTML() throws Exception;
 
    TabularData listMessageCounterHistory() throws Exception;
 
-   String listMessageCounterHistoryAsHTML();
+   String listMessageCounterHistoryAsHTML() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -39,6 +39,7 @@
 import javax.management.MBeanServer;
 import javax.management.NotificationBroadcasterSupport;
 import javax.management.ObjectName;
+import javax.management.StandardMBean;
 
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.config.Configuration;
@@ -48,6 +49,9 @@
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
 import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.management.QueueControlMBean;
+import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareAddressControlWrapper;
+import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareMessagingServerControlWrapper;
+import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareQueueControlWrapper;
 import org.jboss.messaging.core.messagecounter.MessageCounter;
 import org.jboss.messaging.core.messagecounter.MessageCounterManager;
 import org.jboss.messaging.core.messagecounter.impl.MessageCounterManagerImpl;
@@ -98,7 +102,7 @@
 
    private HierarchicalRepository<QueueSettings> queueSettingsRepository;
 
-   private MessagingServerControlMBean managedServer;
+   private MessagingServerControl managedServer;
 
    private final MessageCounterManager messageCounterManager = new MessageCounterManagerImpl(10000);
 
@@ -155,12 +159,14 @@
                                                      final StorageManager storageManager,
                                                      final Configuration configuration,                                     
                                                      final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                                                     final HierarchicalRepository<Set<Role>> securityRepository,
                                                      final ResourceManager resourceManager,
                                                      final RemotingService remotingService,
                                                      final MessagingServer messagingServer) throws Exception
    {
       this.postOffice = postOffice;
       this.queueSettingsRepository = queueSettingsRepository;
+      this.securityRepository = securityRepository;
       this.storageManager = storageManager;
       this.managementNotificationAddress = configuration.getManagementNotificationAddress();
       managedServer = new MessagingServerControl(postOffice,
@@ -173,7 +179,8 @@
                                                  messageCounterManager,
                                                  broadcaster);
       ObjectName objectName = getMessagingServerObjectName();
-      registerResource(objectName, managedServer);
+      registerInJMX(objectName, new StandardMBean(new ReplicationAwareMessagingServerControlWrapper(objectName, managedServer), MessagingServerControlMBean.class));
+      registerInRegistry(objectName, managedServer);
 
       return managedServer;
    }
@@ -187,8 +194,10 @@
    public void registerAddress(final SimpleString address) throws Exception
    {
       ObjectName objectName = getAddressObjectName(address);
-      AddressControlMBean addressControl = new AddressControl(address, postOffice, securityRepository);
-      registerResource(objectName, addressControl);
+      AddressControl addressControl = new AddressControl(address, postOffice, securityRepository);
+      
+      registerInJMX(objectName, new StandardMBean(new ReplicationAwareAddressControlWrapper(objectName, addressControl), AddressControlMBean.class));
+      registerInRegistry(objectName, addressControl);
       if (log.isDebugEnabled())
       {
          log.debug("registered address " + objectName);
@@ -213,12 +222,13 @@
                                                   messageCounterManager.getMaxDayCount());
       messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
       ObjectName objectName = getQueueObjectName(address, queue.getName());
-      QueueControlMBean queueControl = new QueueControl(queue,
+      QueueControl queueControl = new QueueControl(queue,
                                                         storageManager,
                                                         postOffice,
                                                         queueSettingsRepository,
-                                                        counter);
-      registerResource(objectName, queueControl);
+                                                        counter);      
+      registerInJMX(objectName, new StandardMBean(new ReplicationAwareQueueControlWrapper(objectName, queueControl), QueueControlMBean.class));
+      registerInRegistry(objectName, queueControl);
 
       if (log.isDebugEnabled())
       {

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -34,13 +34,11 @@
 import java.util.Map.Entry;
 
 import javax.management.ListenerNotFoundException;
-import javax.management.MBeanInfo;
 import javax.management.MBeanNotificationInfo;
 import javax.management.NotificationBroadcasterSupport;
 import javax.management.NotificationEmitter;
 import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
-import javax.management.StandardMBean;
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.config.Configuration;
@@ -74,7 +72,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class MessagingServerControl extends StandardMBean implements MessagingServerControlMBean, NotificationEmitter
+public class MessagingServerControl implements MessagingServerControlMBean, NotificationEmitter
 {
    // Constants -----------------------------------------------------
 
@@ -134,7 +132,6 @@
                                  final MessageCounterManager messageCounterManager,
                                  final NotificationBroadcasterSupport broadcaster) throws Exception
    {
-      super(MessagingServerControlMBean.class);
       this.postOffice = postOffice;
       this.storageManager = storageManager;
       this.configuration = configuration;
@@ -236,20 +233,6 @@
       return 0;
    }
 
-   // StandardMBean overrides ---------------------------------------
-
-   @Override
-   public MBeanInfo getMBeanInfo()
-   {
-      MBeanInfo info = super.getMBeanInfo();
-      return new MBeanInfo(info.getClassName(),
-                           info.getDescription(),
-                           info.getAttributes(),
-                           info.getConstructors(),
-                           MBeanInfoHelper.getMBeanOperationsInfo(MessagingServerControlMBean.class),
-                           getNotificationInfo());
-   }
-
    // MessagingServerControlMBean implementation --------------------
 
    //FIXME

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -58,7 +58,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class QueueControl extends StandardMBean implements QueueControlMBean
+public class QueueControl implements QueueControlMBean
 {
 
    // Constants -----------------------------------------------------
@@ -80,7 +80,6 @@
          final HierarchicalRepository<QueueSettings> queueSettingsRepository,
          final MessageCounter counter) throws NotCompliantMBeanException
    {
-      super(QueueControlMBean.class);
       this.queue = queue;
       this.storageManager = storageManager;
       this.postOffice = postOffice;
@@ -306,7 +305,7 @@
       return moveMatchingMessages(null, otherQueueName);
    }
 
-   public boolean sendMessageToDLQ(final long messageID) throws Exception
+   public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
    {
       return queue.sendMessageToDeadLetterAddress(messageID, storageManager, postOffice,
             queueSettingsRepository);
@@ -361,19 +360,7 @@
    {
       return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
    }
-   
-   // StandardMBean overrides ---------------------------------------
 
-   @Override
-   public MBeanInfo getMBeanInfo()
-   {
-      MBeanInfo info = super.getMBeanInfo();
-      return new MBeanInfo(info.getClassName(), info.getDescription(), info
-            .getAttributes(), info.getConstructors(), MBeanInfoHelper
-            .getMBeanOperationsInfo(QueueControlMBean.class), info
-            .getNotifications());
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Added: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareAddressControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareAddressControlWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareAddressControlWrapper.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -0,0 +1,116 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.management.jmx.impl;
+
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.jboss.messaging.core.management.AddressControlMBean;
+import org.jboss.messaging.core.management.RoleInfo;
+import org.jboss.messaging.core.management.impl.AddressControl;
+import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
+
+/**
+ * JMX wrapper around a local {@link AddressControl} that exposes the
+ * {@link AddressControlMBean} management interface.
+ * <p>
+ * The read accessors ({@code getAddress}, {@code getQueueNames},
+ * {@code getRoleInfos}, {@code getRoles}) are answered directly by the wrapped
+ * local control. The mutating operations ({@code addRole}, {@code removeRole})
+ * are not called on the local control: they are handed to
+ * {@code replicationAwareInvoke} of the superclass together with the operation
+ * name and its arguments. The operation name strings must therefore match the
+ * method names of {@link AddressControlMBean}.
+ * <p>
+ * {@code getMBeanInfo} reuses the metadata computed by the superclass but
+ * replaces the operations with the ones built by
+ * {@code MBeanInfoHelper.getMBeanOperationsInfo(AddressControlMBean.class)}.
+ * <p>
+ * NOTE(review): presumably the indirection exists so that role changes are
+ * also applied on a backup node - confirm against the management service.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class ReplicationAwareAddressControlWrapper extends ReplicationAwareStandardMBeanWrapper implements AddressControlMBean
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final AddressControl localAddressControl;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationAwareAddressControlWrapper(final ObjectName objectName, final AddressControl localAddressControl) throws Exception
+   {
+      super(objectName, AddressControlMBean.class);
+
+      this.localAddressControl = localAddressControl;
+   }
+
+   // AddressControlMBean implementation ------------------------------
+
+   public String getAddress()
+   {
+      return localAddressControl.getAddress();
+   }
+
+   public String[] getQueueNames() throws Exception
+   {
+      return localAddressControl.getQueueNames();
+   }
+
+   public RoleInfo[] getRoleInfos() throws Exception
+   {
+      return localAddressControl.getRoleInfos();
+   }
+
+   public TabularData getRoles() throws Exception
+   {
+      return localAddressControl.getRoles();
+   }
+
+   public void removeRole(String name) throws Exception
+   {
+      replicationAwareInvoke("removeRole", name);
+   }
+
+   public void addRole(String name, boolean create, boolean read, boolean write) throws Exception
+   {
+      replicationAwareInvoke("addRole", name, create, read, write);
+   }
+
+   // StandardMBean overrides ---------------------------------------
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(AddressControlMBean.class),
+                           info.getNotifications());
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -0,0 +1,335 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.management.jmx.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.management.MessagingServerControlMBean;
+import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
+import org.jboss.messaging.core.management.impl.MessagingServerControl;
+
+/**
+ * JMX wrapper around a local {@link MessagingServerControl} that exposes the
+ * {@link MessagingServerControlMBean} management interface.
+ * <p>
+ * All {@code get*}, {@code is*} and {@code list*} methods are answered directly
+ * by the wrapped local control. Every other operation ({@code addAddress},
+ * {@code removeAddress}, both {@code createQueue} overloads,
+ * {@code destroyQueue}, {@code closeConnectionsForAddress},
+ * {@code commitPreparedTransaction}, {@code rollbackPreparedTransaction},
+ * the message counter enable/disable/reset operations and the two
+ * {@code setMessageCounter*} setters) is handed to
+ * {@code replicationAwareInvoke} of the superclass together with the operation
+ * name and its arguments. The operation name strings must therefore match the
+ * method names of {@link MessagingServerControlMBean}.
+ * <p>
+ * Operations declared to return {@code boolean} cast the {@code Object}
+ * returned by {@code replicationAwareInvoke} to {@code Boolean} and unbox it.
+ * <p>
+ * {@code getMBeanInfo} reuses the metadata computed by the superclass but takes
+ * the operations from
+ * {@code MBeanInfoHelper.getMBeanOperationsInfo(MessagingServerControlMBean.class)}
+ * and the notifications from the local control's {@code getNotificationInfo()}.
+ * <p>
+ * NOTE(review): presumably the indirection exists so that state-changing
+ * operations are also applied on a backup node - confirm against the
+ * management service.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class ReplicationAwareMessagingServerControlWrapper extends ReplicationAwareStandardMBeanWrapper implements
+         MessagingServerControlMBean
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final MessagingServerControl localControl;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationAwareMessagingServerControlWrapper(final ObjectName objectName,
+                                                        final MessagingServerControl localControl) throws Exception
+   {
+      super(objectName, MessagingServerControlMBean.class);
+
+      this.localControl = localControl;
+   }
+
+   // MessagingServerControlMBean implementation ------------------------------
+
+   public Map<String, Map<String, Object>> getAcceptorConfigurations()
+   {
+      return localControl.getAcceptorConfigurations();
+   }
+
+   public Map<String, Object> getBackupConnectorConfiguration()
+   {
+      return localControl.getBackupConnectorConfiguration();
+   }
+
+   public String getBindingsDirectory()
+   {
+      return localControl.getBindingsDirectory();
+   }
+
+   public Configuration getConfiguration()
+   {
+      return localControl.getConfiguration();
+   }
+
+   public int getConnectionCount()
+   {
+      return localControl.getConnectionCount();
+   }
+
+   public long getConnectionScanPeriod()
+   {
+      return localControl.getConnectionScanPeriod();
+   }
+
+   public List<String> getInterceptorClassNames()
+   {
+      return localControl.getInterceptorClassNames();
+   }
+
+   public int getJournalBufferReuseSize()
+   {
+      return localControl.getJournalBufferReuseSize();
+   }
+
+   public String getJournalDirectory()
+   {
+      return localControl.getJournalDirectory();
+   }
+
+   public int getJournalFileSize()
+   {
+      return localControl.getJournalFileSize();
+   }
+
+   public int getJournalMaxAIO()
+   {
+      return localControl.getJournalMaxAIO();
+   }
+
+   public int getJournalMinFiles()
+   {
+      return localControl.getJournalMinFiles();
+   }
+
+   public String getJournalType()
+   {
+      return localControl.getJournalType();
+   }
+
+   public int getMessageCounterMaxDayCount()
+   {
+      return localControl.getMessageCounterMaxDayCount();
+   }
+
+   public long getMessageCounterSamplePeriod()
+   {
+      return localControl.getMessageCounterSamplePeriod();
+   }
+
+   public String getPagingDirectory()
+   {
+      return localControl.getPagingDirectory();
+   }
+
+   public long getPagingMaxGlobalSizeBytes()
+   {
+      return localControl.getPagingMaxGlobalSizeBytes();
+   }
+
+   public int getScheduledThreadPoolMaxSize()
+   {
+      return localControl.getScheduledThreadPoolMaxSize();
+   }
+
+   public long getSecurityInvalidationInterval()
+   {
+      return localControl.getSecurityInvalidationInterval();
+   }
+
+   public String getVersion()
+   {
+      return localControl.getVersion();
+   }
+
+   public boolean isBackup()
+   {
+      return localControl.isBackup();
+   }
+
+   public boolean isClustered()
+   {
+      return localControl.isClustered();
+   }
+
+   public boolean isCreateBindingsDir()
+   {
+      return localControl.isCreateBindingsDir();
+   }
+
+   public boolean isCreateJournalDir()
+   {
+      return localControl.isCreateJournalDir();
+   }
+
+   public boolean isJournalSyncNonTransactional()
+   {
+      return localControl.isJournalSyncNonTransactional();
+   }
+
+   public boolean isJournalSyncTransactional()
+   {
+      return localControl.isJournalSyncTransactional();
+   }
+
+   public boolean isMessageCounterEnabled()
+   {
+      return localControl.isMessageCounterEnabled();
+   }
+
+   public boolean isRequireDestinations()
+   {
+      return localControl.isRequireDestinations();
+   }
+
+   public boolean isSecurityEnabled()
+   {
+      return localControl.isSecurityEnabled();
+   }
+
+   public boolean isStarted()
+   {
+      return localControl.isStarted();
+   }
+
+   public String[] listConnectionIDs()
+   {
+      return localControl.listConnectionIDs();
+   }
+
+   public String[] listPreparedTransactions()
+   {
+      return localControl.listPreparedTransactions();
+   }
+
+   public String[] listRemoteAddresses()
+   {
+      return localControl.listRemoteAddresses();
+   }
+
+   public String[] listRemoteAddresses(String ipAddress)
+   {
+      return localControl.listRemoteAddresses(ipAddress);
+   }
+
+   public String[] listSessions(String connectionID)
+   {
+      return localControl.listSessions(connectionID);
+   }
+   
+   public boolean addAddress(String address) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("addAddress", address);
+   }
+
+   public boolean closeConnectionsForAddress(String ipAddress) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("closeConnectionsForAddress", ipAddress);
+   }
+
+   public boolean commitPreparedTransaction(String transactionAsBase64) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("commitPreparedTransaction", transactionAsBase64);
+   }
+
+   public void createQueue(String address, String name) throws Exception
+   {
+      replicationAwareInvoke("createQueue", address, name);
+   }
+
+   public void createQueue(String address, String name, String filter, boolean durable, boolean fanout) throws Exception
+   {
+      replicationAwareInvoke("createQueue", address, name, filter, durable, fanout);
+   }
+
+   public void destroyQueue(String name) throws Exception
+   {
+      replicationAwareInvoke("destroyQueue", name);
+   }
+
+   public void disableMessageCounters() throws Exception
+   {
+      replicationAwareInvoke("disableMessageCounters");
+   }
+
+   public void enableMessageCounters() throws Exception
+   {
+      replicationAwareInvoke("enableMessageCounters");
+   }
+
+   public boolean removeAddress(String address) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("removeAddress", address);
+   }
+
+   public void resetAllMessageCounterHistories() throws Exception
+   {
+      replicationAwareInvoke("resetAllMessageCounterHistories");
+   }
+
+   public void resetAllMessageCounters() throws Exception
+   {
+      replicationAwareInvoke("resetAllMessageCounters");
+   }
+
+   public boolean rollbackPreparedTransaction(String transactionAsBase64) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("rollbackPreparedTransaction", transactionAsBase64);
+   }
+
+   public void setMessageCounterMaxDayCount(int count) throws Exception
+   {
+      replicationAwareInvoke("setMessageCounterMaxDayCount", count);
+   }
+
+   public void setMessageCounterSamplePeriod(long newPeriod) throws Exception
+   {
+      replicationAwareInvoke("setMessageCounterSamplePeriod", newPeriod);
+   }
+
+   // StandardMBean overrides ---------------------------------------
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(MessagingServerControlMBean.class),
+                           localControl.getNotificationInfo());
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -0,0 +1,246 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.management.jmx.impl;
+
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.jboss.messaging.core.management.QueueControlMBean;
+import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
+import org.jboss.messaging.core.management.impl.QueueControl;
+
+/**
+ * JMX wrapper around a local {@link QueueControl} that exposes the
+ * {@link QueueControlMBean} management interface.
+ * <p>
+ * All {@code get*}, {@code is*} and {@code list*} methods are answered directly
+ * by the wrapped local control. The operations that act on messages
+ * ({@code changeMessagePriority}, {@code expireMessage},
+ * {@code expireMessages}, {@code moveAllMessages},
+ * {@code moveMatchingMessages}, {@code moveMessage},
+ * {@code removeAllMessages}, {@code removeMatchingMessages},
+ * {@code removeMessage}, {@code sendMessageToDeadLetterAddress}) are handed to
+ * {@code replicationAwareInvoke} of the superclass together with the operation
+ * name and its arguments. The operation name strings must therefore match the
+ * method names of {@link QueueControlMBean}.
+ * <p>
+ * The {@code Object} returned by {@code replicationAwareInvoke} is cast to
+ * {@code Boolean} or {@code Integer}, according to the declared return type,
+ * and unboxed.
+ * <p>
+ * {@code getMBeanInfo} reuses the metadata computed by the superclass but
+ * replaces the operations with the ones built by
+ * {@code MBeanInfoHelper.getMBeanOperationsInfo(QueueControlMBean.class)}.
+ * <p>
+ * NOTE(review): presumably the indirection exists so that message operations
+ * are also applied on a backup node - confirm against the management service.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class ReplicationAwareQueueControlWrapper extends ReplicationAwareStandardMBeanWrapper implements QueueControlMBean
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final QueueControl localQueueControl;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationAwareQueueControlWrapper(final ObjectName objectName, final QueueControl localControl) throws Exception
+   {
+      super(objectName, QueueControlMBean.class);
+
+      this.localQueueControl = localControl;
+   }
+
+   // QueueControlMBean implementation ------------------------------
+
+   public int getConsumerCount()
+   {
+      return localQueueControl.getConsumerCount();
+   }
+
+   public String getDeadLetterAddress()
+   {
+      return localQueueControl.getDeadLetterAddress();
+   }
+
+   public int getDeliveringCount()
+   {
+      return localQueueControl.getDeliveringCount();
+   }
+
+   public String getExpiryQueue()
+   {
+      return localQueueControl.getExpiryQueue();
+   }
+
+   public String getFilter()
+   {
+      return localQueueControl.getFilter();
+   }
+
+   public int getMessageCount()
+   {
+      return localQueueControl.getMessageCount();
+   }
+
+   public int getMessagesAdded()
+   {
+      return localQueueControl.getMessagesAdded();
+   }
+
+   public String getName()
+   {
+      return localQueueControl.getName();
+   }
+
+   public long getPersistenceID()
+   {
+      return localQueueControl.getPersistenceID();
+   }
+
+   public long getScheduledCount()
+   {
+      return localQueueControl.getScheduledCount();
+   }
+
+   public long getSizeBytes()
+   {
+      return localQueueControl.getSizeBytes();
+   }
+
+   public boolean isBackup()
+   {
+      return localQueueControl.isBackup();
+   }
+
+   public boolean isClustered()
+   {
+      return localQueueControl.isClustered();
+   }
+
+   public boolean isDurable()
+   {
+      return localQueueControl.isDurable();
+   }
+
+   public boolean isTemporary()
+   {
+      return localQueueControl.isTemporary();
+   }
+
+   public TabularData listAllMessages() throws Exception
+   {
+      return localQueueControl.listAllMessages();
+   }
+
+   public CompositeData listMessageCounter() throws Exception
+   {
+      return localQueueControl.listMessageCounter();
+   }
+
+   public String listMessageCounterAsHTML() throws Exception
+   {
+      return localQueueControl.listMessageCounterAsHTML();
+   }
+
+   public TabularData listMessageCounterHistory() throws Exception
+   {
+      return localQueueControl.listMessageCounterHistory();
+   }
+
+   public String listMessageCounterHistoryAsHTML() throws Exception
+   {
+      return localQueueControl.listMessageCounterHistoryAsHTML();
+   }
+
+   public TabularData listMessages(String filter) throws Exception
+   {
+      return localQueueControl.listMessages(filter);
+   }
+
+   public TabularData listScheduledMessages() throws Exception
+   {
+      return localQueueControl.listScheduledMessages();
+   }
+
+   public boolean changeMessagePriority(long messageID, int newPriority) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("changeMessagePriority", messageID, newPriority);
+   }
+
+   public boolean expireMessage(long messageID) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("expireMessage", messageID);
+   }
+
+   public int expireMessages(String filter) throws Exception
+   {
+      return (Integer)replicationAwareInvoke("expireMessages", filter);
+   }
+
+   public int moveAllMessages(String otherQueueName) throws Exception
+   {
+      return (Integer)replicationAwareInvoke("moveAllMessages", otherQueueName);
+   }
+
+   public int moveMatchingMessages(String filter, String otherQueueName) throws Exception
+   {
+      return (Integer)replicationAwareInvoke("moveMatchingMessages", filter, otherQueueName);
+   }
+
+   public boolean moveMessage(long messageID, String otherQueueName) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("moveMessage", messageID, otherQueueName);
+   }
+
+   public int removeAllMessages() throws Exception
+   {
+      return (Integer)replicationAwareInvoke("removeAllMessages");
+   }
+
+   public int removeMatchingMessages(String filter) throws Exception
+   {
+      return (Integer)replicationAwareInvoke("removeMatchingMessages", filter);
+   }
+
+   public boolean removeMessage(long messageID) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("removeMessage", messageID);
+   }
+
+   public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("sendMessageToDeadLetterAddress", messageID);
+   }
+
+   // StandardMBean overrides ---------------------------------------
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(QueueControlMBean.class),
+                           info.getNotifications());
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareStandardMBeanWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareStandardMBeanWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareStandardMBeanWrapper.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -0,0 +1,115 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.management.jmx.impl;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientRequestor;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Base class for the replication-aware JMX wrappers.
+ * <p>
+ * It is a {@link StandardMBean} for the management interface passed to the
+ * constructor and offers {@link #replicationAwareInvoke(String, Object...)},
+ * which sends an operation invocation as a management message through an
+ * in-VM client session instead of calling a control object directly.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * Created 3 Dec. 2008 11:23:11
+ */
+public class ReplicationAwareStandardMBeanWrapper extends StandardMBean
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** Name of the MBean the management messages are addressed to. */
+   private final ObjectName objectName;
+
+   /** Factory for the in-VM sessions used to send management messages. */
+   private final ClientSessionFactoryImpl sessionFactory;
+
+   // FIXME move to configuration
+   // NOTE(review): passed to ClientRequestor.request(); presumably milliseconds - confirm
+   private final long timeout = 500;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param objectName name of the MBean that operation invocations are sent to
+    * @param mbeanInterface management interface exposed by this StandardMBean
+    * @throws NotCompliantMBeanException propagated from the StandardMBean constructor
+    */
+   protected ReplicationAwareStandardMBeanWrapper(final ObjectName objectName, final Class mbeanInterface) throws NotCompliantMBeanException
+   {
+      super(mbeanInterface);
+
+      this.objectName = objectName;
+      this.sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Sends an operation invocation for {@code objectName} to the management
+    * address and waits for the reply.
+    * <p>
+    * A new session and requestor are created for every call; both are closed
+    * before the method returns, whether it completes normally or throws.
+    *
+    * @param operationName name of the operation to invoke; it is also the key
+    *        of the reply property holding the result
+    * @param parameters arguments of the operation
+    * @return the value of the reply property named after the operation
+    * @throws Exception if no reply is received within the timeout, if the reply
+    *         reports a failed operation, or if the messaging client fails
+    */
+   protected Object replicationAwareInvoke(final String operationName, final Object... parameters) throws Exception
+   {
+      ClientSession clientSession = sessionFactory.createSession(false, true, true);
+      try
+      {
+         ClientRequestor requestor = new ClientRequestor(clientSession, ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS);
+         try
+         {
+            clientSession.start();
+
+            ClientMessage mngmntMessage = clientSession.createClientMessage(false);
+            ManagementHelper.putOperationInvocation(mngmntMessage, objectName, operationName, parameters);
+            ClientMessage reply = requestor.request(mngmntMessage, timeout);
+
+            if (reply == null)
+            {
+               throw new Exception("did not receive reply for message " + mngmntMessage);
+            }
+            if (ManagementHelper.hasOperationSucceeded(reply))
+            {
+               return reply.getProperty(new SimpleString(operationName));
+            }
+            else
+            {
+               throw new Exception(ManagementHelper.getOperationExceptionMessage(reply));
+            }
+         }
+         finally
+         {
+            requestor.close();
+         }
+      }
+      finally
+      {
+         // the session is created per invocation, so it must be released here;
+         // otherwise every management operation leaks one session
+         clientSession.close();
+      }
+   }
+   
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -243,6 +243,7 @@
                                                           storageManager,
                                                           configuration,
                                                           queueSettingsRepository,
+                                                          securityRepository,
                                                           resourceManager,
                                                           remotingService,
                                                           this);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -27,7 +27,7 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
-import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.client.impl.ClientMessageImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -2330,7 +2330,7 @@
 
       managementService.handleMessage(message);
 
-      message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
+      message.setDestination((SimpleString)message.getProperty(ClientMessageImpl.REPLYTO_HEADER_NAME));
 
       send(message);
    }

Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -175,7 +175,7 @@
 
    String[] listRemoteAddresses(String ipAddress);
 
-   boolean closeConnectionsForAddress(String ipAddress);
+   boolean closeConnectionsForAddress(String ipAddress) throws Exception;
 
    String[] listConnectionIDs();
 

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -363,7 +363,7 @@
       return messagingServer.listRemoteAddresses(ipAddress);
    }
 
-   public boolean closeConnectionsForAddress(final String ipAddress)
+   public boolean closeConnectionsForAddress(final String ipAddress) throws Exception
    {
       return messagingServer.closeConnectionsForAddress(ipAddress);
    }

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -185,7 +185,7 @@
    String[] listRemoteAddresses(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
 
    @Operation(desc = "Closes all the connections for the given IP Address", impact = INFO)
-   boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
+   boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress) throws Exception;
    
    @Operation(desc = "List all the connection IDs", impact = INFO)
    String[] listConnectionIDs();

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -303,7 +303,7 @@
       return server.listRemoteAddresses(ipAddress);
    }
 
-   public boolean closeConnectionsForAddress(final String ipAddress)
+   public boolean closeConnectionsForAddress(final String ipAddress) throws Exception
    {
       return server.closeConnectionsForAddress(ipAddress);
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverManagementTest.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverManagementTest.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientMessageImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
@@ -116,10 +117,9 @@
          ClientMessage managementMessage  = session1.createClientMessage(false);
          
          ManagementHelper.putAttributes(managementMessage,
-                                        replyTo,
                                         ManagementServiceImpl.getQueueObjectName(ADDRESS, ADDRESS),
                                         "MessageCount");
-         
+         managementMessage.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyTo);
          managementMessage.getBody().flip();
          
          producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
@@ -137,10 +137,10 @@
          ClientMessage managementMessage  = session1.createClientMessage(false);
          
          ManagementHelper.putAttributes(managementMessage,
-                                        replyTo,
                                         ManagementServiceImpl.getQueueObjectName(ADDRESS, ADDRESS),
                                         "MessageCount");
-         
+         managementMessage.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyTo);
+
          managementMessage.getBody().flip();
          
          producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
@@ -211,10 +211,9 @@
          ClientMessage managementMessage  = session1.createClientMessage(false);
          
          ManagementHelper.putAttributes(managementMessage,
-                                        replyTo,
                                         ManagementServiceImpl.getQueueObjectName(ADDRESS, ADDRESS),
                                         "MessageCount");
-         
+         managementMessage.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyTo);
          managementMessage.getBody().flip();
          
          producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareAddressControlWrapperTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareAddressControlWrapperTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareAddressControlWrapperTest.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -0,0 +1,141 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.management;
+
+import static org.jboss.messaging.tests.util.RandomUtil.randomBoolean;
+import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.management.AddressControlMBean;
+import org.jboss.messaging.core.management.RoleInfo;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A ReplicationAwareAddressControlWrapperTest
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ReplicationAwareAddressControlWrapperTest extends ReplicationAwareTestBase
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private SimpleString address;
+
+   private ClientSession session;
+
+   // Static --------------------------------------------------------
+
+   private static AddressControlMBean createAddressControl(SimpleString address, MBeanServer mbeanServer) throws Exception
+   {
+      AddressControlMBean control = (AddressControlMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
+                                                                                                       ManagementServiceImpl.getAddressObjectName(address),
+                                                                                                       AddressControlMBean.class,
+                                                                                                       false);
+      return control;
+   }
+   
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testAddRole() throws Exception
+   {
+      AddressControlMBean liveAddressControl = createAddressControl(address, liveMBeanServer);
+      AddressControlMBean backupAddressControl = createAddressControl(address, backupMBeanServer);
+
+      RoleInfo[] roles = liveAddressControl.getRoleInfos();
+      assertEquals(roles.length, backupAddressControl.getRoleInfos().length);
+
+      // add a role
+      liveAddressControl.addRole(randomString(), randomBoolean(), randomBoolean(), randomBoolean());
+
+      assertEquals(roles.length + 1, liveAddressControl.getRoleInfos().length);
+      assertEquals(roles.length + 1, backupAddressControl.getRoleInfos().length);
+   }
+
+   public void testRemoveRole() throws Exception
+   {
+      String roleName = randomString();
+
+      AddressControlMBean liveAddressControl = createAddressControl(address, liveMBeanServer);
+      AddressControlMBean backupAddressControl = createAddressControl(address, backupMBeanServer);
+
+      RoleInfo[] roles = liveAddressControl.getRoleInfos();
+      assertEquals(roles.length, backupAddressControl.getRoleInfos().length);
+
+      // add a role
+      liveAddressControl.addRole(roleName, randomBoolean(), randomBoolean(), randomBoolean());
+
+      assertEquals(roles.length + 1, liveAddressControl.getRoleInfos().length);
+      assertEquals(roles.length + 1, backupAddressControl.getRoleInfos().length);
+
+      // and remove it
+      liveAddressControl.removeRole(roleName);
+
+      assertEquals(roles.length, liveAddressControl.getRoleInfos().length);
+      assertEquals(roles.length, backupAddressControl.getRoleInfos().length);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      address = randomSimpleString();
+      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()),
+                                                                     new TransportConfiguration(InVMConnectorFactory.class.getName(),
+                                                                                                backupParams));
+
+      session = sf.createSession(false, true, true);
+      session.createQueue(address, address, null, false, false, true);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      session.deleteQueue(address);
+      session.close();
+
+      super.tearDown();
+   }
+   
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareQueueControlWrapperTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareQueueControlWrapperTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareQueueControlWrapperTest.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -0,0 +1,479 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.management;
+
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.openmbean.TabularData;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.management.MessageInfo;
+import org.jboss.messaging.core.management.QueueControlMBean;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A ReplicationAwareQueueControlWrapperTest
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ReplicationAwareQueueControlWrapperTest extends ReplicationAwareTestBase
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private ClientSession session;
+
+   private SimpleString address;
+
+   private final long timeToSleep = 100;
+
+   // Static --------------------------------------------------------
+
+   private static QueueControlMBean createQueueControl(SimpleString address, SimpleString name, MBeanServer mbeanServer) throws Exception
+   {
+      QueueControlMBean queueControl = (QueueControlMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
+                                                                                                        ManagementServiceImpl.getQueueObjectName(address,
+                                                                                                                                                 name),
+                                                                                                        QueueControlMBean.class,
+                                                                                                        false);
+      return queueControl;
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testChangeMessagePriority() throws Exception
+   {
+      byte oldPriority = (byte)1;
+      byte newPriority = (byte)8;
+      
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+
+      assertFalse(liveQueueControl.isBackup());
+      assertTrue(backupQueueControl.isBackup());
+
+      // send 1 message
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage message = session.createClientMessage(false);
+      message.setPriority(oldPriority);
+      producer.send(message);
+      
+      // wait a little bit to give time for the message to be handled by the server
+      Thread.sleep(timeToSleep);
+
+      // check it is on both live & backup nodes
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+
+      TabularData messages = liveQueueControl.listAllMessages();
+      MessageInfo[] messageInfos = MessageInfo.from(messages);
+      assertEquals(1, messageInfos.length);
+      long messageID = messageInfos[0].getID();
+      assertEquals(oldPriority, messageInfos[0].getPriority());
+      
+       messages = backupQueueControl.listAllMessages();
+       messageInfos = MessageInfo.from(messages);
+      assertEquals(1, messageInfos.length);
+      assertEquals(oldPriority, messageInfos[0].getPriority());
+      
+      assertTrue(liveQueueControl.changeMessagePriority(messageID, newPriority));
+
+      // check the priority is changed on both live & backup nodes
+      messages = liveQueueControl.listAllMessages();
+      messageInfos = MessageInfo.from(messages);
+      assertEquals(1, messageInfos.length);
+      assertEquals(newPriority, messageInfos[0].getPriority());
+
+      messages = backupQueueControl.listAllMessages();
+      messageInfos = MessageInfo.from(messages);
+      assertEquals(1, messageInfos.length);
+      assertEquals(newPriority, messageInfos[0].getPriority());
+   }
+   
+   public void testExpireMessage() throws Exception
+   {
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+
+      assertFalse(liveQueueControl.isBackup());
+      assertTrue(backupQueueControl.isBackup());
+
+      // send 1 message
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createClientMessage(false));
+      
+      // wait a little bit to give time for the message to be handled by the server
+      Thread.sleep(timeToSleep);
+
+      // check it is on both live & backup nodes
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+
+      TabularData messages = liveQueueControl.listAllMessages();
+      MessageInfo[] messageInfos = MessageInfo.from(messages);
+      assertEquals(1, messageInfos.length);
+      long messageID = messageInfos[0].getID();
+      
+      assertTrue(liveQueueControl.expireMessage(messageID));
+      
+      // check the message is no longer in the queue on both live & backup nodes
+      assertEquals(0, liveQueueControl.getMessageCount());
+      assertEquals(0, backupQueueControl.getMessageCount());
+   }
+   
+   public void testExpireMessagesWithFilter() throws Exception
+   {
+      SimpleString key = new SimpleString("key");
+      long matchingValue = randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+
+      assertFalse(liveQueueControl.isBackup());
+      assertTrue(backupQueueControl.isBackup());
+
+      // send 1 unmatching message and 1 matching message
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage unmatchingMessage = session.createClientMessage(false);
+      unmatchingMessage.putLongProperty(key, unmatchingValue);
+      producer.send(unmatchingMessage);
+      ClientMessage matchingMessage = session.createClientMessage(false);
+      matchingMessage.putLongProperty(key, matchingValue);
+      producer.send(matchingMessage);
+      
+      // wait a little bit to give time for the messages to be handled by the server
+      Thread.sleep(timeToSleep);
+
+      // check messages are on both live & backup nodes
+      assertEquals(2, liveQueueControl.getMessageCount());
+      assertEquals(2, backupQueueControl.getMessageCount());
+
+      assertEquals(1, liveQueueControl.expireMessages(key + " =" + matchingValue));
+      
+      // check there is only 1 message in the queue on both live & backup nodes
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+   }
+   
+   public void testMoveAllMessages() throws Exception
+   {
+      SimpleString otherQueue = randomSimpleString();
+
+      session.createQueue(otherQueue, otherQueue, null, false, true, true);
+      ClientProducer producer = session.createProducer(address);
+
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+      QueueControlMBean liveOtherQueueControl = createQueueControl(otherQueue, otherQueue, liveMBeanServer);
+      QueueControlMBean backupOtherQueueControl = createQueueControl(otherQueue, otherQueue, backupMBeanServer);
+
+      assertFalse(liveQueueControl.isBackup());
+      assertTrue(backupQueueControl.isBackup());
+      assertFalse(liveOtherQueueControl.isBackup());
+      assertTrue(backupOtherQueueControl.isBackup());
+
+      // send on queue
+      ClientMessage message = session.createClientMessage(false);
+      SimpleString key = randomSimpleString();
+      long value = randomLong();
+      message.putLongProperty(key, value);
+      producer.send(message);
+
+      // wait a little bit to ensure the message is handled by the server
+      Thread.sleep(timeToSleep);
+
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+      assertEquals(0, liveOtherQueueControl.getMessageCount());
+      assertEquals(0, backupOtherQueueControl.getMessageCount());
+
+      // move all messages to otherQueue
+      int movedMessagesCount = liveQueueControl.moveAllMessages(otherQueue.toString());
+      assertEquals(1, movedMessagesCount);
+      
+      assertEquals(0, liveQueueControl.getMessageCount());
+      assertEquals(0, backupQueueControl.getMessageCount());
+      assertEquals(1, liveOtherQueueControl.getMessageCount());
+      assertEquals(1, backupOtherQueueControl.getMessageCount());
+
+
+      session.deleteQueue(otherQueue);
+   }
+   
+   public void testMoveMatchingMessages() throws Exception
+   {
+      SimpleString key = new SimpleString("key");
+      long matchingValue = randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      SimpleString otherQueue = randomSimpleString();
+
+      session.createQueue(otherQueue, otherQueue, null, false, true, true);
+
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+      QueueControlMBean liveOtherQueueControl = createQueueControl(otherQueue, otherQueue, liveMBeanServer);
+      QueueControlMBean backupOtherQueueControl = createQueueControl(otherQueue, otherQueue, backupMBeanServer);
+
+      // send on queue
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage unmatchingMessage = session.createClientMessage(false);
+      unmatchingMessage.putLongProperty(key, unmatchingValue);
+      producer.send(unmatchingMessage);
+      ClientMessage matchingMessage = session.createClientMessage(false);
+      matchingMessage.putLongProperty(key, matchingValue);
+      producer.send(matchingMessage);
+
+      // wait a little bit to ensure the message is handled by the server
+      Thread.sleep(timeToSleep);
+      
+      assertEquals(2, liveQueueControl.getMessageCount());
+      assertEquals(2, backupQueueControl.getMessageCount());
+      assertEquals(0, liveOtherQueueControl.getMessageCount());
+      assertEquals(0, backupOtherQueueControl.getMessageCount());
+
+      // move matching messages to otherQueue
+      int movedMatchedMessagesCount = liveQueueControl.moveMatchingMessages(key + " =" + matchingValue, otherQueue.toString());
+      assertEquals(1, movedMatchedMessagesCount);
+
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+      assertEquals(1, liveOtherQueueControl.getMessageCount());
+      assertEquals(1, backupOtherQueueControl.getMessageCount());
+
+      session.deleteQueue(otherQueue);
+   }
+   
+   public void testMoveMessage() throws Exception
+   {
+      SimpleString otherQueue = randomSimpleString();
+
+      session.createQueue(otherQueue, otherQueue, null, false, true, true);
+
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+      QueueControlMBean liveOtherQueueControl = createQueueControl(otherQueue, otherQueue, liveMBeanServer);
+      QueueControlMBean backupOtherQueueControl = createQueueControl(otherQueue, otherQueue, backupMBeanServer);
+
+      assertFalse(liveQueueControl.isBackup());
+      assertTrue(backupQueueControl.isBackup());
+
+      // send 1 message
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createClientMessage(false));
+      
+      // wait a little bit to give time for the message to be handled by the server
+      Thread.sleep(timeToSleep);
+
+      // check it is on both live & backup nodes
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+      assertEquals(0, liveOtherQueueControl.getMessageCount());
+      assertEquals(0, backupOtherQueueControl.getMessageCount());
+
+      TabularData messages = liveQueueControl.listAllMessages();
+      MessageInfo[] messageInfos = MessageInfo.from(messages);
+      assertEquals(1, messageInfos.length);
+      long messageID = messageInfos[0].getID();
+      
+      assertTrue(liveQueueControl.moveMessage(messageID, otherQueue.toString()));
+      
+      // check the message is no longer in the queue on both live & backup nodes
+      assertEquals(0, liveQueueControl.getMessageCount());
+      assertEquals(0, backupQueueControl.getMessageCount());
+      assertEquals(1, liveOtherQueueControl.getMessageCount());
+      assertEquals(1, backupOtherQueueControl.getMessageCount());
+
+      session.deleteQueue(otherQueue);
+   }
+   
+   public void testRemoveAllMessages() throws Exception
+   {
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+
+      assertFalse(liveQueueControl.isBackup());
+      assertTrue(backupQueueControl.isBackup());
+
+      // send 1 message
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createClientMessage(false));
+
+      // wait a little bit to give time for the message to be handled by the server
+      Thread.sleep(timeToSleep);
+
+      // check it is on both live & backup nodes
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+
+      // remove all messages
+      int count = liveQueueControl.removeAllMessages();
+      assertEquals(1, count);
+
+      // check there are no messages on both live & backup nodes
+      assertEquals(0, liveQueueControl.getMessageCount());
+      assertEquals(0, backupQueueControl.getMessageCount());
+   }
+
+   public void testRemoveMatchingMessages() throws Exception
+   {
+      SimpleString key = new SimpleString("key");
+      long matchingValue = randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+
+      // send on queue
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage unmatchingMessage = session.createClientMessage(false);
+      unmatchingMessage.putLongProperty(key, unmatchingValue);
+      producer.send(unmatchingMessage);
+      ClientMessage matchingMessage = session.createClientMessage(false);
+      matchingMessage.putLongProperty(key, matchingValue);
+      producer.send(matchingMessage);
+
+      // wait a little bit to ensure the message is handled by the server
+      Thread.sleep(timeToSleep );
+      
+      assertEquals(2, liveQueueControl.getMessageCount());
+      assertEquals(2, backupQueueControl.getMessageCount());
+
+      // remove matching messages
+      int removedMatchedMessagesCount = liveQueueControl.removeMatchingMessages(key + " =" + matchingValue);
+      assertEquals(1, removedMatchedMessagesCount);
+
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+   }
+   
+   public void testRemoveMessage() throws Exception
+   {
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+
+      assertFalse(liveQueueControl.isBackup());
+      assertTrue(backupQueueControl.isBackup());
+
+      // send 1 message
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createClientMessage(false));
+      
+      // wait a little bit to give time for the message to be handled by the server
+      Thread.sleep(timeToSleep);
+
+      // check it is on both live & backup nodes
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+
+      TabularData messages = liveQueueControl.listAllMessages();
+      MessageInfo[] messageInfos = MessageInfo.from(messages);
+      assertEquals(1, messageInfos.length);
+      long messageID = messageInfos[0].getID();
+      
+      assertTrue(liveQueueControl.removeMessage(messageID));
+      
+      // check the message is no longer in the queue on both live & backup nodes
+      assertEquals(0, liveQueueControl.getMessageCount());
+      assertEquals(0, backupQueueControl.getMessageCount());
+   }
+   
+   public void testSendMessageToDLQ() throws Exception
+   {
+      QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+      QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+
+      assertFalse(liveQueueControl.isBackup());
+      assertTrue(backupQueueControl.isBackup());
+
+      // send 1 message
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createClientMessage(false));
+      
+      // wait a little bit to give time for the message to be handled by the server
+      Thread.sleep(timeToSleep);
+
+      // check it is on both live & backup nodes
+      assertEquals(1, liveQueueControl.getMessageCount());
+      assertEquals(1, backupQueueControl.getMessageCount());
+
+      TabularData messages = liveQueueControl.listAllMessages();
+      MessageInfo[] messageInfos = MessageInfo.from(messages);
+      assertEquals(1, messageInfos.length);
+      long messageID = messageInfos[0].getID();
+      
+      assertTrue(liveQueueControl.sendMessageToDeadLetterAddress(messageID));
+      
+      // check the message is no longer in the queue on both live & backup nodes
+      assertEquals(0, liveQueueControl.getMessageCount());
+      assertEquals(0, backupQueueControl.getMessageCount());
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      address = RandomUtil.randomSimpleString();
+
+      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()),
+                                                                     new TransportConfiguration(InVMConnectorFactory.class.getName(),
+                                                                                                backupParams));
+
+      session = sf.createSession(false, true, true);
+
+      session.createQueue(address, address, null, false, false, true);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      session.close();
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareTestBase.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareTestBase.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -0,0 +1,157 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.management;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.persistence.impl.nullpm.NullStorageManager;
+import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.security.JBMSecurityManager;
+import org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServerImpl;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+
+/**
+ * A ReplicationAwareTestBase
+ * <p>
+ * Base class for tests which need a live server and its backup. Each test gets:
+ * <ul>
+ * <li>a backup server (flagged as backup) listening on an in-VM acceptor
+ * configured with {@link #backupParams}</li>
+ * <li>a live server listening on a default in-VM acceptor, whose backup
+ * connector points to the backup server</li>
+ * </ul>
+ * Both servers use a {@link NullStorageManager}, have security disabled and JMX
+ * management enabled, and are registered in their own {@link MBeanServer}.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ReplicationAwareTestBase extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** live server, created and started in {@link #setUp()} */
+   protected MessagingService liveService;
+
+   /** backup server, created and started in {@link #setUp()} before the live one */
+   protected MessagingService backupService;
+
+   /** in-VM transport parameters of the backup server (server id set to 1 in {@link #setUp()}) */
+   protected final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   /** MBeanServer handed to the live server's management service */
+   protected MBeanServer liveMBeanServer;
+
+   /** MBeanServer handed to the backup server's management service */
+   protected MBeanServer backupMBeanServer;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Creates and starts the backup server first, then the live server
+    * configured to use it as its backup.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      backupMBeanServer = MBeanServerFactory.createMBeanServer();
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName(),
+                                                                            backupParams));
+      backupConf.setBackup(true);
+      backupConf.setJMXManagementEnabled(true);
+      backupService = createNullStorageMessagingServer(backupConf, backupMBeanServer);
+      backupService.start();
+
+      liveMBeanServer = MBeanServerFactory.createMBeanServer();
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      // the live server reaches its backup through a connector using the same parameters as the backup acceptor
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+      TransportConfiguration backupTC = new TransportConfiguration(InVMConnectorFactory.class.getName(),
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveConf.setJMXManagementEnabled(true);
+      liveService = createNullStorageMessagingServer(liveConf, liveMBeanServer);
+      liveService.start();
+   }
+
+   /**
+    * Checks that no connection was left open on either server, stops the backup
+    * server then the live one, checks that the in-VM registry is empty and
+    * releases both MBeanServers.
+    * <p>
+    * Both servers are stopped and both MBeanServers are released even if one of
+    * the checks fails, so that a failing test does not leak running servers
+    * into the next one.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         try
+         {
+            stopAfterCheckingConnections(backupService);
+         }
+         finally
+         {
+            // same order as before: the live server is checked once the backup has been stopped
+            stopAfterCheckingConnections(liveService);
+         }
+
+         assertEquals(0, InVMRegistry.instance.size());
+      }
+      finally
+      {
+         // MBeanServerFactory.createMBeanServer() keeps a reference to each server it creates
+         // until it is explicitly released
+         MBeanServerFactory.releaseMBeanServer(backupMBeanServer);
+         MBeanServerFactory.releaseMBeanServer(liveMBeanServer);
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Asserts that the remoting service of the given service has no connection
+    * left, and stops the service whether the assertion holds or not.
+    */
+   private static void stopAfterCheckingConnections(final MessagingService service) throws Exception
+   {
+      try
+      {
+         assertEquals(0, service.getServer().getRemotingService().getConnections().size());
+      }
+      finally
+      {
+         service.stop();
+      }
+   }
+
+   /**
+    * Assembles a messaging service around a {@link NullStorageManager}.
+    *
+    * @param config the server configuration, also used to create the remoting service
+    * @param mbeanServer the MBeanServer given to the management service
+    * @return the assembled service; the caller is responsible for starting it
+    */
+   private static MessagingService createNullStorageMessagingServer(final Configuration config, MBeanServer mbeanServer)
+   {
+      StorageManager storageManager = new NullStorageManager();
+      RemotingService remotingService = new RemotingServiceImpl(config);
+      JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
+      ManagementService managementService = new ManagementServiceImpl(mbeanServer, config.isJMXManagementEnabled());
+
+      MessagingServer server = new MessagingServerImpl();
+      server.setConfiguration(config);
+      server.setStorageManager(storageManager);
+      server.setRemotingService(remotingService);
+      server.setSecurityManager(securityManager);
+      server.setManagementService(managementService);
+
+      return new MessagingServiceImpl(server, storageManager, remotingService);
+   }
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/ManagementServiceImplTest.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/ManagementServiceImplTest.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -75,7 +75,6 @@
       ManagementService managementService = new ManagementServiceImpl(ManagementFactory.getPlatformMBeanServer(), true);
       assertNotNull(managementService);
 
-      SimpleString replyTo = RandomUtil.randomSimpleString();
       SimpleString address = RandomUtil.randomSimpleString();
       managementService.registerAddress(address);
 
@@ -83,7 +82,7 @@
       ServerMessage message = new ServerMessageImpl();
       MessagingBuffer body = new ByteBufferWrapper(ByteBuffer.allocate(2048));
       message.setBody(body);
-      ManagementHelper.putAttributes(message, replyTo, ManagementServiceImpl.getAddressObjectName(address), "Address");
+      ManagementHelper.putAttributes(message, ManagementServiceImpl.getAddressObjectName(address), "Address");
 
       managementService.handleMessage(message);
 
@@ -104,7 +103,6 @@
       resource.addRole(role.getName(), role.isCheckType(CREATE), role.isCheckType(READ), role.isCheckType(WRITE));
       replay(resource);
 
-      SimpleString replyTo = RandomUtil.randomSimpleString();
       SimpleString address = RandomUtil.randomSimpleString();
       ObjectName on = ManagementServiceImpl.getAddressObjectName(address);
       managementService.registerResource(on, resource);
@@ -114,7 +112,6 @@
       MessagingBuffer body = new ByteBufferWrapper(ByteBuffer.allocate(2048));
       message.setBody(body);
       ManagementHelper.putOperationInvocation(message,
-                                              replyTo,
                                               on,
                                               "addRole",
                                               role.getName(),
@@ -143,7 +140,6 @@
       expectLastCall().andThrow(new Exception(exceptionMessage));
       replay(resource);
 
-      SimpleString replyTo = RandomUtil.randomSimpleString();
       SimpleString address = RandomUtil.randomSimpleString();
       ObjectName on = ManagementServiceImpl.getAddressObjectName(address);
       managementService.registerResource(on, resource);
@@ -153,7 +149,6 @@
       MessagingBuffer body = new ByteBufferWrapper(ByteBuffer.allocate(2048));
       message.setBody(body);
       ManagementHelper.putOperationInvocation(message,
-                                              replyTo,
                                               on,
                                               "addRole",
                                               role.getName(),

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -35,6 +35,7 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
+import javax.management.StandardMBean;
 
 import junit.framework.TestCase;
 
@@ -79,10 +80,8 @@
 
    public void testRegisterMessagingServer() throws Exception
    {
-      ObjectName objectName = ManagementServiceImpl
-            .getMessagingServerObjectName();
-      ObjectInstance objectInstance = new ObjectInstance(objectName,
-            MessagingServerControl.class.getName());
+      ObjectName objectName = ManagementServiceImpl.getMessagingServerObjectName();
+      ObjectInstance objectInstance = new ObjectInstance(objectName, MessagingServerControl.class.getName());
 
       PostOffice postOffice = createMock(PostOffice.class);
       StorageManager storageManager = createMock(StorageManager.class);
@@ -96,24 +95,43 @@
       MessagingServer messagingServer = createMock(MessagingServer.class);
       MBeanServer mbeanServer = createMock(MBeanServer.class);
       expect(mbeanServer.isRegistered(objectName)).andReturn(false);
-      expect(
-            mbeanServer.registerMBean(isA(MessagingServerControl.class),
-                  eq(objectName))).andReturn(objectInstance);
+      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
 
-      replay(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, resourceManager, remotingService, messagingServer);
+      replay(mbeanServer,
+             postOffice,
+             storageManager,
+             configuration,
+             securityRepository,
+             queueSettingsRepository,
+             resourceManager,
+             remotingService,
+             messagingServer);
 
       ManagementService service = new ManagementServiceImpl(mbeanServer, true);
-      service.registerServer(postOffice, storageManager, configuration, queueSettingsRepository, resourceManager, remotingService, messagingServer);
+      service.registerServer(postOffice,
+                             storageManager,
+                             configuration,
+                             queueSettingsRepository,
+                             securityRepository,
+                             resourceManager,
+                             remotingService,
+                             messagingServer);
 
-      verify(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, resourceManager, remotingService, messagingServer);
+      verify(mbeanServer,
+             postOffice,
+             storageManager,
+             configuration,
+             securityRepository,
+             queueSettingsRepository,
+             resourceManager,
+             remotingService,
+             messagingServer);
    }
 
    public void testRegisterAlreadyRegisteredMessagingServer() throws Exception
    {
-      ObjectName objectName = ManagementServiceImpl
-            .getMessagingServerObjectName();
-      ObjectInstance objectInstance = new ObjectInstance(objectName,
-            MessagingServerControl.class.getName());
+      ObjectName objectName = ManagementServiceImpl.getMessagingServerObjectName();
+      ObjectInstance objectInstance = new ObjectInstance(objectName, MessagingServerControl.class.getName());
 
       PostOffice postOffice = createMock(PostOffice.class);
       StorageManager storageManager = createMock(StorageManager.class);
@@ -128,22 +146,42 @@
       MBeanServer mbeanServer = createMock(MBeanServer.class);
       expect(mbeanServer.isRegistered(objectName)).andReturn(true);
       mbeanServer.unregisterMBean(objectName);
-      expect(
-            mbeanServer.registerMBean(isA(MessagingServerControlMBean.class),
-                  eq(objectName))).andReturn(objectInstance);
+      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
 
-      replay(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, resourceManager, remotingService, messagingServer);
+      replay(mbeanServer,
+             postOffice,
+             storageManager,
+             configuration,
+             securityRepository,
+             queueSettingsRepository,
+             resourceManager,
+             remotingService,
+             messagingServer);
 
       ManagementService service = new ManagementServiceImpl(mbeanServer, true);
-      service.registerServer(postOffice, storageManager, configuration, queueSettingsRepository, resourceManager, remotingService, messagingServer);
+      service.registerServer(postOffice,
+                             storageManager,
+                             configuration,
+                             queueSettingsRepository,
+                             securityRepository,
+                             resourceManager,
+                             remotingService,
+                             messagingServer);
 
-      verify(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, resourceManager, remotingService, messagingServer);
+      verify(mbeanServer,
+             postOffice,
+             storageManager,
+             configuration,
+             securityRepository,
+             queueSettingsRepository,
+             resourceManager,
+             remotingService,
+             messagingServer);
    }
 
    public void testUnregisterMessagingServer() throws Exception
    {
-      ObjectName objectName = ManagementServiceImpl
-            .getMessagingServerObjectName();
+      ObjectName objectName = ManagementServiceImpl.getMessagingServerObjectName();
 
       MBeanServer mbeanServer = createMock(MBeanServer.class);
       expect(mbeanServer.isRegistered(objectName)).andReturn(true);
@@ -160,16 +198,12 @@
    public void testRegisterAddress() throws Exception
    {
       SimpleString address = randomSimpleString();
-      ObjectName objectName = ManagementServiceImpl
-            .getAddressObjectName(address);
-      ObjectInstance objectInstance = new ObjectInstance(objectName,
-            AddressControl.class.getName());
+      ObjectName objectName = ManagementServiceImpl.getAddressObjectName(address);
+      ObjectInstance objectInstance = new ObjectInstance(objectName, AddressControl.class.getName());
 
       MBeanServer mbeanServer = createMock(MBeanServer.class);
       expect(mbeanServer.isRegistered(objectName)).andReturn(false);
-      expect(
-            mbeanServer.registerMBean(isA(AddressControlMBean.class),
-                  eq(objectName))).andReturn(objectInstance);
+      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
 
       replay(mbeanServer);
 
@@ -182,17 +216,13 @@
    public void testRegisterAlreadyRegisteredAddress() throws Exception
    {
       SimpleString address = randomSimpleString();
-      ObjectName objectName = ManagementServiceImpl
-            .getAddressObjectName(address);
-      ObjectInstance objectInstance = new ObjectInstance(objectName,
-            AddressControl.class.getName());
+      ObjectName objectName = ManagementServiceImpl.getAddressObjectName(address);
+      ObjectInstance objectInstance = new ObjectInstance(objectName, AddressControl.class.getName());
 
       MBeanServer mbeanServer = createMock(MBeanServer.class);
       expect(mbeanServer.isRegistered(objectName)).andReturn(true);
       mbeanServer.unregisterMBean(objectName);
-      expect(
-            mbeanServer.registerMBean(isA(AddressControlMBean.class),
-                  eq(objectName))).andReturn(objectInstance);
+      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
 
       replay(mbeanServer);
 
@@ -205,8 +235,7 @@
    public void testUnregisterAddress() throws Exception
    {
       SimpleString address = randomSimpleString();
-      ObjectName objectName = ManagementServiceImpl
-            .getAddressObjectName(address);
+      ObjectName objectName = ManagementServiceImpl.getAddressObjectName(address);
 
       MBeanServer mbeanServer = createMock(MBeanServer.class);
       expect(mbeanServer.isRegistered(objectName)).andReturn(true);
@@ -224,10 +253,8 @@
    {
       SimpleString address = randomSimpleString();
       SimpleString name = randomSimpleString();
-      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address,
-            name);
-      ObjectInstance objectInstance = new ObjectInstance(objectName,
-            QueueControl.class.getName());
+      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
+      ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
 
       MBeanServer mbeanServer = createMock(MBeanServer.class);
       Queue queue = createMock(Queue.class);
@@ -235,9 +262,7 @@
       expect(queue.isDurable()).andReturn(true);
       StorageManager storageManager = createMock(StorageManager.class);
       expect(mbeanServer.isRegistered(objectName)).andReturn(false);
-      expect(
-            mbeanServer.registerMBean(isA(QueueControlMBean.class),
-                  eq(objectName))).andReturn(objectInstance);
+      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
 
       replay(mbeanServer, queue, storageManager);
 
@@ -251,10 +276,8 @@
    {
       SimpleString address = randomSimpleString();
       SimpleString name = randomSimpleString();
-      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address,
-            name);
-      ObjectInstance objectInstance = new ObjectInstance(objectName,
-            QueueControl.class.getName());
+      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
+      ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
 
       MBeanServer mbeanServer = createMock(MBeanServer.class);
       Queue queue = createMock(Queue.class);
@@ -263,9 +286,7 @@
       StorageManager storageManager = createMock(StorageManager.class);
       expect(mbeanServer.isRegistered(objectName)).andReturn(true);
       mbeanServer.unregisterMBean(objectName);
-      expect(
-            mbeanServer.registerMBean(isA(QueueControlMBean.class),
-                  eq(objectName))).andReturn(objectInstance);
+      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
 
       replay(mbeanServer, queue, storageManager);
 
@@ -279,8 +300,7 @@
    {
       SimpleString address = randomSimpleString();
       SimpleString name = randomSimpleString();
-      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address,
-            name);
+      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
 
       MBeanServer mbeanServer = createMock(MBeanServer.class);
       expect(mbeanServer.isRegistered(objectName)).andReturn(true);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java	2008-12-04 11:24:01 UTC (rev 5460)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java	2008-12-04 15:28:55 UTC (rev 5461)
@@ -601,7 +601,7 @@
       verifyMockedAttributes();
    }
 
-   public void testSendMessageToDLQ() throws Exception
+   public void testSendMessageToDeadLetterAddress() throws Exception
    {
       long messageID = randomLong();
       expect(
@@ -611,12 +611,12 @@
       replayMockedAttributes();
 
       QueueControlMBean control = createControl();
-      assertTrue(control.sendMessageToDLQ(messageID));
+      assertTrue(control.sendMessageToDeadLetterAddress(messageID));
 
       verifyMockedAttributes();
    }
 
-   public void testSendMessageToDLQWithNoMessageID() throws Exception
+   public void testSendMessageToDeadLetterAddressWithNoMessageID() throws Exception
    {
       long messageID = randomLong();
       expect(
@@ -626,7 +626,7 @@
       replayMockedAttributes();
 
       QueueControlMBean control = createControl();
-      assertFalse(control.sendMessageToDLQ(messageID));
+      assertFalse(control.sendMessageToDeadLetterAddress(messageID));
 
       verifyMockedAttributes();
    }




More information about the jboss-cvs-commits mailing list