[jboss-cvs] JBoss Messaging SVN: r5466 - in trunk: src/main/org/jboss/messaging/core/management/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Dec 5 11:40:47 EST 2008


Author: jmesnil
Date: 2008-12-05 11:40:47 -0500 (Fri, 05 Dec 2008)
New Revision: 5466

Added:
   trunk/src/main/org/jboss/messaging/jms/server/management/jmx/
   trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/
   trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareConnectionFactoryControlWrapper.java
   trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java
   trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java
   trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java
Modified:
   trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSManagementServiceImplTest.java
Log:
refacted management code to support replication

refactored JMS MBeans as wrappers which use JBM core messages to invoke the management operations.

Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2008-12-05 15:12:12 UTC (rev 5465)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -63,6 +63,10 @@
 
    void unregisterServer() throws Exception;
 
+   void registerInJMX(ObjectName objectName, Object managedResource) throws Exception;
+
+   void registerInRegistry(ObjectName objectName, Object managedResource);
+
    void registerAddress(SimpleString address) throws Exception;
 
    void unregisterAddress(SimpleString address) throws Exception;
@@ -94,4 +98,5 @@
     * @see ManagementHelper
     */
    void sendNotification(NotificationType type, String message, TypedProperties props) throws Exception;
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2008-12-05 15:12:12 UTC (rev 5465)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -39,16 +39,13 @@
 import javax.management.MBeanServer;
 import javax.management.NotificationBroadcasterSupport;
 import javax.management.ObjectName;
-import javax.management.StandardMBean;
 
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.management.AddressControlMBean;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
 import org.jboss.messaging.core.management.NotificationType;
-import org.jboss.messaging.core.management.QueueControlMBean;
 import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareAddressControlWrapper;
 import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareMessagingServerControlWrapper;
 import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareQueueControlWrapper;
@@ -179,7 +176,7 @@
                                                  messageCounterManager,
                                                  broadcaster);
       ObjectName objectName = getMessagingServerObjectName();
-      registerInJMX(objectName, new StandardMBean(new ReplicationAwareMessagingServerControlWrapper(objectName, managedServer), MessagingServerControlMBean.class));
+      registerInJMX(objectName, new ReplicationAwareMessagingServerControlWrapper(objectName, managedServer));
       registerInRegistry(objectName, managedServer);
 
       return managedServer;
@@ -196,7 +193,7 @@
       ObjectName objectName = getAddressObjectName(address);
       AddressControl addressControl = new AddressControl(address, postOffice, securityRepository);
       
-      registerInJMX(objectName, new StandardMBean(new ReplicationAwareAddressControlWrapper(objectName, addressControl), AddressControlMBean.class));
+      registerInJMX(objectName, new ReplicationAwareAddressControlWrapper(objectName, addressControl));
       registerInRegistry(objectName, addressControl);
       if (log.isDebugEnabled())
       {
@@ -227,7 +224,7 @@
                                                         postOffice,
                                                         queueSettingsRepository,
                                                         counter);      
-      registerInJMX(objectName, new StandardMBean(new ReplicationAwareQueueControlWrapper(objectName, queueControl), QueueControlMBean.class));
+      registerInJMX(objectName, new ReplicationAwareQueueControlWrapper(objectName, queueControl));
       registerInRegistry(objectName, queueControl);
 
       if (log.isDebugEnabled())
@@ -318,25 +315,9 @@
       return registry.get(objectName);
    }
 
-   // Package protected ---------------------------------------------
 
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   public void registerInRegistry(final ObjectName objectName, final Object managedResource)
+   public void registerInJMX(final ObjectName objectName, final Object managedResource) throws Exception
    {
-      unregisterFromRegistry(objectName);
-      registry.put(objectName, managedResource);
-   }
-
-   private void unregisterFromRegistry(final ObjectName objectName)
-   {
-      registry.remove(objectName);
-   }
-
-   private void registerInJMX(final ObjectName objectName, final Object managedResource) throws Exception
-   {
       if (!jmxManagementEnabled)
       {
          return;
@@ -347,7 +328,24 @@
          mbeanServer.registerMBean(managedResource, objectName);         
       }
    }
+   
+   public void registerInRegistry(final ObjectName objectName, final Object managedResource)
+   {
+      unregisterFromRegistry(objectName);
+      registry.put(objectName, managedResource);
+   }
 
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private void unregisterFromRegistry(final ObjectName objectName)
+   {
+      registry.remove(objectName);
+   }
+
    // the JMX unregistration is synchronized to avoid race conditions if 2 clients tries to 
    // unregister the same resource (e.g. a queue) at the same time since unregisterMBean()
    // will throw an exception if the MBean has already been unregistered 

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java	2008-12-05 15:12:12 UTC (rev 5465)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -45,7 +45,7 @@
    
    String getExpiryQueue();
    
-   void setExpiryAddress(@Parameter(name = "expiryQueue", desc = "Name of the expiry queueur") String expiryQueue);
+   void setExpiryAddress(@Parameter(name = "expiryQueue", desc = "Name of the expiry queueur") String expiryQueue) throws Exception;
 
    String getDeadLetterAddress();
    

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSManagementServiceImpl.java	2008-12-05 15:12:12 UTC (rev 5465)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSManagementServiceImpl.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -42,6 +42,10 @@
 import org.jboss.messaging.jms.client.JBossConnectionFactory;
 import org.jboss.messaging.jms.server.JMSServerManager;
 import org.jboss.messaging.jms.server.management.JMSManagementService;
+import org.jboss.messaging.jms.server.management.jmx.impl.ReplicationAwareConnectionFactoryControlWrapper;
+import org.jboss.messaging.jms.server.management.jmx.impl.ReplicationAwareJMSQueueControlWrapper;
+import org.jboss.messaging.jms.server.management.jmx.impl.ReplicationAwareJMSServerControlWrapper;
+import org.jboss.messaging.jms.server.management.jmx.impl.ReplicationAwareTopicControlWrapper;
 
 /*
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -97,7 +101,9 @@
    {
       ObjectName objectName = getJMSServerObjectName();
       JMSServerControl control = new JMSServerControl(server);
-      managementService.registerResource(objectName, control);
+      managementService.registerInJMX(objectName,
+                                      new ReplicationAwareJMSServerControlWrapper(objectName, control));
+      managementService.registerInRegistry(objectName, control);
    }
 
    public void unregisterJMSServer() throws Exception
@@ -129,7 +135,9 @@
                                                     storageManager,
                                                     queueSettingsRepository,
                                                     counter);
-      managementService.registerResource(objectName, control);
+      managementService.registerInJMX(objectName,
+                                      new ReplicationAwareJMSQueueControlWrapper(objectName, control));
+      managementService.registerInRegistry(objectName, control);
    }
 
    public void unregisterQueue(final String name) throws Exception
@@ -145,7 +153,8 @@
    {
       ObjectName objectName = getJMSTopicObjectName(topic.getTopicName());
       TopicControl control = new TopicControl(topic, jndiBinding, postOffice, storageManager);
-      managementService.registerResource(objectName, control);
+      managementService.registerInJMX(objectName, new ReplicationAwareTopicControlWrapper(objectName, control));
+      managementService.registerInRegistry(objectName, control);
    }
 
    public void unregisterTopic(final String name) throws Exception
@@ -160,7 +169,9 @@
    {
       ObjectName objectName = getConnectionFactoryObjectName(name);
       ConnectionFactoryControl control = new ConnectionFactoryControl(connectionFactory, name, bindings);
-      managementService.registerResource(objectName, control);
+      managementService.registerInJMX(objectName,
+                                      new ReplicationAwareConnectionFactoryControlWrapper(objectName, control));
+      managementService.registerInRegistry(objectName, control);
    }
 
    public void unregisterConnectionFactory(final String name) throws Exception

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java	2008-12-05 15:12:12 UTC (rev 5465)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -27,9 +27,6 @@
 import java.util.GregorianCalendar;
 import java.util.List;
 
-import javax.management.MBeanInfo;
-import javax.management.NotCompliantMBeanException;
-import javax.management.StandardMBean;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
 
@@ -39,7 +36,6 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.DayCounterInfo;
 import org.jboss.messaging.core.management.MessageCounterInfo;
-import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
 import org.jboss.messaging.core.messagecounter.MessageCounter;
 import org.jboss.messaging.core.messagecounter.MessageCounter.DayCounter;
 import org.jboss.messaging.core.messagecounter.impl.MessageCounterHelper;
@@ -65,8 +61,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class JMSQueueControl extends StandardMBean implements
-      JMSQueueControlMBean
+public class JMSQueueControl implements JMSQueueControlMBean
 {
    // Constants -----------------------------------------------------
 
@@ -75,11 +70,17 @@
    // Attributes ----------------------------------------------------
 
    private final JBossQueue managedQueue;
+
    private final Queue coreQueue;
+
    private final String binding;
+
    private final PostOffice postOffice;
+
    private final StorageManager storageManager;
+
    private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
    private final MessageCounter counter;
 
    // Static --------------------------------------------------------
@@ -89,24 +90,22 @@
       String filterStr = (selectorStr == null) ? null : SelectorTranslator.convertToJBMFilterString(selectorStr);
       return FilterImpl.createFilter(filterStr);
    }
-   
-   private static Filter createFilterForJMSMessageID(String jmsMessageID)
-         throws Exception
+
+   private static Filter createFilterForJMSMessageID(String jmsMessageID) throws Exception
    {
-      return new FilterImpl(new SimpleString(JBossMessage.JBM_MESSAGE_ID
-            + " = '" + jmsMessageID + "'"));
+      return new FilterImpl(new SimpleString(JBossMessage.JBM_MESSAGE_ID + " = '" + jmsMessageID + "'"));
    }
 
    // Constructors --------------------------------------------------
 
-   public JMSQueueControl(final JBossQueue queue, final Queue coreQueue,
-         final String jndiBinding, final PostOffice postOffice,
-         final StorageManager storageManager,
-         final HierarchicalRepository<QueueSettings> queueSettingsRepository,
-         final MessageCounter counter)
-         throws NotCompliantMBeanException
+   public JMSQueueControl(final JBossQueue queue,
+                          final Queue coreQueue,
+                          final String jndiBinding,
+                          final PostOffice postOffice,
+                          final StorageManager storageManager,
+                          final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                          final MessageCounter counter)
    {
-      super(JMSQueueControlMBean.class);
       this.managedQueue = queue;
       this.coreQueue = coreQueue;
       this.binding = jndiBinding;
@@ -185,9 +184,9 @@
       QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
       if (queueSettings != null && queueSettings.getDeadLetterAddress() != null)
       {
-         return JBossDestination.fromAddress(queueSettings.getDeadLetterAddress().toString())
-               .getName();
-      } else
+         return JBossDestination.fromAddress(queueSettings.getDeadLetterAddress().toString()).getName();
+      }
+      else
       {
          return null;
       }
@@ -198,9 +197,9 @@
       QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
       if (queueSettings != null && queueSettings.getExpiryAddress() != null)
       {
-         return JBossDestination.fromAddress(
-               queueSettings.getExpiryAddress().toString()).getName();
-      } else
+         return JBossDestination.fromAddress(queueSettings.getExpiryAddress().toString()).getName();
+      }
+      else
       {
          return null;
       }
@@ -209,7 +208,7 @@
    public void setExpiryAddress(String expiryQueueName)
    {
       QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
-      
+
       if (expiryQueueName != null)
       {
          queueSettings.setExpiryAddress(new SimpleString(expiryQueueName));
@@ -222,11 +221,9 @@
       List<MessageReference> refs = coreQueue.list(filter);
       if (refs.size() != 1)
       {
-         throw new IllegalArgumentException(
-               "No message found for JMSMessageID: " + messageID);
+         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
       }
-      return coreQueue.deleteReference(refs.get(0).getMessage().getMessageID(),
-            storageManager);
+      return coreQueue.deleteReference(refs.get(0).getMessage().getMessageID(), storageManager);
    }
 
    public int removeMatchingMessages(String filterStr) throws Exception
@@ -235,12 +232,13 @@
       {
          Filter filter = createFilterFromJMSSelector(filterStr);
          return coreQueue.deleteMatchingReferences(filter, storageManager);
-      } catch (MessagingException e)
+      }
+      catch (MessagingException e)
       {
          throw new IllegalStateException(e.getMessage());
       }
    }
-   
+
    public int removeAllMessages() throws Exception
    {
       return coreQueue.deleteAllReferences(storageManager);
@@ -258,8 +256,7 @@
          Filter filter = createFilterFromJMSSelector(filterStr);
 
          List<MessageReference> messageRefs = coreQueue.list(filter);
-         List<JMSMessageInfo> infos = new ArrayList<JMSMessageInfo>(messageRefs
-               .size());
+         List<JMSMessageInfo> infos = new ArrayList<JMSMessageInfo>(messageRefs.size());
          for (MessageReference messageRef : messageRefs)
          {
             ServerMessage message = messageRef.getMessage();
@@ -267,7 +264,8 @@
             infos.add(info);
          }
          return JMSMessageInfo.toTabularData(infos);
-      } catch (MessagingException e)
+      }
+      catch (MessagingException e)
       {
          throw new IllegalStateException(e.getMessage());
       }
@@ -279,11 +277,12 @@
       List<MessageReference> refs = coreQueue.list(filter);
       if (refs.size() != 1)
       {
-         throw new IllegalArgumentException(
-               "No message found for JMSMessageID: " + messageID);
+         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
       }
       return coreQueue.expireMessage(refs.get(0).getMessage().getMessageID(),
-            storageManager, postOffice, queueSettingsRepository);
+                                     storageManager,
+                                     postOffice,
+                                     queueSettingsRepository);
    }
 
    public int expireMessages(final String filterStr) throws Exception
@@ -296,10 +295,13 @@
          for (MessageReference ref : refs)
          {
             coreQueue.expireMessage(ref.getMessage().getMessageID(),
-                  storageManager, postOffice, queueSettingsRepository);
+                                    storageManager,
+                                    postOffice,
+                                    queueSettingsRepository);
          }
          return refs.size();
-      } catch (MessagingException e)
+      }
+      catch (MessagingException e)
       {
          throw new IllegalStateException(e.getMessage());
       }
@@ -311,64 +313,62 @@
       List<MessageReference> refs = coreQueue.list(filter);
       if (refs.size() != 1)
       {
-         throw new IllegalArgumentException(
-               "No message found for JMSMessageID: " + messageID);
+         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
       }
-      return coreQueue.sendMessageToDeadLetterAddress(
-            refs.get(0).getMessage().getMessageID(), storageManager,
-            postOffice, queueSettingsRepository);
+      return coreQueue.sendMessageToDeadLetterAddress(refs.get(0).getMessage().getMessageID(),
+                                                      storageManager,
+                                                      postOffice,
+                                                      queueSettingsRepository);
    }
 
-   public boolean changeMessagePriority(final String messageID,
-         final int newPriority) throws Exception
+   public boolean changeMessagePriority(final String messageID, final int newPriority) throws Exception
    {
       if (newPriority < 0 || newPriority > 9)
       {
-         throw new IllegalArgumentException("invalid newPriority value: "
-               + newPriority + ". It must be between 0 and 9 (both included)");
+         throw new IllegalArgumentException("invalid newPriority value: " + newPriority +
+                                            ". It must be between 0 and 9 (both included)");
       }
       Filter filter = createFilterForJMSMessageID(messageID);
       List<MessageReference> refs = coreQueue.list(filter);
       if (refs.size() != 1)
       {
-         throw new IllegalArgumentException(
-               "No message found for JMSMessageID: " + messageID);
+         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
       }
-      return coreQueue.changeMessagePriority(refs.get(0).getMessage()
-            .getMessageID(), (byte) newPriority, storageManager, postOffice,
-            queueSettingsRepository);
+      return coreQueue.changeMessagePriority(refs.get(0).getMessage().getMessageID(),
+                                             (byte)newPriority,
+                                             storageManager,
+                                             postOffice,
+                                             queueSettingsRepository);
    }
-   
+
    public boolean moveMessage(long messageID, String otherQueueName) throws Exception
    {
       Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
       if (binding == null)
       {
-         throw new IllegalArgumentException("No queue found for "
-               + otherQueueName);
+         throw new IllegalArgumentException("No queue found for " + otherQueueName);
       }
 
       return coreQueue.moveMessage(messageID, binding.getAddress(), storageManager, postOffice);
    }
-   
+
    public int moveMatchingMessages(String filterStr, String otherQueueName) throws Exception
    {
       Binding otherBinding = postOffice.getBinding(new SimpleString(otherQueueName));
       if (otherBinding == null)
       {
-         throw new IllegalArgumentException("No queue found for "
-               + otherQueueName);
+         throw new IllegalArgumentException("No queue found for " + otherQueueName);
       }
 
       Filter filter = createFilterFromJMSSelector(filterStr);
       return coreQueue.moveMessages(filter, otherBinding.getAddress(), storageManager, postOffice);
    }
-   
+
    public int moveAllMessages(String otherQueueName) throws Exception
    {
       return moveMatchingMessages(null, otherQueueName);
    }
-   
+
    public CompositeData listMessageCounter()
    {
       return MessageCounterInfo.toCompositeData(counter);
@@ -395,24 +395,12 @@
       }
       return DayCounterInfo.toTabularData(infos);
    }
-   
+
    public String listMessageCounterHistoryAsHTML()
    {
       return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
    }
 
-   // StandardMBean overrides ---------------------------------------
-
-   @Override
-   public MBeanInfo getMBeanInfo()
-   {
-      MBeanInfo info = super.getMBeanInfo();
-      return new MBeanInfo(info.getClassName(), info.getDescription(), info
-            .getAttributes(), info.getConstructors(), MBeanInfoHelper
-            .getMBeanOperationsInfo(JMSQueueControlMBean.class), info
-            .getNotifications());
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java	2008-12-05 15:12:12 UTC (rev 5465)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -26,15 +26,9 @@
 import java.util.Collections;
 import java.util.List;
 
-import javax.management.MBeanInfo;
-import javax.management.NotCompliantMBeanException;
-import javax.management.StandardMBean;
 import javax.management.openmbean.TabularData;
 
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.management.Operation;
-import org.jboss.messaging.core.management.Parameter;
-import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -54,54 +48,37 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class TopicControl extends StandardMBean implements TopicControlMBean
+public class TopicControl implements TopicControlMBean
 {
    // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(TopicControl.class);
-   
+
    // Attributes ----------------------------------------------------
 
    private final JBossTopic managedTopic;
+
    private final String binding;
+
    private final PostOffice postOffice;
+
    private final StorageManager storageManager;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public TopicControl(final JBossTopic topic, final String jndiBinding,
-         final PostOffice postOffice, final StorageManager storageManager)
-         throws NotCompliantMBeanException
+   public TopicControl(final JBossTopic topic,
+                       final String jndiBinding,
+                       final PostOffice postOffice,
+                       final StorageManager storageManager)
    {
-      super(TopicControlMBean.class);
       this.managedTopic = topic;
       this.binding = jndiBinding;
       this.postOffice = postOffice;
       this.storageManager = storageManager;
    }
 
-   // Public --------------------------------------------------------
-
-   // StandardMBean overrides ---------------------------------------
-
-   /**
-    * overrides getMBeanInfo to add operations info using annotations
-    * 
-    * @see Operation
-    * @see Parameter
-    */
-   @Override
-   public MBeanInfo getMBeanInfo()
-   {
-      MBeanInfo info = super.getMBeanInfo();
-      return new MBeanInfo(info.getClassName(), info.getDescription(), info
-            .getAttributes(), info.getConstructors(), MBeanInfoHelper
-            .getMBeanOperationsInfo(TopicControlMBean.class), info
-            .getNotifications());
-   }
-
    // TopicControlMBean implementation ------------------------------
 
    public String getName()
@@ -184,8 +161,7 @@
       return listSubscribersInfos(DurabilityType.NON_DURABLE);
    }
 
-   public TabularData listMessagesForSubscription(final String queueName)
-         throws Exception
+   public TabularData listMessagesForSubscription(final String queueName) throws Exception
    {
       SimpleString sAddress = new SimpleString(queueName);
       Binding binding = postOffice.getBinding(sAddress);
@@ -195,8 +171,7 @@
       }
       Queue queue = binding.getQueue();
       List<MessageReference> messageRefs = queue.list(null);
-      List<JMSMessageInfo> infos = new ArrayList<JMSMessageInfo>(messageRefs
-            .size());
+      List<JMSMessageInfo> infos = new ArrayList<JMSMessageInfo>(messageRefs.size());
 
       for (MessageReference messageRef : messageRefs)
       {
@@ -210,18 +185,17 @@
    public int removeAllMessages() throws Exception
    {
       int count = 0;
-      List<Binding> bindings = postOffice.getBindingsForAddress(managedTopic
-            .getSimpleAddress());
+      List<Binding> bindings = postOffice.getBindingsForAddress(managedTopic.getSimpleAddress());
 
       for (Binding binding : bindings)
       {
          Queue queue = binding.getQueue();
          count += queue.deleteAllReferences(storageManager);
       }
-      
+
       return count;
    }
-   
+
    public void dropDurableSubscription(String clientID, String subscriptionName) throws Exception
    {
       String queueName = JBossTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
@@ -229,7 +203,9 @@
 
       if (binding == null)
       {
-         throw new IllegalArgumentException("No durable subscription for clientID=" + clientID + ", subcription=" + subscriptionName);
+         throw new IllegalArgumentException("No durable subscription for clientID=" + clientID +
+                                            ", subcription=" +
+                                            subscriptionName);
       }
 
       Queue queue = binding.getQueue();
@@ -241,8 +217,7 @@
 
    public void dropAllSubscriptions() throws Exception
    {
-      List<Binding> bindings = postOffice.getBindingsForAddress(managedTopic
-                                                                .getSimpleAddress());
+      List<Binding> bindings = postOffice.getBindingsForAddress(managedTopic.getSimpleAddress());
 
       for (Binding binding : bindings)
       {
@@ -261,32 +236,30 @@
    private SubscriptionInfo[] listSubscribersInfos(final DurabilityType durability)
    {
       List<Queue> queues = getQueues(durability);
-      List<SubscriptionInfo> subInfos = new ArrayList<SubscriptionInfo>(queues
-            .size());
-      
+      List<SubscriptionInfo> subInfos = new ArrayList<SubscriptionInfo>(queues.size());
+
       for (Queue queue : queues)
       {
          String clientID = null;
-         String subName = null;                 
+         String subName = null;
 
          if (queue.isDurable())
          {
-            Pair<String, String> pair = JBossTopic
-                  .decomposeQueueNameForDurableSubscription(queue.getName()
-                        .toString());
+            Pair<String, String> pair = JBossTopic.decomposeQueueNameForDurableSubscription(queue.getName().toString());
             clientID = pair.a;
             subName = pair.b;
          }
-         
-         String filter = queue.getFilter() != null ? queue.getFilter()
-               .getFilterString().toString() : null;
+
+         String filter = queue.getFilter() != null ? queue.getFilter().getFilterString().toString() : null;
          SubscriptionInfo info = new SubscriptionInfo(queue.getName().toString(),
-               clientID, subName, queue.isDurable(), filter, queue
-                     .getMessageCount());
+                                                      clientID,
+                                                      subName,
+                                                      queue.isDurable(),
+                                                      filter,
+                                                      queue.getMessageCount());
          subInfos.add(info);
       }
-      return (SubscriptionInfo[]) subInfos.toArray(new SubscriptionInfo[subInfos
-            .size()]);
+      return (SubscriptionInfo[])subInfos.toArray(new SubscriptionInfo[subInfos.size()]);
    }
 
    private int getMessageCount(final DurabilityType durability)
@@ -304,23 +277,21 @@
    {
       try
       {
-         List<Binding> bindings = postOffice.getBindingsForAddress(managedTopic
-               .getSimpleAddress());
+         List<Binding> bindings = postOffice.getBindingsForAddress(managedTopic.getSimpleAddress());
          List<Queue> matchingQueues = new ArrayList<Queue>();
 
          for (Binding binding : bindings)
          {
             Queue queue = binding.getQueue();
-            if (durability == DurabilityType.ALL
-                  || (durability == DurabilityType.DURABLE && queue.isDurable())
-                  || (durability == DurabilityType.NON_DURABLE && !queue
-                        .isDurable()))
+            if (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && queue.isDurable()) ||
+                (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))
             {
                matchingQueues.add(queue);
             }
          }
          return matchingQueues;
-      } catch (Exception e)
+      }
+      catch (Exception e)
       {
          e.printStackTrace();
          return Collections.emptyList();

Added: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareConnectionFactoryControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareConnectionFactoryControlWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareConnectionFactoryControlWrapper.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.jms.server.management.jmx.impl;
+
+import java.util.List;
+
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+
+import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
+import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareStandardMBeanWrapper;
+import org.jboss.messaging.jms.server.management.ConnectionFactoryControlMBean;
+import org.jboss.messaging.jms.server.management.impl.ConnectionFactoryControl;
+
+/**
+ * A ReplicationAwareConnectionFactoryControlWrapper
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ReplicationAwareConnectionFactoryControlWrapper extends ReplicationAwareStandardMBeanWrapper implements
+         ConnectionFactoryControlMBean
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final ConnectionFactoryControl localControl;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationAwareConnectionFactoryControlWrapper(final ObjectName objectName, final ConnectionFactoryControl localControl) throws Exception
+   {
+      super(objectName, ConnectionFactoryControlMBean.class);
+      this.localControl = localControl;
+   }
+
+   // ConnectionFactoryControlMBean implementation ---------------------------
+
+   public List<String> getBindings()
+   {
+      return localControl.getBindings();
+   }
+
+   public long getCallTimeout()
+   {
+      return localControl.getCallTimeout();
+   }
+
+   public String getClientID()
+   {
+      return localControl.getClientID();
+   }
+
+   public int getDefaultConsumerMaxRate()
+   {
+      return localControl.getDefaultConsumerMaxRate();
+   }
+
+   public int getDefaultConsumerWindowSize()
+   {
+      return localControl.getDefaultConsumerWindowSize();
+   }
+
+   public int getDefaultProducerMaxRate()
+   {
+      return localControl.getDefaultProducerMaxRate();
+   }
+
+   public int getDefaultProducerWindowSize()
+   {
+      return localControl.getDefaultProducerWindowSize();
+   }
+
+   public int getDupsOKBatchSize()
+   {
+      return localControl.getDupsOKBatchSize();
+   }
+
+   public long getPingPeriod()
+   {
+      return localControl.getPingPeriod();
+   }
+
+   public boolean isDefaultBlockOnAcknowledge()
+   {
+      return localControl.isDefaultBlockOnAcknowledge();
+   }
+
+   public boolean isDefaultBlockOnNonPersistentSend()
+   {
+      return localControl.isDefaultBlockOnNonPersistentSend();
+   }
+
+   public boolean isDefaultBlockOnPersistentSend()
+   {
+      return localControl.isDefaultBlockOnPersistentSend();
+   }
+
+   // StandardMBean overrides ---------------------------------------
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(ConnectionFactoryControlMBean.class),
+                           info.getNotifications());
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -0,0 +1,241 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.jms.server.management.jmx.impl;
+
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
+import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareStandardMBeanWrapper;
+import org.jboss.messaging.jms.server.management.JMSQueueControlMBean;
+import org.jboss.messaging.jms.server.management.impl.JMSQueueControl;
+
+/**
+ * A ReplicationAwareJMSQueueControlWrapper
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ReplicationAwareJMSQueueControlWrapper extends ReplicationAwareStandardMBeanWrapper implements
+         JMSQueueControlMBean
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final JMSQueueControl localControl;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationAwareJMSQueueControlWrapper(final ObjectName objectName, final JMSQueueControl localControl) throws Exception
+   {
+      super(objectName, JMSQueueControlMBean.class);
+      this.localControl = localControl;
+   }
+
+   // JMSQueueControlMBean implementation ---------------------------
+
+   public int getConsumerCount()
+   {
+      return localControl.getConsumerCount();
+   }
+
+   public String getDeadLetterAddress()
+   {
+      return localControl.getDeadLetterAddress();
+   }
+
+   public int getDeliveringCount()
+   {
+      return localControl.getDeliveringCount();
+   }
+
+   public String getExpiryQueue()
+   {
+      return localControl.getExpiryQueue();
+   }
+
+   public int getMessageCount()
+   {
+      return localControl.getMessageCount();
+   }
+
+   public int getMessagesAdded()
+   {
+      return localControl.getMessagesAdded();
+   }
+
+   public String getName()
+   {
+      return localControl.getName();
+   }
+
+   public long getScheduledCount()
+   {
+      return localControl.getScheduledCount();
+   }
+
+   public long getSizeBytes()
+   {
+      return localControl.getSizeBytes();
+   }
+
+   public boolean isClustered()
+   {
+      return localControl.isClustered();
+   }
+
+   public boolean isDurable()
+   {
+      return localControl.isDurable();
+   }
+
+   public boolean isTemporary()
+   {
+      return localControl.isTemporary();
+   }
+
+   public TabularData listAllMessages() throws Exception
+   {
+      return localControl.listAllMessages();
+   }
+
+   public CompositeData listMessageCounter()
+   {
+      return localControl.listMessageCounter();
+   }
+
+   public String listMessageCounterAsHTML()
+   {
+      return localControl.listMessageCounterAsHTML();
+   }
+
+   public TabularData listMessageCounterHistory() throws Exception
+   {
+      return localControl.listMessageCounterHistory();
+   }
+
+   public String listMessageCounterHistoryAsHTML()
+   {
+      return localControl.listMessageCounterHistoryAsHTML();
+   }
+
+   public TabularData listMessages(String filter) throws Exception
+   {
+      return localControl.listMessages(filter);
+   }
+
+   public String getAddress()
+   {
+      return localControl.getAddress();
+   }
+
+   public String getJNDIBinding()
+   {
+      return localControl.getJNDIBinding();
+   }
+
+   public boolean changeMessagePriority(final String messageID, int newPriority) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("changeMessagePriority", messageID, newPriority);
+   }
+
+   public boolean expireMessage(final String messageID) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("expireMessage", messageID);
+   }
+
+   public int expireMessages(final String filter) throws Exception
+   {
+      return (Integer)replicationAwareInvoke("expireMessages", filter);
+   }
+
+   public int moveAllMessages(final String otherQueueName) throws Exception
+   {
+      return (Integer)replicationAwareInvoke("moveAllMessages", otherQueueName);
+   }
+
+   public int moveMatchingMessages(final String filter, final String otherQueueName) throws Exception
+   {
+      return (Integer)replicationAwareInvoke("moveMatchingMessages", filter, otherQueueName);
+   }
+
+   public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("moveMessage", messageID, otherQueueName);
+   }
+
+   public int removeMatchingMessages(final String filter) throws Exception
+   {
+      return (Integer)replicationAwareInvoke("removeMatchingMessages", filter);
+   }
+
+   public boolean removeMessage(final String messageID) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("removeMessage", messageID);
+   }
+
+   public boolean sendMessageToDLQ(final String messageID) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("sendMessageToDLQ", messageID);
+   }
+
+   public void setExpiryAddress(final String expiryQueue) throws Exception
+   {
+      replicationAwareInvoke("setExpiryAddress", expiryQueue);
+   }
+
+   public int removeAllMessages() throws Exception
+   {
+      return (Integer)replicationAwareInvoke("removeAllMessages");
+   }
+
+   // StandardMBean overrides ---------------------------------------
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(JMSQueueControlMBean.class),
+                           info.getNotifications());
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -0,0 +1,261 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.jms.server.management.jmx.impl;
+
+import java.util.List;
+
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
+import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareStandardMBeanWrapper;
+import org.jboss.messaging.jms.server.management.JMSServerControlMBean;
+import org.jboss.messaging.jms.server.management.impl.JMSServerControl;
+import org.jboss.messaging.util.Pair;
+
+/**
+ * A ReplicationAwareJMSServerControlWrapper
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ReplicationAwareJMSServerControlWrapper extends ReplicationAwareStandardMBeanWrapper implements
+         JMSServerControlMBean
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final JMSServerControl localControl;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationAwareJMSServerControlWrapper(final ObjectName objectName, final JMSServerControl localControl) throws Exception
+   {
+      super(objectName, JMSServerControlMBean.class);
+      this.localControl = localControl;
+   }
+
+   // JMSServerControlMBean implementation --------------------------
+
+   public boolean closeConnectionsForAddress(final String ipAddress) throws Exception
+   {
+      return localControl.closeConnectionsForAddress(ipAddress);
+   }
+
+   public void createConnectionFactory(final String name,
+                                       final List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
+                                       final String connectionLoadBalancingPolicyClassName,
+                                       final long pingPeriod,
+                                       final long connectionTTL,
+                                       final long callTimeout,
+                                       final String clientID,
+                                       final int dupsOKBatchSize,
+                                       final int transactionBatchSize,
+                                       final int consumerWindowSize,
+                                       final int consumerMaxRate,
+                                       final int producerWindowSize,
+                                       final int producerMaxRate,
+                                       final int minLargeMessageSize,
+                                       final boolean blockOnAcknowledge,
+                                       final boolean blockOnNonPersistentSend,
+                                       final boolean blockOnPersistentSend,
+                                       final boolean autoGroup,
+                                       final int maxConnections,
+                                       final boolean preAcknowledge,
+                                       final long retryInterval,
+                                       final double retryIntervalMultiplier,
+                                       final int maxRetriesBeforeFailover,
+                                       final int maxRetriesAfterFailover,
+                                       final String jndiBinding) throws Exception
+   {
+      // FIXME need to store correctly the connector configs
+      replicationAwareInvoke("createConnectionFactory",
+                             name,
+                             connectorConfigs,
+                             connectionLoadBalancingPolicyClassName,
+                             pingPeriod,
+                             connectionTTL,
+                             callTimeout,
+                             clientID,
+                             dupsOKBatchSize,
+                             transactionBatchSize,
+                             consumerWindowSize,
+                             consumerMaxRate,
+                             producerWindowSize,
+                             producerMaxRate,
+                             minLargeMessageSize,
+                             blockOnAcknowledge,
+                             blockOnNonPersistentSend,
+                             blockOnPersistentSend,
+                             autoGroup,
+                             maxConnections,
+                             preAcknowledge,
+                             retryInterval,
+                             retryIntervalMultiplier,
+                             maxRetriesBeforeFailover,
+                             maxRetriesAfterFailover,
+                             jndiBinding);
+   }
+
+   public void createConnectionFactory(final String name,
+                                       final DiscoveryGroupConfiguration discoveryGroupConfig,
+                                       final long discoveryInitialWait,
+                                       final String connectionLoadBalancingPolicyClassName,
+                                       final long pingPeriod,
+                                       final long connectionTTL,
+                                       final long callTimeout,
+                                       final String clientID,
+                                       final int dupsOKBatchSize,
+                                       final int transactionBatchSize,
+                                       final int consumerWindowSize,
+                                       final int consumerMaxRate,
+                                       final int producerWindowSize,
+                                       final int producerMaxRate,
+                                       final int minLargeMessageSize,
+                                       final boolean blockOnAcknowledge,
+                                       final boolean blockOnNonPersistentSend,
+                                       final boolean blockOnPersistentSend,
+                                       final boolean autoGroup,
+                                       final int maxConnections,
+                                       final boolean preAcknowledge,
+                                       final long retryInterval,
+                                       final double retryIntervalMultiplier,
+                                       final int maxRetriesBeforeFailover,
+                                       final int maxRetriesAfterFailover,
+                                       final String jndiBinding) throws Exception
+   {
+      // FIXME need to store correctly the connector configs
+      replicationAwareInvoke("createConnectionFactory",
+                             name,
+                             discoveryGroupConfig,
+                             discoveryInitialWait,
+                             connectionLoadBalancingPolicyClassName,
+                             pingPeriod,
+                             connectionTTL,
+                             callTimeout,
+                             clientID,
+                             dupsOKBatchSize,
+                             transactionBatchSize,
+                             consumerWindowSize,
+                             consumerMaxRate,
+                             producerWindowSize,
+                             producerMaxRate,
+                             minLargeMessageSize,
+                             blockOnAcknowledge,
+                             blockOnNonPersistentSend,
+                             blockOnPersistentSend,
+                             autoGroup,
+                             maxConnections,
+                             preAcknowledge,
+                             retryInterval,
+                             retryIntervalMultiplier,
+                             maxRetriesBeforeFailover,
+                             maxRetriesAfterFailover,
+                             jndiBinding);
+   }
+
+   public boolean createQueue(final String name, final String jndiBinding) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("createQueue", name, jndiBinding);
+   }
+
+   public boolean createTopic(final String name, final String jndiBinding) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("createTopic", name, jndiBinding);
+   }
+
+   public void destroyConnectionFactory(final String name) throws Exception
+   {
+      replicationAwareInvoke("destroyConnectionFactory", name);
+   }
+
+   public boolean destroyQueue(final String name) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("destroyQueue", name);
+   }
+
+   public boolean destroyTopic(final String name) throws Exception
+   {
+      return (Boolean)replicationAwareInvoke("destroyTopic", name);
+   }
+
+   public String getVersion()
+   {
+      return localControl.getVersion();
+   }
+
+   public boolean isStarted()
+   {
+      return localControl.isStarted();
+   }
+
+   public String[] listConnectionIDs()
+   {
+      return localControl.listConnectionIDs();
+   }
+
+   public String[] listRemoteAddresses()
+   {
+      return localControl.listRemoteAddresses();
+   }
+
+   public String[] listRemoteAddresses(final String ipAddress)
+   {
+      return localControl.listRemoteAddresses(ipAddress);
+   }
+
+   public String[] listSessions(final String connectionID)
+   {
+      return localControl.listSessions(connectionID);
+   }
+
+   // StandardMBean overrides ---------------------------------------
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(JMSServerControlMBean.class),
+                           info.getNotifications());
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -0,0 +1,187 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.jms.server.management.jmx.impl;
+
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
+import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareStandardMBeanWrapper;
+import org.jboss.messaging.jms.server.management.SubscriptionInfo;
+import org.jboss.messaging.jms.server.management.TopicControlMBean;
+import org.jboss.messaging.jms.server.management.impl.TopicControl;
+
+/**
+ * A ReplicationAwareTopicControlWrapper
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ReplicationAwareTopicControlWrapper extends ReplicationAwareStandardMBeanWrapper implements
+         TopicControlMBean
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final TopicControl localControl;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationAwareTopicControlWrapper(final ObjectName objectName, final TopicControl localControl) throws Exception
+   {
+      super(objectName, TopicControlMBean.class);
+      this.localControl = localControl;
+   }
+
+   // TopicControlMBean implementation ---------------------------
+
+   public void dropAllSubscriptions() throws Exception
+   {
+      replicationAwareInvoke("dropAllSubscriptions");
+   }
+
+   public void dropDurableSubscription(final String clientID, final String subscriptionName) throws Exception
+   {
+      replicationAwareInvoke("dropDurableSubscription", clientID, subscriptionName);
+   }
+
+   public int getDurableMessagesCount()
+   {
+      return localControl.getDurableMessagesCount();
+   }
+
+   public int getDurableSubcriptionsCount()
+   {
+      return localControl.getDurableSubcriptionsCount();
+   }
+
+   public int getNonDurableMessagesCount()
+   {
+      return localControl.getNonDurableMessagesCount();
+   }
+
+   public int getNonDurableSubcriptionsCount()
+   {
+      return localControl.getNonDurableSubcriptionsCount();
+   }
+
+   public int getSubcriptionsCount()
+   {
+      return localControl.getSubcriptionsCount();
+   }
+
+   public SubscriptionInfo[] listAllSubscriptionInfos()
+   {
+      return localControl.listAllSubscriptionInfos();
+   }
+
+   public TabularData listAllSubscriptions()
+   {
+      return localControl.listAllSubscriptions();
+   }
+
+   public SubscriptionInfo[] listDurableSubscriptionInfos()
+   {
+      return localControl.listDurableSubscriptionInfos();
+   }
+
+   public TabularData listDurableSubscriptions()
+   {
+      return localControl.listDurableSubscriptions();
+   }
+
+   public TabularData listMessagesForSubscription(final String queueName) throws Exception
+   {
+      return localControl.listMessagesForSubscription(queueName);
+   }
+
+   public SubscriptionInfo[] listNonDurableSubscriptionInfos()
+   {
+      return localControl.listNonDurableSubscriptionInfos();
+   }
+
+   public TabularData listNonDurableSubscriptions()
+   {
+      return localControl.listNonDurableSubscriptions();
+   }
+
+   public String getAddress()
+   {
+      return localControl.getAddress();
+   }
+
+   public String getJNDIBinding()
+   {
+      return localControl.getJNDIBinding();
+   }
+
+   public int getMessageCount() throws Exception
+   {
+      return localControl.getMessageCount();
+   }
+
+   public String getName()
+   {
+      return localControl.getName();
+   }
+
+   public boolean isTemporary()
+   {
+      return localControl.isTemporary();
+   }
+
+   public int removeAllMessages() throws Exception
+   {
+      return (Integer)replicationAwareInvoke("removeAllMessages");
+   }
+  
+
+   // StandardMBean overrides ---------------------------------------
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(TopicControlMBean.class),
+                           info.getNotifications());
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java	2008-12-05 15:12:12 UTC (rev 5465)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -117,12 +117,16 @@
 
       JMSUtil.sendMessages(queue, 2);
 
+      Thread.sleep(100);
+
       assertEquals(2, queueControl.getMessageCount());
       assertEquals(2, queueControl.getMessagesAdded());
 
       assertNotNull(consumer.receive(500));
       assertNotNull(consumer.receive(500));
 
+      Thread.sleep(100);
+
       assertEquals(0, queueControl.getMessageCount());
       assertEquals(2, queueControl.getMessagesAdded());
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSManagementServiceImplTest.java	2008-12-05 15:12:12 UTC (rev 5465)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSManagementServiceImplTest.java	2008-12-05 16:40:47 UTC (rev 5466)
@@ -34,6 +34,7 @@
 import java.util.List;
 
 import javax.management.ObjectName;
+import javax.management.StandardMBean;
 
 import junit.framework.TestCase;
 
@@ -49,12 +50,12 @@
 import org.jboss.messaging.jms.JBossTopic;
 import org.jboss.messaging.jms.client.JBossConnectionFactory;
 import org.jboss.messaging.jms.server.JMSServerManager;
-import org.jboss.messaging.jms.server.management.ConnectionFactoryControlMBean;
 import org.jboss.messaging.jms.server.management.JMSManagementService;
-import org.jboss.messaging.jms.server.management.JMSQueueControlMBean;
 import org.jboss.messaging.jms.server.management.JMSServerControlMBean;
-import org.jboss.messaging.jms.server.management.TopicControlMBean;
+import org.jboss.messaging.jms.server.management.impl.ConnectionFactoryControl;
 import org.jboss.messaging.jms.server.management.impl.JMSManagementServiceImpl;
+import org.jboss.messaging.jms.server.management.impl.JMSQueueControl;
+import org.jboss.messaging.jms.server.management.impl.TopicControl;
 
 /*
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -80,7 +81,8 @@
       JMSServerManager server = createMock(JMSServerManager.class);
 
       ManagementService managementService = createMock(ManagementService.class);
-      managementService.registerResource(eq(objectName), isA(JMSServerControlMBean.class));
+      managementService.registerInJMX(eq(objectName), isA(StandardMBean.class));
+      managementService.registerInRegistry(eq(objectName), isA(JMSServerControlMBean.class));
       replay(managementService, server);
 
       JMSManagementService service = new JMSManagementServiceImpl(managementService);
@@ -107,7 +109,8 @@
       expect(managementService.getMessageCounterManager()).andReturn(messageCounterManager );
       expect(messageCounterManager.getMaxDayCount()).andReturn(randomPositiveInt());
       messageCounterManager.registerMessageCounter(eq(name), isA(MessageCounter.class));
-      managementService.registerResource(eq(objectName), isA(JMSQueueControlMBean.class));
+      managementService.registerInJMX(eq(objectName), isA(StandardMBean.class));
+      managementService.registerInRegistry(eq(objectName), isA(JMSQueueControl.class));
       
       replay(managementService, messageCounterManager, coreQueue, postOffice, storageManager, queueSettingsRepository);
 
@@ -128,7 +131,8 @@
       StorageManager storageManager = createMock(StorageManager.class);
 
       ManagementService managementService = createMock(ManagementService.class);
-      managementService.registerResource(eq(objectName), isA(TopicControlMBean.class));
+      managementService.registerInJMX(eq(objectName), isA(StandardMBean.class));
+      managementService.registerInRegistry(eq(objectName), isA(TopicControl.class));
 
       replay(managementService, postOffice, storageManager);
 
@@ -149,7 +153,8 @@
 
       JBossConnectionFactory connectionFactory = createMock(JBossConnectionFactory.class);
       ManagementService managementService = createMock(ManagementService.class);
-      managementService.registerResource(eq(objectName), isA(ConnectionFactoryControlMBean.class));
+      managementService.registerInJMX(eq(objectName), isA(StandardMBean.class));
+      managementService.registerInRegistry(eq(objectName), isA(ConnectionFactoryControl.class));
 
       replay(managementService, connectionFactory);
 




More information about the jboss-cvs-commits mailing list