[jboss-cvs] JBoss Messaging SVN: r3726 - in trunk: src/main/org/jboss/jms/client and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 15 10:59:01 EST 2008


Author: ataylor
Date: 2008-02-15 10:59:01 -0500 (Fri, 15 Feb 2008)
New Revision: 3726

Added:
   trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java
Modified:
   trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/jms/server/JMSServerManager.java
   trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
   trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java
   trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java
   trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
   trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
Log:
refactoring of management interfaces second draft

Added: trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java
===================================================================
--- trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java	                        (rev 0)
+++ trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -0,0 +1,131 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.example.embedded;
+
+import org.jboss.messaging.core.remoting.RemotingConfiguration;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessagingServerManagement;
+import org.jboss.messaging.core.impl.server.MessagingServerImpl;
+import org.jboss.messaging.core.impl.server.MessagingServerManagementImpl;
+import org.jboss.messaging.core.impl.MessageImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.jms.client.api.*;
+import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
+import org.jboss.jms.message.JBossTextMessage;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class EmbeddedManagementExample
+{
+   public static void main(String args[]) throws Exception
+   {
+      RemotingConfiguration remotingConf = new RemotingConfiguration(TCP, "localhost", 5400);
+      remotingConf.setInvmDisabled(true);
+      MessagingServer messagingServer = new MessagingServerImpl(remotingConf);
+      messagingServer.start();
+      MessagingServerManagementImpl messagingServerManagement = new MessagingServerManagementImpl();
+      messagingServerManagement.setMessagingServer(messagingServer);
+      messagingServerManagement.start();
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(remotingConf);
+      ClientConnection clientConnection = cf.createConnection(null, null);
+      ClientSession clientSession = clientConnection.createClientSession(false, true, true, 0);
+      String queue = "Queue1";
+      clientSession.createQueue(queue, queue, null, false, false);
+      ClientProducer clientProducer = clientSession.createProducer();
+
+      clientConnection.start();
+
+      messagingServerManagement.registerMessageCounter(queue);
+      Message message = new MessageImpl(JBossTextMessage.TYPE, true, 0, System.currentTimeMillis(), (byte) 1);
+      messagingServerManagement.startMessageCounter(queue, 0);
+      for (int i = 0; i < 1000; i++)
+      {
+         clientProducer.send(queue, message);
+      }
+
+      MessageCounter messageCounter = messagingServerManagement.getMessageCounter(queue);
+      System.out.println("messageCounter = " + messageCounter);
+      for (int i = 0; i < 2000; i++)
+      {
+         clientProducer.send(queue, message);
+      }
+
+      messageCounter = messagingServerManagement.getMessageCounter(queue);
+      System.out.println("messageCounter = " + messageCounter);
+      for (int i = 0; i < 3000; i++)
+      {
+         clientProducer.send(queue, message);
+      }
+
+      messageCounter = messagingServerManagement.getMessageCounter(queue);
+      System.out.println("messageCounter = " + messageCounter);
+      for (int i = 0; i < 4000; i++)
+      {
+         clientProducer.send(queue, message);
+      }
+
+      messageCounter = messagingServerManagement.getMessageCounter(queue);
+      System.out.println("messageCounter = " + messageCounter);
+      messagingServerManagement.stopMessageCounter(queue);
+      messagingServerManagement.startMessageCounter(queue, 5);
+      ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+      Timer timer = new Timer();
+      scheduler.schedule(timer, 10, TimeUnit.SECONDS);
+      int counter = 0;
+      while (timer.isRunning())
+      {
+         clientProducer.send(queue, message);
+         counter++;
+      }
+      scheduler.shutdown();
+      System.out.println("counter = " + counter);
+      messageCounter = messagingServerManagement.getMessageCounter(queue);
+      System.out.println("messageCounter = " + messageCounter);
+      messagingServerManagement.unregisterMessageCounter(queue);
+      clientConnection.close();
+      messagingServerManagement.stop();
+      messagingServer.stop();
+   }
+
+   private static class Timer implements Runnable
+   {
+      boolean running = true;
+
+      public boolean isRunning()
+      {
+         return running;
+      }
+
+      public void run()
+      {
+         running = false;
+      }
+   }
+}

Modified: trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -416,12 +416,9 @@
 
       String coreDest = dest.getAddress();
 
-      // TODO - can optimise this copy to do copy lazily.
-      org.jboss.messaging.core.Message messageToSend = jbm.getCoreMessage().copy();
-
       try
       {
-         producer.send(coreDest, messageToSend);
+         producer.send(coreDest, jbm.getCoreMessage());
       }
       catch (MessagingException e)
       {

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -429,7 +429,7 @@
    {
       checkClosed();
       
-      SessionSendMessage message = new SessionSendMessage(address, m);
+      SessionSendMessage message = new SessionSendMessage(address, m.copy());
       
       remotingConnection.send(id, message, !m.isDurable());
    }

Modified: trunk/src/main/org/jboss/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManager.java	2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManager.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -4,6 +4,7 @@
 import org.jboss.jms.destination.JBossTopic;
 import org.jboss.messaging.core.impl.server.SubscriptionInfo;
 
+import javax.jms.Message;
 import java.util.List;
 
 /**
@@ -13,6 +14,10 @@
 public interface JMSServerManager
 {
    // management operations
+   enum ListType
+   {
+      ALL, DURABLE, NON_DURABLE
+   }
    boolean isStarted();
 
    boolean createQueue(String queueName, String jndiBinding) throws Exception;
@@ -29,27 +34,53 @@
 
    boolean destroyConnectionFactory(String name) throws Exception;
 
-   public void removeAllMessagesForQueue(String queueName) throws Exception;
+   public List<Message> listMessagesForQueue(String queue);
 
-   public void removeAllMessagesForTopic(String topicName) throws Exception;
+   public List<Message> listMessagesForQueue(String queue, ListType listType);
 
-   public void removeAllMessagesForQueue(JBossQueue queueName) throws Exception;
+   public List<Message> listMessages(JBossQueue queue);
 
-   public void removeAllMessagesForTopic(JBossTopic topicName) throws Exception;
+   public List<Message> listMessagesForTopic(String topic);
 
+   public List<Message> listMessagesForTopic(String topic, ListType listType);
+
+   public List<Message> listMessages(JBossTopic topic);
+
+   public List<Message> listMessages(JBossQueue queue, ListType listType);
+
+   public List<Message> listMessages(JBossTopic topic, ListType listType);
+
+   void removeAllMessagesForQueue(String queueName) throws Exception;
+
+   void removeAllMessagesForTopic(String topicName) throws Exception;
+
+   void removeAllMessages(JBossQueue queueName) throws Exception;
+
+   void removeAllMessages(JBossTopic topicName) throws Exception;
+
    int getMessageCountForQueue(String queue) throws Exception;
 
-   int getMessageCountForQueue(JBossQueue queue) throws Exception;
+   int getMessageCount(JBossQueue queue) throws Exception;
 
-   List<SubscriptionInfo> listAllSubscriptionsForTopic(String topicName) throws Exception;
+   List<SubscriptionInfo> listSubscriptions(String topicName) throws Exception;
 
-   List<SubscriptionInfo> listAllSubscriptionsForTopic(JBossTopic topicName) throws Exception;
+   List<SubscriptionInfo> listSubscriptions(JBossTopic topic) throws Exception;
 
-   List<SubscriptionInfo> listDurableSubscriptionsForTopic(String topicName) throws Exception;
+   List<SubscriptionInfo> listSubscriptions(String topicName, ListType listType) throws Exception;
 
-   List<SubscriptionInfo> listDurableSubscriptionsForTopic(JBossTopic topicName) throws Exception;
+   List<SubscriptionInfo> listSubscriptions(JBossTopic topic, ListType listType) throws Exception;
 
-   List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(String topicName) throws Exception;
+   int getSubscriptionsCountForTopic(String topicName) throws Exception;
 
-   List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(JBossTopic topicName) throws Exception;
+   int getSubscriptionsCount(JBossTopic topic) throws Exception;
+
+   int getSubscriptionsCountForTopic(String topicName, ListType listType) throws Exception;
+
+   int getSubscriptionsCount(JBossTopic topic,ListType listType) throws Exception;
+
+   int getConsumerCountForQueue(String queue) throws Exception;
+
+   int getConsumerCountForQueue(JBossQueue queue) throws Exception;
+
+   List getClients() throws Exception;
 }

Modified: trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java	2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -25,12 +25,18 @@
 import org.jboss.jms.client.api.ClientConnectionFactory;
 import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.destination.JBossTopic;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.MessagingServerManagement;
+import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.impl.server.SubscriptionInfo;
+import org.jboss.messaging.core.impl.messagecounter.MessageStatistics;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
 import org.jboss.messaging.deployers.Deployer;
 import org.jboss.messaging.deployers.DeploymentManager;
 import org.jboss.messaging.util.JNDIUtil;
+import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.aop.microcontainer.aspects.jmx.JMX;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
@@ -38,9 +44,11 @@
 import javax.naming.InitialContext;
 import javax.naming.NameNotFoundException;
 import javax.naming.NamingException;
+import javax.jms.Message;
 import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Collection;
 
 /**
  * A Deployer used to create and add to JNDI queues, topics and connection factories. Typically this would only be used
@@ -48,6 +56,7 @@
  *
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  */
+ at JMX(name = "jboss.messaging:service=MessagingServer", exposedInterface = JMSServerManager.class)
 public class JMSServerManagerImpl extends Deployer implements JMSServerManager
 {
    Logger log = Logger.getLogger(JMSServerManagerImpl.class);
@@ -65,8 +74,6 @@
 
    MessagingServerManagement messagingServerManagement;
 
-//   UserManager userManager = new UserManager();
-
    private static final String CLIENTID_ELEMENT = "client-id";
    private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
    private static final String PREFETECH_SIZE_ELEMENT = "prefetch-size";
@@ -401,68 +408,185 @@
       return true;
    }
 
+
+   public List<Message> listMessagesForQueue(String queue)
+   {
+      return listMessagesForQueue(queue, ListType.ALL);
+   }
+
+   public List<Message> listMessagesForQueue(String queue, ListType listType)
+   {
+      return listMessages(new JBossQueue(queue), listType);
+   }
+
+   public List<Message> listMessages(JBossQueue queue)
+   {
+      return listMessages(queue, ListType.ALL);
+   }
+
+   public List<Message> listMessagesForTopic(String topic)
+   {
+      return listMessagesForTopic(topic, ListType.ALL);
+   }
+
+   public List<Message> listMessagesForTopic(String topic, ListType listType)
+   {
+      return listMessages(new JBossTopic(topic), listType);
+   }
+
+   public List<Message> listMessages(JBossTopic topic)
+   {
+      return listMessages(topic, ListType.ALL);
+   }
+
+   public List<Message> listMessages(JBossQueue queue, ListType listType)
+   {
+      return null;  //todo
+   }
+
+   public List<Message> listMessages(JBossTopic topic, ListType listType)
+   {
+      return null;  //todo
+   }
+
    public void removeAllMessagesForQueue(String queueName) throws Exception
    {
       JBossQueue jBossQueue = new JBossQueue(queueName);
-      removeAllMessagesForQueue(jBossQueue);
+      removeAllMessages(jBossQueue);
    }
 
    public void removeAllMessagesForTopic(String topicName) throws Exception
    {
       JBossTopic jBossTopic = new JBossTopic(topicName);
-      removeAllMessagesForTopic(jBossTopic);
+      removeAllMessages(jBossTopic);
    }
 
-   public void removeAllMessagesForQueue(JBossQueue queue) throws Exception
+   public void removeAllMessages(JBossQueue queue) throws Exception
    {
       messagingServerManagement.removeAllMessagesForAddress(queue.getAddress());
    }
 
-   public void removeAllMessagesForTopic(JBossTopic topic) throws Exception
+   public void removeAllMessages(JBossTopic topic) throws Exception
    {
       messagingServerManagement.removeAllMessagesForAddress(topic.getAddress());
    }
 
    public int getMessageCountForQueue(String queue) throws Exception
    {
-      return getMessageCountForQueue(new JBossQueue(queue));
+      return getMessageCount(new JBossQueue(queue));
    }
 
-   public int getMessageCountForQueue(JBossQueue queue) throws Exception
+   public int getMessageCount(JBossQueue queue) throws Exception
    {
       return messagingServerManagement.getMessageCountForQueue(queue.getAddress());
    }
 
-   public List<SubscriptionInfo> listAllSubscriptionsForTopic(String topicName) throws Exception
+
+   public List<SubscriptionInfo> listSubscriptions(String topicName) throws Exception
    {
-      return listAllSubscriptionsForTopic(new JBossTopic(topicName));
+      return listSubscriptions(new JBossTopic(topicName));
    }
 
-   public List<SubscriptionInfo> listAllSubscriptionsForTopic(JBossTopic topicName) throws Exception
+   public List<SubscriptionInfo> listSubscriptions(JBossTopic topic) throws Exception
    {
-      return messagingServerManagement.listAllSubscriptionsForAddress(topicName.getAddress());
+      return listSubscriptions(topic, ListType.ALL);
    }
 
-   public List<SubscriptionInfo> listDurableSubscriptionsForTopic(String topicName) throws Exception
+   public List<SubscriptionInfo> listSubscriptions(String topic, ListType type) throws Exception
    {
-      return null;  //To change body of implemented methods use File | Settings | File Templates.
+      return listSubscriptions(new JBossTopic(topic), type);
    }
+   public List<SubscriptionInfo> listSubscriptions(JBossTopic topic, ListType type) throws Exception
+   {
+      List<SubscriptionInfo> subs = new ArrayList<SubscriptionInfo>();
 
-   public List<SubscriptionInfo> listDurableSubscriptionsForTopic(JBossTopic topicName) throws Exception
+      List<Queue> queues = messagingServerManagement.getQueuesForAddress(topic.getAddress());
+
+      for (Queue queue : queues)
+      {
+         if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable()) || (type == ListType.NON_DURABLE && !queue.isDurable()))
+         {
+            String subName = null;
+            String clientID = null;
+
+            if (queue.isDurable())
+            {
+               MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
+               subName = helper.getSubName();
+               clientID = helper.getClientId();
+            }
+
+            SubscriptionInfo info = new SubscriptionInfo(queue.getName(), queue.isDurable(), subName, clientID,
+                    queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.getMessageCount(), queue.getMaxSize());
+
+            subs.add(info);
+         }
+      }
+
+      return subs;
+   }
+
+   public int getSubscriptionsCountForTopic(String topicName) throws Exception
    {
-      return null;  //To change body of implemented methods use File | Settings | File Templates.
+      return getSubscriptionsCount(new JBossTopic(topicName));
    }
 
-   public List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(String topicName) throws Exception
+   public int getSubscriptionsCount(JBossTopic topic) throws Exception
    {
-      return null;  //To change body of implemented methods use File | Settings | File Templates.
+      return getSubscriptionsCount(topic, ListType.ALL);
    }
 
-   public List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(JBossTopic topicName) throws Exception
+   public int getSubscriptionsCountForTopic(String topicName, ListType listType) throws Exception
    {
-      return null;  //To change body of implemented methods use File | Settings | File Templates.
+      return getSubscriptionsCount(new JBossTopic(topicName), listType);
    }
 
+   public int getSubscriptionsCount(JBossTopic topic, ListType listType) throws Exception
+   {
+       return listSubscriptions(topic, listType).size();
+   }
+
+   public int getConsumerCountForQueue(String queue) throws Exception
+   {
+      return getConsumerCountForQueue(new JBossQueue(queue));
+   }
+
+   public int getConsumerCountForQueue(JBossQueue queue) throws Exception
+   {
+       return messagingServerManagement.getConsumerCountForQueue(queue.getName());
+   }
+   //todo something!
+   public List getClients() throws Exception
+   {
+      List<ServerConnectionEndpoint> endpoints = messagingServerManagement.getClients();
+      for (ServerConnectionEndpoint endpoint : endpoints)
+      {
+         //endpoint.
+      }
+      return null;
+   }
+
+   List<MessageStatistics> getMessageStatistics(String queue) throws Exception
+   {
+      Collection<MessageCounter> counters = messagingServerManagement.getMessageCounters();
+      List<MessageStatistics> list = new ArrayList<MessageStatistics>(counters.size());
+      for (Object counter1 : counters)
+      {
+         MessageCounter counter = (MessageCounter) counter1;
+
+         MessageStatistics stats = new MessageStatistics();
+         stats.setName(counter.getDestinationName());
+         stats.setDurable(counter.getDestinationDurable());
+         stats.setCount(counter.getCount());
+         stats.setCountDelta(counter.getCountDelta());
+         stats.setDepth(counter.getMessageCount());
+         stats.setDepthDelta(counter.getMessageCountDelta());
+         stats.setTimeLastUpdate(counter.getLastUpdate());
+
+         list.add(stats);
+      }
+      return list;
+   }
    //private
 
    private void addToDestinationBindings(String destination, String jndiBinding)
@@ -473,4 +597,5 @@
       }
       destinations.get(destination).add(jndiBinding);
    }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java	2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -21,10 +21,12 @@
   */
 package org.jboss.messaging.core;
 
-import org.jboss.messaging.core.impl.server.SubscriptionInfo;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
 import org.jboss.jms.client.api.ClientConnectionFactory;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
 
 import java.util.List;
+import java.util.Collection;
 
 /**
  * This interface describes the management interface exposed by the server
@@ -47,6 +49,8 @@
    boolean addAddress(String address);
 
    boolean removeAddress(String address);
+   
+   List<Queue> getQueuesForAddress(String address) throws Exception;
 
    ClientConnectionFactory createClientConnectionFactory(boolean strictTck,int prefetchSize);
 
@@ -56,303 +60,31 @@
 
    int getMessageCountForQueue(String queue) throws Exception;
 
-   List<SubscriptionInfo> listAllSubscriptionsForAddress(String address) throws Exception;
+   void registerMessageCounter(String queueName) throws Exception;
 
-   List<SubscriptionInfo> listDurableSubscriptionsForAddress(String address) throws Exception;
+   void unregisterMessageCounter(String queueName) throws Exception;
 
-   List<SubscriptionInfo> listNonSubscriptionsForAddress(String address) throws Exception;
-//
-//   /**
-//    * returns how many messagesd have been delivered from a specific queue
-//    * @param queue the queue
-//    * @return number of messages
-//    * @throws Exception if a problem occurs
-//    */
-//   int getDeliveringCountForQueue(String queue) throws Exception;
-//
-//   /**
-//    * returns how many messages are scheduled for a specific queue
-//    * @param queue the queue
-//    * @return number of messages
-//    * @throws Exception if a problem occurs
-//    */
-//   int getScheduledMessageCountForQueue(String queue) throws Exception;
-//
-//   /**
-//    * returns the message counter for a queue
-//    * @param queue the queue
-//    * @return number of messages
-//    * @throws Exception if a problem occurs
-//    */
-//   MessageCounter getMessageCounterForQueue(String queue) throws Exception;
-//
-//   /**
-//    * returns the message statistics for a queue
-//    * @param queue the queue
-//    * @return the message statistics
-//    * @throws Exception if a problem occurs
-//    */
-//   MessageStatistics getMessageStatisticsForQueue(String queue) throws Exception;
-//
-//   /**
-//    * returns how many consumers a specific queue has
-//    * @param queue the queue
-//    * @return number of consumers
-//    * @throws Exception if a problem occurs
-//    */
-//   int getConsumerCountForQueue(String queue) throws Exception;
-//
-//   /**
-//    * restes the message counter for a specific queue
-//    * @param queue the queue
-//    * @throws Exception if a problem occurs
-//    */
-//   void resetMessageCounterForQueue(String queue) throws Exception;
-//
-//   /**
-//    * resets the counter history for a specific queue
-//    * @param queue the queue
-//    * @throws Exception if a problem occurs
-//    */
-//   void resetMessageCounterHistoryForQueue(String queue) throws Exception;
-//
-//   /**
-//    * lists all messages for a specific queue. This will contain the message references only
-//    * @param queue the queue
-//    * @return the messages
-//    * @throws Exception if a problem occurs
-//    */
-//   List listAllMessagesForQueue(String queue) throws Exception;
-//
-//   /**
-//    * lists all messages that match the given selector.
-//    * @param queue the queue
-//    * @param selector
-//    * @return the messages
-//    * @throws Exception if a problem occurs
-//    */
-//   List listAllMessagesForQueue(String queue,String selector) throws Exception;
-//
-//   /**
-//    * list all the durable messages for a specific queue
-//    * @param queue the queue
-//    * @return the messages
-//    * @throws Exception if a problem occurs
-//    */
-//   List listDurableMessagesForQueue(String queue) throws Exception;
-//
-//   /**
-//    * list all the durable messages for a queue that match a given selector
-//    * @param queue the queue
-//    * @param selector
-//    * @return the messages
-//    * @throws Exception if a problem occurs
-//    */
-//   List listDurableMessagesForQueue(String queue,String selector) throws Exception;
-//
-//   /**
-//    * lists all the non durable messages for a specific queue.
-//    * @param queue the queue
-//    * @return the messages
-//    * @throws Exception if a problem occurs
-//    */
-//   List listNonDurableMessagesForQueue(String queue) throws Exception;
-//
-//   /**
-//    * lists all noon durable messages for a queue.
-//    * @param queue
-//    * @param selector
-//    * @return
-//    * @throws Exception
-//    */
-//   List listNonDurableMessagesForQueue(String queue,String selector) throws Exception;
-//
-//   /**
-//    * lists all durable messages for a specific queue that match a specific selector
-//    * @param queue
-//    * @return
-//    * @throws Exception
-//    */
-//   String listMessageCounterAsHTMLForQueue(String queue) throws Exception;
-//
-//   /**
-//    * list the message count history for a specific queue in HTML format
-//    * @param queue
-//    * @return
-//    * @throws Exception
-//    */
-//   String listMessageCounterHistoryAsHTMLForQueue(String queue) throws Exception;
-//
-//   //topic
-//
-//   /**
-//    * counts messages received for a topic
-//    * @param topicName
-//    * @return
-//    * @throws Exception
-//    */
-//   int getAllMessageCountForTopic(String topicName) throws Exception;
-//
-//   /**
-//    * counts durable messages recieved for a topic
-//    * @param topicName
-//    * @return
-//    * @throws Exception
-//    */
-//   int getDurableMessageCountForTopic(String topicName) throws Exception;
-//
-//   /**
-//    * counts non durable messages recieved for a topic
-//    * @param topicName
-//    * @return
-//    * @throws Exception
-//    */
-//   int getNonDurableMessageCountForTopic(String topicName) throws Exception;
-//
-//   /**
-//    * counts all subscriptions for a topic
-//    * @param topicName
-//    * @return
-//    * @throws Exception
-//    */
-//   int getAllSubscriptionsCountForTopic(String topicName) throws Exception;
-//
-//   /**
-//    * counts all durable subscriptions for a topic
-//    * @param topicName
-//    * @return
-//    * @throws Exception
-//    */
-//   int getDurableSubscriptionsCountForTopic(String topicName) throws Exception;
-//
-//   /**
-//    * counts all non durable subscriptions for a topic
-//    * @param topicName
-//    * @return
-//    * @throws Exception
-//    */
-//   int getNonDurableSubscriptionsCountForTopic(String topicName) throws Exception;
-//
-//   /**
-//    * removes all the messages for a specific topic
-//    * @param topic
-//    * @throws Exception
-//    * @throws Exception
-//    */
-//   void removeAllMessagesForTopic(String topic) throws Exception;
-//
-//   /**
-//    * lists all subscriptions for a topic
-//    * @param topic
-//    * @return
-//    * @throws Exception
-//    */
-//   List listAllSubscriptionsForAddress(String topic) throws Exception;
-//
-//   /**
-//    * lists all durable subscriptions for a topic
-//    * @param topic
-//    * @return
-//    * @throws Exception
-//    */
-//   List listDurableSubscriptionsForTopic(String topic) throws Exception;
-//
-//   /**
-//    * lists all non durable subscriptions for a topic
-//    * @param topic
-//    * @return
-//    * @throws Exception
-//    */
-//   List listNonDurableSubscriptionsForTopic(String topic) throws Exception;
-//
-//   /**
-//    * lists all subscriptions for a topic as html
-//    * @param topic
-//    * @return
-//    * @throws Exception
-//    */
-//   String listAllSubscriptionsAsHTMLForTopic(String topic) throws Exception;
-//
-//   /**
-//    * lists all durable subscriptions for a topic as html
-//    * @param topic
-//    * @return
-//    * @throws Exception
-//    */
-//   String listDurableSubscriptionsAsHTMLForTopic(String topic) throws Exception;
-//
-//   /**
-//    * lists all non durable subscriptions for a topic as html
-//    * @param topic
-//    * @return
-//    * @throws Exception
-//    */
-//   String listNonDurableSubscriptionsAsHTMLForTopic(String topic) throws Exception;
-//
-//   /**
-//    * lists all the messages for a topic for a given subscription
-//    * @param topic
-//    * @param subscriptionId
-//    * @return
-//    * @throws Exception
-//    */
-//   List listAllMessagesForTopic(String topic,String subscriptionId) throws Exception;
-//
-//   /**
-//    * lists all the messages for a topic for a given subscription and message selector
-//    * @param topic
-//    * @param subscriptionId
-//    * @param selector
-//    * @return
-//    * @throws Exception
-//    */
-//   List listAllMessagesForTopic(String topic,String subscriptionId, String selector) throws Exception;
-//
-//   /**
-//    *  lists durable messages for a topic for a given subscription
-//    * @param topic
-//    * @param subscriptionId
-//    * @return
-//    * @throws Exception
-//    */
-//   List listDurableMessagesForTopic(String topic,String subscriptionId) throws Exception;
-//
-//   /**
-//    * lists durable messages for a topic for a given subscription and message selector
-//    * @param topic
-//    * @param subscriptionId
-//    * @param selector
-//    * @return
-//    * @throws Exception
-//    */
-//   List listDurableMessagesForTopic(String topic,String subscriptionId, String selector) throws Exception;
-//
-//   /**
-//    *  lists non durable messages for a topic for a given subscription
-//    * @param topic
-//    * @param subscriptionId
-//    * @return
-//    * @throws Exception
-//    */
-//   List listNonDurableMessagesForTopic(String topic,String subscriptionId) throws Exception;
-//
-//   /**
-//    * lists durable messages for a topic for a given subscription and message selector
-//    * @param topic
-//    * @param subscriptionId
-//    * @param selector
-//    * @return
-//    * @throws Exception
-//    */
-//   List listNonDurableMessagesForTopic(String topic,String subscriptionId, String selector) throws Exception;
-//
-//   List getMessageCountersForTopic(String topic) throws Exception;
-//   
-//   String showActiveClientsAsHTML() throws Exception;
-//
-//   String showPreparedTransactionsAsHTML();
-//
-//   String listMessageCountersAsHTML() throws Exception;
+   void startMessageCounter(String queueName, long duration) throws Exception;
 
+   MessageCounter stopMessageCounter(String queueName) throws Exception;
 
+   MessageCounter getMessageCounter(String queueName);
+
+   Collection<MessageCounter> getMessageCounters();
+
+   void resetMessageCounter(String queue);
+
+   void resetMessageCounters();
+
+   void resetMessageCounterHistory(String queue);
+
+   void resetMessageCounterHistories();
+
+   List<MessageCounter> stopAllMessageCounters() throws Exception;
+
+   void unregisterAllMessageCounters() throws Exception;
+
+   public int getConsumerCountForQueue(String queue) throws Exception;
+
+   List<ServerConnectionEndpoint> getClients();
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java	2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -52,8 +52,6 @@
 
    // destination related information
    private String destName;
-   private String destSubscription;
-   private boolean destTopic;
    private boolean destDurable;
 
    // destination queue
@@ -67,36 +65,31 @@
 
    // per hour day counter history
    private int dayCounterMax;
-   private ArrayList dayCounter;
+   private ArrayList<DayCounter> dayCounter;
    
    /**
     * Get a list of message statistics from a list of message counters
-    * 
-    * @param counter the message counters
+    *
     * @return the message statistics
     * @throws Exception for any error
     */
-   public static List getMessageStatistics(List counters) throws Exception
+   public static List<? extends MessageStatistics> getMessageStatistics(List counters) throws Exception
    {
-      List list = new ArrayList(counters.size());
-      
-      Iterator iter = counters.iterator();
-      
-      while (iter.hasNext())
+      List<MessageStatistics> list = new ArrayList<MessageStatistics>(counters.size());
+
+      for (Object counter1 : counters)
       {
-         MessageCounter counter = (MessageCounter)iter.next();
-         
+         MessageCounter counter = (MessageCounter) counter1;
+
          MessageStatistics stats = new MessageStatistics();
          stats.setName(counter.getDestinationName());
-         stats.setSubscriptionID(counter.getDestinationSubscription());
-         stats.setTopic(counter.getDestinationTopic());
          stats.setDurable(counter.getDestinationDurable());
          stats.setCount(counter.getCount());
          stats.setCountDelta(counter.getCountDelta());
          stats.setDepth(counter.getMessageCount());
          stats.setDepthDelta(counter.getMessageCountDelta());
          stats.setTimeLastUpdate(counter.getLastUpdate());
-         
+
          list.add(stats);
       }
       return list;
@@ -106,23 +99,17 @@
     *    Constructor
     *
     * @param name             destination name
-    * @param subscription     subscription name
     * @param queue            internal queue object
-    * @param topic            topic destination flag
     * @param durable          durable subsciption flag
     * @param daycountmax      max message history day count
     */
    public MessageCounter(String name,
-                         String subscription,
                          Queue queue,
-                         boolean topic,
                          boolean durable,
                          int daycountmax)
    {
       // store destination related information
       destName = name;
-      destSubscription = subscription;
-      destTopic = topic;
       destDurable = durable;
       destQueue = queue;      
 
@@ -130,7 +117,7 @@
       resetCounter();
 
       // initialize message history
-      dayCounter = new ArrayList();
+      dayCounter = new ArrayList<DayCounter>();
 
       setHistoryLimit(daycountmax);
    }
@@ -148,14 +135,12 @@
    /*
     * This method is called periodically to update statistics from the queue
     */
-   public synchronized void onTimer()
+   public synchronized void sample()
    {
       int latestMessagesAdded = destQueue.getMessagesAdded();
+
+      countTotal += latestMessagesAdded - lastMessagesAdded;
       
-      int newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
-      
-      countTotal += newMessagesAdded;
-      
       lastMessagesAdded = latestMessagesAdded;
       
       //update timestamp
@@ -176,26 +161,6 @@
    }
 
    /**
-    * Gets the related destination subscription
-    *
-    * @return String    destination name
-    */
-   public String getDestinationSubscription()
-   {
-      return destSubscription;
-   }
-
-   /**
-    * Gets the related destination topic flag
-    *
-    * @return boolean    true: topic destination, false: queue destination
-    */
-   public boolean getDestinationTopic()
-   {
-      return destTopic;
-   }
-
-   /**
     * Gets the related destination durable subscription flag
     *
     * @return boolean   true : durable subscription,
@@ -291,45 +256,18 @@
    {
       StringBuffer ret = new StringBuffer();
 
-      // Topic/Queue
-      if (destTopic)
-         ret.append("Topic,");
-      else
-         ret.append("Queue,");
-
       // name 
       ret.append(destName).append(",");
 
-      // subscription
-      if (destSubscription != null)
-         ret.append(destSubscription).append(",");
-      else
-         ret.append("-,");
-
-      // Durable subscription
-      if (destTopic)
-      {
-         // Topic
-         if (destDurable)
-            ret.append("true,");
-         else
-            ret.append("false,");
-      }
-      else
-      {
-         // Queue
-         ret.append("-,");
-      }
-
       // counter values
-      ret.append(getCount()).append(",").append(getCountDelta()).append(",").append(getMessageCount()).append(",").append(getMessageCountDelta()).append(",");
+      ret.append("total = ").append(getCount()).append(",").append("total delta = ").append(getCountDelta()).append(",").append("current = ").append(getMessageCount()).append(",").append("current delta = ").append(getMessageCountDelta()).append(",");
 
       // timestamp last counter update
       if (timeLastUpdate > 0)
       {
          DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
 
-         ret.append(dateFormat.format(new Date(timeLastUpdate)));
+         ret.append("last reset at ").append(dateFormat.format(new Date(timeLastUpdate)));
       }
       else
       {
@@ -414,7 +352,7 @@
       // calculate day difference between current date and date of last day counter entry
       synchronized (dayCounter)
       {
-         DayCounter counterLast = (DayCounter) dayCounter.get(dayCounter.size() - 1);
+         DayCounter counterLast = dayCounter.get(dayCounter.size() - 1);
 
          GregorianCalendar calNow = new GregorianCalendar();
          GregorianCalendar calLast = counterLast.getDate();
@@ -468,7 +406,7 @@
          }
 
          // update last day counter entry
-         counterLast = (DayCounter) dayCounter.get(dayCounter.size() - 1);
+         counterLast = dayCounter.get(dayCounter.size() - 1);
          counterLast.updateDayCounter(incrementCounter);
       }
    }
@@ -510,10 +448,8 @@
          ret += dayCounter.size() + "\n";
 
          // following lines: day counter data
-         for (int i = 0; i < dayCounter.size(); i++)
+         for (DayCounter counter : dayCounter)
          {
-            DayCounter counter = (DayCounter) dayCounter.get(i);
-
             ret += counter.getDayCounterAsString() + "\n";
          }
       }
@@ -602,7 +538,7 @@
                bUpdate = true;
             }
 
-            if (bUpdate == true)
+            if (bUpdate)
             {
                if (counters[i] == -1)
                   counters[i] = 0;

Modified: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java	2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -189,7 +189,7 @@
             {
                MessageCounter counter = (MessageCounter)iter.next();
                
-               counter.onTimer();
+               counter.sample();
             }
          }
       }  

Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java	2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -21,34 +21,43 @@
    */
 package org.jboss.messaging.core.impl.server;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
 
 import org.jboss.aop.microcontainer.aspects.jmx.JMX;
 import org.jboss.jms.client.api.ClientConnectionFactory;
 import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
-import org.jboss.messaging.core.Binding;
-import org.jboss.messaging.core.MessagingServer;
-import org.jboss.messaging.core.MessagingServerManagement;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.messaging.core.*;
 import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.messaging.util.MessagingException;
 
 /**
  * This interface describes the properties and operations that comprise the management interface of the
  * Messaging Server.
- * 
+ * <p/>
  * It includes operations to create and destroy queues and provides various statistics measures
  * such as message count for queues and topics.
- * 
+ *
  * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
  * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
- * 
  */
 @JMX(name = "jboss.messaging:service=MessagingServerManagement", exposedInterface = MessagingServerManagement.class)
-public class MessagingServerManagementImpl implements MessagingServerManagement
+public class MessagingServerManagementImpl implements MessagingServerManagement, MessagingComponent
 {
    private MessagingServer messagingServer;
 
+   private HashMap<String, MessageCounter> currentCounters = new HashMap<String, MessageCounter>();
+
+   private HashMap<String, ScheduledFuture> currentRunningCounters = new HashMap<String, ScheduledFuture>();
+
+   private ScheduledExecutorService scheduler;
+   private int maxMessageCounters = 20;
+
    public void setMessagingServer(MessagingServer messagingServer)
    {
       this.messagingServer = messagingServer;
@@ -63,12 +72,13 @@
 //   {
 //      return messagingServer.getConfiguration();
 //   }
-//   
+
+   //
    public boolean isStarted()
    {
       return messagingServer.isStarted();
    }
-   
+
    public void createQueue(String address, String name) throws Exception
    {
       messagingServer.createQueue(address, name);
@@ -89,10 +99,10 @@
       return messagingServer.removeAddress(address);
    }
 
-   public ClientConnectionFactory createClientConnectionFactory(boolean strictTck,int prefetchSize)
+   public ClientConnectionFactory createClientConnectionFactory(boolean strictTck, int prefetchSize)
    {
       return new ClientConnectionFactoryImpl(messagingServer.getConfiguration().getMessagingServerID(),
-                      messagingServer.getConfiguration().getRemotingConfiguration(), messagingServer.getVersion(), messagingServer.getConfiguration().isStrictTck() || strictTck, prefetchSize);
+              messagingServer.getConfiguration().getRemotingConfiguration(), messagingServer.getVersion(), messagingServer.getConfiguration().isStrictTck() || strictTck, prefetchSize);
    }
 
    public void removeAllMessagesForAddress(String address) throws Exception
@@ -104,27 +114,190 @@
    {
       messagingServer.removeAllMessagesForBinding(name);
    }
-   
+
+   public List<Queue> getQueuesForAddress(String address) throws Exception
+   {
+      List<Queue> queues = new ArrayList<Queue>();
+      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForAddress(address);
+
+      for (Binding binding : bindings)
+      {
+         Queue queue = binding.getQueue();
+         queues.add(queue);
+      }
+      return queues;
+   }
+
    public int getMessageCountForQueue(String queue) throws Exception
    {
       return getQueue(queue).getMessageCount();
    }
-  
-   public List<SubscriptionInfo> listAllSubscriptionsForAddress(String address) throws Exception
+
+   public int getMaxMessageCounters()
    {
-      return listSubscriptions(address, ListType.ALL);
+      return maxMessageCounters;
    }
 
-   public List<SubscriptionInfo> listDurableSubscriptionsForAddress(String address) throws Exception
+   public void setMaxMessageCounters(int maxMessageCounters)
    {
-      return listSubscriptions(address, ListType.DURABLE);
+      this.maxMessageCounters = maxMessageCounters;
    }
 
-   public List<SubscriptionInfo> listNonSubscriptionsForAddress(String address) throws Exception
+   public void registerMessageCounter(final String queueName) throws Exception
    {
-      return listSubscriptions(address, ListType.NON_DURABLE);
+      if (currentCounters.get(queueName) != null)
+      {
+         throw new IllegalStateException("Message Counter Already Registered");
+      }
+      Binding binding = messagingServer.getPostOffice().getBinding(queueName);
+      if (binding == null)
+      {
+         throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+      }
+      Queue queue = binding.getQueue();
+      currentCounters.put(queueName, new MessageCounter(queue.getName(),queue, queue.isDurable(), queue.getMessageCounterHistoryDayLimit()));
    }
 
+   public void unregisterMessageCounter(final String queueName) throws Exception
+   {
+      if (currentCounters.get(queueName) == null)
+      {
+         throw new MessagingException(MessagingException.ILLEGAL_STATE, "Counter is not registered");
+      }
+      currentCounters.remove(queueName);
+      if(currentRunningCounters.get(queueName) != null)
+      {
+         currentRunningCounters.get(queueName).cancel(true);
+         currentRunningCounters.remove(queueName);
+      }
+   }
+
+   public void startMessageCounter(final String queueName, long duration) throws Exception
+   {
+      MessageCounter messageCounter = currentCounters.get(queueName);
+      if (messageCounter == null)
+      {
+         Binding binding = messagingServer.getPostOffice().getBinding(queueName);
+         if (binding == null)
+         {
+            throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+         }
+         Queue queue = binding.getQueue();
+         messageCounter = new MessageCounter(queue.getName(), queue,  queue.isDurable(), queue.getMessageCounterHistoryDayLimit());
+      }
+      currentCounters.put(queueName, messageCounter);
+      messageCounter.resetCounter();
+      if (duration > 0)
+      {
+
+         ScheduledFuture future = scheduler.schedule(new Runnable()
+         {
+            public void run()
+            {
+               currentCounters.get(queueName).sample();
+            }
+         }, duration, TimeUnit.SECONDS);
+         currentRunningCounters.put(queueName, future);
+      }
+   }
+
+   public MessageCounter stopMessageCounter(String queueName) throws Exception
+   {
+      MessageCounter messageCounter = currentCounters.get(queueName);
+      if (messageCounter == null)
+      {
+         throw new IllegalArgumentException(queueName + "counter not registered");
+      }
+      if(currentRunningCounters.get(queueName) != null)
+      {
+         currentRunningCounters.get(queueName).cancel(true);
+         currentRunningCounters.remove(queueName);
+      }
+      messageCounter.sample();
+      return messageCounter;
+   }
+
+   public MessageCounter getMessageCounter(String queueName)
+   {
+      MessageCounter messageCounter = currentCounters.get(queueName);
+      if(messageCounter != null && currentRunningCounters.get(queueName) == null)
+      {
+         messageCounter.sample();
+      }
+      return messageCounter;
+   }
+
+
+   public Collection<MessageCounter> getMessageCounters()
+   {
+      return currentCounters.values();
+   }
+
+   public void resetMessageCounter(String queue)
+   {
+      MessageCounter messageCounter = currentCounters.get(queue);
+      if(messageCounter != null)
+      {
+         messageCounter.resetCounter();
+      }
+   }
+
+   public void resetMessageCounters()
+   {
+      Set<String> counterNames = currentCounters.keySet();
+      for (String counterName : counterNames)
+      {
+         resetMessageCounter(counterName);
+      }
+   }
+
+   public void resetMessageCounterHistory(String queue)
+   {
+      MessageCounter messageCounter = currentCounters.get(queue);
+      if(messageCounter != null)
+      {
+         messageCounter.resetHistory();
+      }
+   }
+
+   public void resetMessageCounterHistories()
+   {
+      Set<String> counterNames = currentCounters.keySet();
+      for (String counterName : counterNames)
+      {
+         resetMessageCounterHistory(counterName);
+      }
+   }
+
+   public List<MessageCounter> stopAllMessageCounters() throws Exception
+   {
+      Set<String> counterNames = currentCounters.keySet();
+      List<MessageCounter> messageCounters = new ArrayList<MessageCounter>();
+      for (String counterName : counterNames)
+      {
+         messageCounters.add(stopMessageCounter(counterName));
+      }
+      return messageCounters;
+   }
+
+   public void unregisterAllMessageCounters() throws Exception
+   {
+      Set<String> counterNames = currentCounters.keySet();
+      for (String counterName : counterNames)
+      {
+         unregisterMessageCounter(counterName);
+      }
+   }
+
+   public int getConsumerCountForQueue(String queue) throws Exception
+   {
+      return getQueue(queue).getConsumerCount();
+   }
+
+   public  List<ServerConnectionEndpoint> getClients()
+   {
+      return messagingServer.getConnectionManager().getActiveConnections();
+   }
 //
 ////   public int getDeliveringCountForQueue(String queue) throws Exception
 ////   {
@@ -151,21 +324,6 @@
 ////      return (MessageStatistics)stats.get(0);
 ////   }
 //
-//   public int getConsumerCountForQueue(String queue) throws Exception
-//   {
-//      return getQueue(queue).getConsumerCount();
-//   }
-//
-////   public void resetMessageCounterForQueue(String queue) throws Exception
-////   {
-////      getMessageCounterForQueue(queue).resetCounter();
-////   }
-////
-////   public void resetMessageCounterHistoryForQueue(String queue) throws Exception
-////   {
-////      getMessageCounterForQueue(queue).resetHistory();
-////   }
-//
 ////   public List<Message> listAllMessagesForQueue(String queue) throws Exception
 ////   {
 ////      return getQueue(queue).listAllMessages(null);
@@ -221,9 +379,9 @@
 //      return getNonDurableMessageCountForTopic(topicName);
 //   }
 //
-//   public int getAllSubscriptionsCountForTopic(String topicName) throws Exception
+//   public int getAllSubscriptionsCount(String topicName) throws Exception
 //   {
-//      return getAllSubscriptionsCountForTopic(topicName);
+//      return getAllSubscriptionsCount(topicName);
 //   }
 //
 //   public int getDurableSubscriptionsCountForTopic(String topicName) throws Exception
@@ -324,7 +482,7 @@
 //
 //      return charArray.toString();
 //   }
-//   
+//
 ////   public String showPreparedTransactionsAsHTML()
 ////   {
 ////      List txs = messagingServer.getTxRepository().getPreparedTransactions();
@@ -526,99 +684,100 @@
 //   {
 //      return listMessages(queue, ListType.ALL, selector);
 //   }
-//   
+//
 //   public List<Message> listDurableMessages(Queue queue, String selector) throws Exception
 //   {
 //      return listMessages(queue, ListType.DURABLE, selector);
 //   }
-//   
+//
 //   public List<Message> listNonDurableMessages(Queue queue, String selector) throws Exception
 //   {
 //      return listMessages(queue, ListType.NON_DURABLE, selector);
 //   }
 //
-//   
+//
 //   public List<SubscriptionInfo> listDurableSubscriptions(String topicName) throws Exception
 //   {
 //      return listSubscriptions(topicName, ListType.DURABLE);
 //   }
-//   
+//
 //   public List<SubscriptionInfo> listNonDurableSubscriptions(String topicName) throws Exception
 //   {
 //      return listSubscriptions(topicName, ListType.NON_DURABLE);
 //   }
-//   
+//
 //   public String listAllSubscriptionsAsHTML(String topicName) throws Exception
 //   {
 //      return listSubscriptionsAsHTML(topicName, ListType.ALL);
 //   }
-//   
+//
 //   public String listDurableSubscriptionsAsHTML(String topicName) throws Exception
 //   {
 //      return listSubscriptionsAsHTML(topicName, ListType.DURABLE);
 //   }
-//   
+//
 //   public String listNonDurableSubscriptionsAsHTML(String topicName) throws Exception
 //   {
 //      return listSubscriptionsAsHTML(topicName, ListType.NON_DURABLE);
 //   }
-//      
+//
 //   public List<Message> listDurableMessagesForSubscription(String subId, String selector) throws Exception
 //   {
 //      return listMessagesForSubscription(ListType.DURABLE, subId, selector);
 //   }
-//   
+//
 //   public List<Message> listNonDurableMessagesForSubscription(String subId, String selector) throws Exception
 //   {
 //      return listMessagesForSubscription(ListType.NON_DURABLE, subId, selector);
 //   }
-//   
+//
 //   public List<MessageCounter> getMessageCounters(String topicName) throws Exception
 //   {
 //      List<MessageCounter> counters = new ArrayList<MessageCounter>();
-//      
+//
 //      Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
-//      
+//
 //      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
-//      
+//
 //      for (Binding binding: bindings)
 //      {
 //         Queue queue = binding.getQueue();
-//         
+//
 //         //TODO - get message counters
-//         
+//
 ////         String counterName = SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queue.getName();
-////         
+////
 ////         MessageCounter counter = messagingServer.getMessageCounterManager().getMessageCounter(counterName);
-////         
+////
 ////         if (counter == null)
 ////         {
 ////            throw new IllegalStateException("Cannot find counter with name " + counterName);
 ////         }
-////         
+////
 ////         counters.add(counter);
 //      }
-//      
-//      return counters; 
+//
+//      return counters;
 //   }
-//   
-//   
+//
+//
 ////   public void setMessageCounterHistoryDayLimit(String topicName, int limit) throws Exception
 ////   {
 ////      Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
-////      
+////
 ////      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
-////         
+////
 ////      for (Binding binding: bindings)
 ////      {
 ////         Queue queue = binding.getQueue();
-////         
+////
 ////         queue.setMessageCounterHistoryDayLimit(limit);
-////      }      
+////      }
 ////   }
-//   
+//
 //   // Private ---------------------------------------------------------------------------
-//   
+
+   //
    private Queue getQueue(String queueName) throws Exception
    {
       Binding binding = messagingServer.getPostOffice().getBinding(queueName);
@@ -626,10 +785,10 @@
       {
          throw new IllegalArgumentException("No queue with name " + queueName);
       }
-      
+
       return binding.getQueue();
    }
-//   
+//
 //    
 //   
 //   private List<Message> listMessages(Queue queue, ListType type, String jmsSelector) throws Exception
@@ -793,12 +952,10 @@
 //
 //      return ret;
 //   }
+
+   //
+
 //   
-   private enum ListType
-   {
-      ALL, DURABLE, NON_DURABLE
-   }
-//   
 //  
 //   private List<Message> listMessagesForSubscription(ListType type, String subId, String jmsSelector) throws Exception
 //   { 
@@ -845,39 +1002,11 @@
 //      
 //      return msgs;
 //   }
-//   
-   private List<SubscriptionInfo> listSubscriptions(String address, ListType type) throws Exception
-   {      
-      List<SubscriptionInfo> subs = new ArrayList<SubscriptionInfo>();
-      
-      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForAddress(address);
-      
-      for (Binding binding: bindings)
-      {
-         Queue queue = binding.getQueue();
-         
-         if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable()) || (type == ListType.NON_DURABLE && !queue.isDurable()))
-         {         
-            String subName = null;
-            String clientID = null;
-            
-            if (queue.isDurable())
-            {
-               MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
-               subName = helper.getSubName();
-               clientID = helper.getClientId();
-            }
-            
-            SubscriptionInfo info = new SubscriptionInfo(queue.getName(), queue.isDurable(), subName, clientID,
-                     queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.getMessageCount(), queue.getMaxSize());
-            
-            subs.add(info);
-         }
-      }
-      
-      return subs;
-   }
-//   
+
+   //
+
+
+//
 //   private int getMessageCount(String topicName, ListType type) throws Exception
 //   {
 //      Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
@@ -974,6 +1103,24 @@
 //      
 //      return sb.toString();                                
 //   }
-   
-   
+
+
+   public void start() throws Exception
+   {
+      scheduler = Executors.newScheduledThreadPool(maxMessageCounters);
+   }
+
+   public void stop() throws Exception
+   {
+      if(scheduler != null)
+      {
+         scheduler.shutdown();
+      }
+   }
+
+   protected void finalize() throws Throwable
+   {
+      super.finalize();
+
+   }
 }

Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-02-15 15:59:01 UTC (rev 3726)
@@ -809,7 +809,7 @@
 
    public List listAllSubscriptionsForTopic(String s) throws Exception
    {
-      return getJMSServerManager().listAllSubscriptionsForTopic(s);
+      return getJMSServerManager().listSubscriptions(s);
    }
 
 




More information about the jboss-cvs-commits mailing list