[jboss-cvs] JBoss Messaging SVN: r3726 - in trunk: src/main/org/jboss/jms/client and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 15 10:59:01 EST 2008
Author: ataylor
Date: 2008-02-15 10:59:01 -0500 (Fri, 15 Feb 2008)
New Revision: 3726
Added:
trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java
Modified:
trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java
trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
Log:
refactoring of management interfaces second draft
Added: trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java
===================================================================
--- trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java (rev 0)
+++ trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -0,0 +1,131 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.example.embedded;
+
+import org.jboss.messaging.core.remoting.RemotingConfiguration;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessagingServerManagement;
+import org.jboss.messaging.core.impl.server.MessagingServerImpl;
+import org.jboss.messaging.core.impl.server.MessagingServerManagementImpl;
+import org.jboss.messaging.core.impl.MessageImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.jms.client.api.*;
+import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
+import org.jboss.jms.message.JBossTextMessage;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class EmbeddedManagementExample
+{
+ public static void main(String args[]) throws Exception
+ { // Example: start an embedded server, send batches of messages to "Queue1" and print the queue's message counter.
+ RemotingConfiguration remotingConf = new RemotingConfiguration(TCP, "localhost", 5400);
+ remotingConf.setInvmDisabled(true);
+ MessagingServer messagingServer = new MessagingServerImpl(remotingConf);
+ messagingServer.start();
+ MessagingServerManagementImpl messagingServerManagement = new MessagingServerManagementImpl();
+ messagingServerManagement.setMessagingServer(messagingServer);
+ messagingServerManagement.start();
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(remotingConf);
+ ClientConnection clientConnection = cf.createConnection(null, null);
+ ClientSession clientSession = clientConnection.createClientSession(false, true, true, 0);
+ String queue = "Queue1";
+ clientSession.createQueue(queue, queue, null, false, false); // the same string is passed for both of the first two arguments
+ ClientProducer clientProducer = clientSession.createProducer();
+ // Start the connection before any message is sent.
+ clientConnection.start();
+ // Register a counter for the queue, build the one message that is re-sent throughout, and start the counter.
+ messagingServerManagement.registerMessageCounter(queue);
+ Message message = new MessageImpl(JBossTextMessage.TYPE, true, 0, System.currentTimeMillis(), (byte) 1);
+ messagingServerManagement.startMessageCounter(queue, 0); // second argument is the 'duration' parameter of startMessageCounter
+ for (int i = 0; i < 1000; i++)
+ {
+ clientProducer.send(queue, message);
+ }
+ // Print the counter after the first 1000 sends, then send 2000 more.
+ MessageCounter messageCounter = messagingServerManagement.getMessageCounter(queue);
+ System.out.println("messageCounter = " + messageCounter);
+ for (int i = 0; i < 2000; i++)
+ {
+ clientProducer.send(queue, message);
+ }
+ // Print the counter again, then send 3000 more.
+ messageCounter = messagingServerManagement.getMessageCounter(queue);
+ System.out.println("messageCounter = " + messageCounter);
+ for (int i = 0; i < 3000; i++)
+ {
+ clientProducer.send(queue, message);
+ }
+ // Print the counter again, then send a final fixed batch of 4000.
+ messageCounter = messagingServerManagement.getMessageCounter(queue);
+ System.out.println("messageCounter = " + messageCounter);
+ for (int i = 0; i < 4000; i++)
+ {
+ clientProducer.send(queue, message);
+ }
+ // Print the counter, then stop it and start it again with a duration argument of 5 for the timed run below.
+ messageCounter = messagingServerManagement.getMessageCounter(queue);
+ System.out.println("messageCounter = " + messageCounter);
+ messagingServerManagement.stopMessageCounter(queue);
+ messagingServerManagement.startMessageCounter(queue, 5);
+ ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+ Timer timer = new Timer();
+ scheduler.schedule(timer, 10, TimeUnit.SECONDS); // Timer.run() is scheduled once, 10 seconds from now
+ int counter = 0;
+ while (timer.isRunning()) // keep sending until the scheduled Timer task has run
+ {
+ clientProducer.send(queue, message);
+ counter++;
+ }
+ scheduler.shutdown();
+ System.out.println("counter = " + counter); // number of sends performed during the timed loop
+ messageCounter = messagingServerManagement.getMessageCounter(queue);
+ System.out.println("messageCounter = " + messageCounter);
+ messagingServerManagement.unregisterMessageCounter(queue); // tear down: counter, connection, management, then server
+ clientConnection.close();
+ messagingServerManagement.stop();
+ messagingServer.stop();
+ }
+
+ private static class Timer implements Runnable // one-shot stop flag: main() polls isRunning() while the scheduler thread calls run()
+ {
+ volatile boolean running = true; // volatile so the write made on the scheduler thread is visible to the polling loop in main()
+ // Returns true until run() has been invoked.
+ public boolean isRunning()
+ {
+ return running;
+ }
+ // Invoked by the scheduler; clears the flag so the sending loop in main() terminates.
+ public void run()
+ {
+ running = false;
+ }
+ }
+}
Modified: trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java 2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -416,12 +416,9 @@
String coreDest = dest.getAddress();
- // TODO - can optimise this copy to do copy lazily.
- org.jboss.messaging.core.Message messageToSend = jbm.getCoreMessage().copy();
-
try
{
- producer.send(coreDest, messageToSend);
+ producer.send(coreDest, jbm.getCoreMessage());
}
catch (MessagingException e)
{
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -429,7 +429,7 @@
{
checkClosed();
- SessionSendMessage message = new SessionSendMessage(address, m);
+ SessionSendMessage message = new SessionSendMessage(address, m.copy());
remotingConnection.send(id, message, !m.isDurable());
}
Modified: trunk/src/main/org/jboss/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManager.java 2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManager.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -4,6 +4,7 @@
import org.jboss.jms.destination.JBossTopic;
import org.jboss.messaging.core.impl.server.SubscriptionInfo;
+import javax.jms.Message;
import java.util.List;
/**
@@ -13,6 +14,10 @@
public interface JMSServerManager
{
// management operations
+ enum ListType
+ {
+ ALL, DURABLE, NON_DURABLE
+ }
boolean isStarted();
boolean createQueue(String queueName, String jndiBinding) throws Exception;
@@ -29,27 +34,53 @@
boolean destroyConnectionFactory(String name) throws Exception;
- public void removeAllMessagesForQueue(String queueName) throws Exception;
+ public List<Message> listMessagesForQueue(String queue);
- public void removeAllMessagesForTopic(String topicName) throws Exception;
+ public List<Message> listMessagesForQueue(String queue, ListType listType);
- public void removeAllMessagesForQueue(JBossQueue queueName) throws Exception;
+ public List<Message> listMessages(JBossQueue queue);
- public void removeAllMessagesForTopic(JBossTopic topicName) throws Exception;
+ public List<Message> listMessagesForTopic(String topic);
+ public List<Message> listMessagesForTopic(String topic, ListType listType);
+
+ public List<Message> listMessages(JBossTopic topic);
+
+ public List<Message> listMessages(JBossQueue queue, ListType listType);
+
+ public List<Message> listMessages(JBossTopic topic, ListType listType);
+
+ void removeAllMessagesForQueue(String queueName) throws Exception;
+
+ void removeAllMessagesForTopic(String topicName) throws Exception;
+
+ void removeAllMessages(JBossQueue queueName) throws Exception;
+
+ void removeAllMessages(JBossTopic topicName) throws Exception;
+
int getMessageCountForQueue(String queue) throws Exception;
- int getMessageCountForQueue(JBossQueue queue) throws Exception;
+ int getMessageCount(JBossQueue queue) throws Exception;
- List<SubscriptionInfo> listAllSubscriptionsForTopic(String topicName) throws Exception;
+ List<SubscriptionInfo> listSubscriptions(String topicName) throws Exception;
- List<SubscriptionInfo> listAllSubscriptionsForTopic(JBossTopic topicName) throws Exception;
+ List<SubscriptionInfo> listSubscriptions(JBossTopic topic) throws Exception;
- List<SubscriptionInfo> listDurableSubscriptionsForTopic(String topicName) throws Exception;
+ List<SubscriptionInfo> listSubscriptions(String topicName, ListType listType) throws Exception;
- List<SubscriptionInfo> listDurableSubscriptionsForTopic(JBossTopic topicName) throws Exception;
+ List<SubscriptionInfo> listSubscriptions(JBossTopic topic, ListType listType) throws Exception;
- List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(String topicName) throws Exception;
+ int getSubscriptionsCountForTopic(String topicName) throws Exception;
- List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(JBossTopic topicName) throws Exception;
+ int getSubscriptionsCount(JBossTopic topic) throws Exception;
+
+ int getSubscriptionsCountForTopic(String topicName, ListType listType) throws Exception;
+
+ int getSubscriptionsCount(JBossTopic topic,ListType listType) throws Exception;
+
+ int getConsumerCountForQueue(String queue) throws Exception;
+
+ int getConsumerCountForQueue(JBossQueue queue) throws Exception;
+
+ List getClients() throws Exception;
}
Modified: trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java 2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -25,12 +25,18 @@
import org.jboss.jms.client.api.ClientConnectionFactory;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.MessagingServerManagement;
+import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.impl.server.SubscriptionInfo;
+import org.jboss.messaging.core.impl.messagecounter.MessageStatistics;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
import org.jboss.messaging.deployers.Deployer;
import org.jboss.messaging.deployers.DeploymentManager;
import org.jboss.messaging.util.JNDIUtil;
+import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
@@ -38,9 +44,11 @@
import javax.naming.InitialContext;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
+import javax.jms.Message;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
+import java.util.Collection;
/**
* A Deployer used to create and add to JNDI queues, topics and connection factories. Typically this would only be used
@@ -48,6 +56,7 @@
*
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
+@JMX(name = "jboss.messaging:service=MessagingServer", exposedInterface = JMSServerManager.class)
public class JMSServerManagerImpl extends Deployer implements JMSServerManager
{
Logger log = Logger.getLogger(JMSServerManagerImpl.class);
@@ -65,8 +74,6 @@
MessagingServerManagement messagingServerManagement;
-// UserManager userManager = new UserManager();
-
private static final String CLIENTID_ELEMENT = "client-id";
private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
private static final String PREFETECH_SIZE_ELEMENT = "prefetch-size";
@@ -401,68 +408,185 @@
return true;
}
+
+ public List<Message> listMessagesForQueue(String queue)
+ {
+ return listMessagesForQueue(queue, ListType.ALL);
+ }
+
+ public List<Message> listMessagesForQueue(String queue, ListType listType)
+ {
+ return listMessages(new JBossQueue(queue), listType);
+ }
+
+ public List<Message> listMessages(JBossQueue queue)
+ {
+ return listMessages(queue, ListType.ALL);
+ }
+
+ public List<Message> listMessagesForTopic(String topic)
+ {
+ return listMessagesForTopic(topic, ListType.ALL);
+ }
+
+ public List<Message> listMessagesForTopic(String topic, ListType listType)
+ {
+ return listMessages(new JBossTopic(topic), listType);
+ }
+
+ public List<Message> listMessages(JBossTopic topic)
+ {
+ return listMessages(topic, ListType.ALL);
+ }
+
+ public List<Message> listMessages(JBossQueue queue, ListType listType)
+ {
+ return null; // TODO: not implemented; always returns null, so listMessagesForQueue(...) and listMessages(JBossQueue), which delegate here, do too
+ }
+
+ public List<Message> listMessages(JBossTopic topic, ListType listType)
+ {
+ return null; // TODO: not implemented; always returns null, so listMessagesForTopic(...) and listMessages(JBossTopic), which delegate here, do too
+ }
+
public void removeAllMessagesForQueue(String queueName) throws Exception
{
JBossQueue jBossQueue = new JBossQueue(queueName);
- removeAllMessagesForQueue(jBossQueue);
+ removeAllMessages(jBossQueue);
}
public void removeAllMessagesForTopic(String topicName) throws Exception
{
JBossTopic jBossTopic = new JBossTopic(topicName);
- removeAllMessagesForTopic(jBossTopic);
+ removeAllMessages(jBossTopic);
}
- public void removeAllMessagesForQueue(JBossQueue queue) throws Exception
+ public void removeAllMessages(JBossQueue queue) throws Exception
{
messagingServerManagement.removeAllMessagesForAddress(queue.getAddress());
}
- public void removeAllMessagesForTopic(JBossTopic topic) throws Exception
+ public void removeAllMessages(JBossTopic topic) throws Exception
{
messagingServerManagement.removeAllMessagesForAddress(topic.getAddress());
}
public int getMessageCountForQueue(String queue) throws Exception
{
- return getMessageCountForQueue(new JBossQueue(queue));
+ return getMessageCount(new JBossQueue(queue));
}
- public int getMessageCountForQueue(JBossQueue queue) throws Exception
+ public int getMessageCount(JBossQueue queue) throws Exception
{
return messagingServerManagement.getMessageCountForQueue(queue.getAddress());
}
- public List<SubscriptionInfo> listAllSubscriptionsForTopic(String topicName) throws Exception
+
+ public List<SubscriptionInfo> listSubscriptions(String topicName) throws Exception
{
- return listAllSubscriptionsForTopic(new JBossTopic(topicName));
+ return listSubscriptions(new JBossTopic(topicName));
}
- public List<SubscriptionInfo> listAllSubscriptionsForTopic(JBossTopic topicName) throws Exception
+ public List<SubscriptionInfo> listSubscriptions(JBossTopic topic) throws Exception
{
- return messagingServerManagement.listAllSubscriptionsForAddress(topicName.getAddress());
+ return listSubscriptions(topic, ListType.ALL);
}
- public List<SubscriptionInfo> listDurableSubscriptionsForTopic(String topicName) throws Exception
+ public List<SubscriptionInfo> listSubscriptions(String topic, ListType type) throws Exception
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return listSubscriptions(new JBossTopic(topic), type);
}
+ public List<SubscriptionInfo> listSubscriptions(JBossTopic topic, ListType type) throws Exception
+ {
+ List<SubscriptionInfo> subs = new ArrayList<SubscriptionInfo>();
- public List<SubscriptionInfo> listDurableSubscriptionsForTopic(JBossTopic topicName) throws Exception
+ List<Queue> queues = messagingServerManagement.getQueuesForAddress(topic.getAddress());
+
+ for (Queue queue : queues)
+ {
+ if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable()) || (type == ListType.NON_DURABLE && !queue.isDurable()))
+ {
+ String subName = null;
+ String clientID = null;
+
+ if (queue.isDurable())
+ {
+ MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
+ subName = helper.getSubName();
+ clientID = helper.getClientId();
+ }
+
+ SubscriptionInfo info = new SubscriptionInfo(queue.getName(), queue.isDurable(), subName, clientID,
+ queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.getMessageCount(), queue.getMaxSize());
+
+ subs.add(info);
+ }
+ }
+
+ return subs;
+ }
+
+ public int getSubscriptionsCountForTopic(String topicName) throws Exception
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return getSubscriptionsCount(new JBossTopic(topicName));
}
- public List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(String topicName) throws Exception
+ public int getSubscriptionsCount(JBossTopic topic) throws Exception
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return getSubscriptionsCount(topic, ListType.ALL);
}
- public List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(JBossTopic topicName) throws Exception
+ public int getSubscriptionsCountForTopic(String topicName, ListType listType) throws Exception
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return getSubscriptionsCount(new JBossTopic(topicName), listType);
}
+ public int getSubscriptionsCount(JBossTopic topic, ListType listType) throws Exception
+ {
+ return listSubscriptions(topic, listType).size();
+ }
+
+ public int getConsumerCountForQueue(String queue) throws Exception
+ {
+ return getConsumerCountForQueue(new JBossQueue(queue));
+ }
+
+ public int getConsumerCountForQueue(JBossQueue queue) throws Exception
+ {
+ return messagingServerManagement.getConsumerCountForQueue(queue.getName());
+ }
+ //todo something!
+ public List getClients() throws Exception
+ {
+ List<ServerConnectionEndpoint> endpoints = messagingServerManagement.getClients();
+ for (ServerConnectionEndpoint endpoint : endpoints)
+ {
+ // placeholder: nothing is read from the endpoint yet
+ }
+ return null; // unfinished: the endpoints fetched above are discarded and null is always returned
+ }
+
+ List<MessageStatistics> getMessageStatistics(String queue) throws Exception // NOTE(review): 'queue' is never read; one entry is built per counter from getMessageCounters() - confirm intent
+ {
+ Collection<MessageCounter> counters = messagingServerManagement.getMessageCounters();
+ List<MessageStatistics> list = new ArrayList<MessageStatistics>(counters.size());
+ for (Object counter1 : counters)
+ {
+ MessageCounter counter = (MessageCounter) counter1;
+ // Copy the counter's current values into a fresh MessageStatistics object.
+ MessageStatistics stats = new MessageStatistics();
+ stats.setName(counter.getDestinationName());
+ stats.setDurable(counter.getDestinationDurable());
+ stats.setCount(counter.getCount());
+ stats.setCountDelta(counter.getCountDelta());
+ stats.setDepth(counter.getMessageCount());
+ stats.setDepthDelta(counter.getMessageCountDelta());
+ stats.setTimeLastUpdate(counter.getLastUpdate());
+
+ list.add(stats);
+ }
+ return list;
+ }
//private
private void addToDestinationBindings(String destination, String jndiBinding)
@@ -473,4 +597,5 @@
}
destinations.get(destination).add(jndiBinding);
}
+
}
Modified: trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java 2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -21,10 +21,12 @@
*/
package org.jboss.messaging.core;
-import org.jboss.messaging.core.impl.server.SubscriptionInfo;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
import org.jboss.jms.client.api.ClientConnectionFactory;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
import java.util.List;
+import java.util.Collection;
/**
* This interface describes the management interface exposed by the server
@@ -47,6 +49,8 @@
boolean addAddress(String address);
boolean removeAddress(String address);
+
+ List<Queue> getQueuesForAddress(String address) throws Exception;
ClientConnectionFactory createClientConnectionFactory(boolean strictTck,int prefetchSize);
@@ -56,303 +60,31 @@
int getMessageCountForQueue(String queue) throws Exception;
- List<SubscriptionInfo> listAllSubscriptionsForAddress(String address) throws Exception;
+ void registerMessageCounter(String queueName) throws Exception;
- List<SubscriptionInfo> listDurableSubscriptionsForAddress(String address) throws Exception;
+ void unregisterMessageCounter(String queueName) throws Exception;
- List<SubscriptionInfo> listNonSubscriptionsForAddress(String address) throws Exception;
-//
-// /**
-// * returns how many messagesd have been delivered from a specific queue
-// * @param queue the queue
-// * @return number of messages
-// * @throws Exception if a problem occurs
-// */
-// int getDeliveringCountForQueue(String queue) throws Exception;
-//
-// /**
-// * returns how many messages are scheduled for a specific queue
-// * @param queue the queue
-// * @return number of messages
-// * @throws Exception if a problem occurs
-// */
-// int getScheduledMessageCountForQueue(String queue) throws Exception;
-//
-// /**
-// * returns the message counter for a queue
-// * @param queue the queue
-// * @return number of messages
-// * @throws Exception if a problem occurs
-// */
-// MessageCounter getMessageCounterForQueue(String queue) throws Exception;
-//
-// /**
-// * returns the message statistics for a queue
-// * @param queue the queue
-// * @return the message statistics
-// * @throws Exception if a problem occurs
-// */
-// MessageStatistics getMessageStatisticsForQueue(String queue) throws Exception;
-//
-// /**
-// * returns how many consumers a specific queue has
-// * @param queue the queue
-// * @return number of consumers
-// * @throws Exception if a problem occurs
-// */
-// int getConsumerCountForQueue(String queue) throws Exception;
-//
-// /**
-// * restes the message counter for a specific queue
-// * @param queue the queue
-// * @throws Exception if a problem occurs
-// */
-// void resetMessageCounterForQueue(String queue) throws Exception;
-//
-// /**
-// * resets the counter history for a specific queue
-// * @param queue the queue
-// * @throws Exception if a problem occurs
-// */
-// void resetMessageCounterHistoryForQueue(String queue) throws Exception;
-//
-// /**
-// * lists all messages for a specific queue. This will contain the message references only
-// * @param queue the queue
-// * @return the messages
-// * @throws Exception if a problem occurs
-// */
-// List listAllMessagesForQueue(String queue) throws Exception;
-//
-// /**
-// * lists all messages that match the given selector.
-// * @param queue the queue
-// * @param selector
-// * @return the messages
-// * @throws Exception if a problem occurs
-// */
-// List listAllMessagesForQueue(String queue,String selector) throws Exception;
-//
-// /**
-// * list all the durable messages for a specific queue
-// * @param queue the queue
-// * @return the messages
-// * @throws Exception if a problem occurs
-// */
-// List listDurableMessagesForQueue(String queue) throws Exception;
-//
-// /**
-// * list all the durable messages for a queue that match a given selector
-// * @param queue the queue
-// * @param selector
-// * @return the messages
-// * @throws Exception if a problem occurs
-// */
-// List listDurableMessagesForQueue(String queue,String selector) throws Exception;
-//
-// /**
-// * lists all the non durable messages for a specific queue.
-// * @param queue the queue
-// * @return the messages
-// * @throws Exception if a problem occurs
-// */
-// List listNonDurableMessagesForQueue(String queue) throws Exception;
-//
-// /**
-// * lists all noon durable messages for a queue.
-// * @param queue
-// * @param selector
-// * @return
-// * @throws Exception
-// */
-// List listNonDurableMessagesForQueue(String queue,String selector) throws Exception;
-//
-// /**
-// * lists all durable messages for a specific queue that match a specific selector
-// * @param queue
-// * @return
-// * @throws Exception
-// */
-// String listMessageCounterAsHTMLForQueue(String queue) throws Exception;
-//
-// /**
-// * list the message count history for a specific queue in HTML format
-// * @param queue
-// * @return
-// * @throws Exception
-// */
-// String listMessageCounterHistoryAsHTMLForQueue(String queue) throws Exception;
-//
-// //topic
-//
-// /**
-// * counts messages received for a topic
-// * @param topicName
-// * @return
-// * @throws Exception
-// */
-// int getAllMessageCountForTopic(String topicName) throws Exception;
-//
-// /**
-// * counts durable messages recieved for a topic
-// * @param topicName
-// * @return
-// * @throws Exception
-// */
-// int getDurableMessageCountForTopic(String topicName) throws Exception;
-//
-// /**
-// * counts non durable messages recieved for a topic
-// * @param topicName
-// * @return
-// * @throws Exception
-// */
-// int getNonDurableMessageCountForTopic(String topicName) throws Exception;
-//
-// /**
-// * counts all subscriptions for a topic
-// * @param topicName
-// * @return
-// * @throws Exception
-// */
-// int getAllSubscriptionsCountForTopic(String topicName) throws Exception;
-//
-// /**
-// * counts all durable subscriptions for a topic
-// * @param topicName
-// * @return
-// * @throws Exception
-// */
-// int getDurableSubscriptionsCountForTopic(String topicName) throws Exception;
-//
-// /**
-// * counts all non durable subscriptions for a topic
-// * @param topicName
-// * @return
-// * @throws Exception
-// */
-// int getNonDurableSubscriptionsCountForTopic(String topicName) throws Exception;
-//
-// /**
-// * removes all the messages for a specific topic
-// * @param topic
-// * @throws Exception
-// * @throws Exception
-// */
-// void removeAllMessagesForTopic(String topic) throws Exception;
-//
-// /**
-// * lists all subscriptions for a topic
-// * @param topic
-// * @return
-// * @throws Exception
-// */
-// List listAllSubscriptionsForAddress(String topic) throws Exception;
-//
-// /**
-// * lists all durable subscriptions for a topic
-// * @param topic
-// * @return
-// * @throws Exception
-// */
-// List listDurableSubscriptionsForTopic(String topic) throws Exception;
-//
-// /**
-// * lists all non durable subscriptions for a topic
-// * @param topic
-// * @return
-// * @throws Exception
-// */
-// List listNonDurableSubscriptionsForTopic(String topic) throws Exception;
-//
-// /**
-// * lists all subscriptions for a topic as html
-// * @param topic
-// * @return
-// * @throws Exception
-// */
-// String listAllSubscriptionsAsHTMLForTopic(String topic) throws Exception;
-//
-// /**
-// * lists all durable subscriptions for a topic as html
-// * @param topic
-// * @return
-// * @throws Exception
-// */
-// String listDurableSubscriptionsAsHTMLForTopic(String topic) throws Exception;
-//
-// /**
-// * lists all non durable subscriptions for a topic as html
-// * @param topic
-// * @return
-// * @throws Exception
-// */
-// String listNonDurableSubscriptionsAsHTMLForTopic(String topic) throws Exception;
-//
-// /**
-// * lists all the messages for a topic for a given subscription
-// * @param topic
-// * @param subscriptionId
-// * @return
-// * @throws Exception
-// */
-// List listAllMessagesForTopic(String topic,String subscriptionId) throws Exception;
-//
-// /**
-// * lists all the messages for a topic for a given subscription and message selector
-// * @param topic
-// * @param subscriptionId
-// * @param selector
-// * @return
-// * @throws Exception
-// */
-// List listAllMessagesForTopic(String topic,String subscriptionId, String selector) throws Exception;
-//
-// /**
-// * lists durable messages for a topic for a given subscription
-// * @param topic
-// * @param subscriptionId
-// * @return
-// * @throws Exception
-// */
-// List listDurableMessagesForTopic(String topic,String subscriptionId) throws Exception;
-//
-// /**
-// * lists durable messages for a topic for a given subscription and message selector
-// * @param topic
-// * @param subscriptionId
-// * @param selector
-// * @return
-// * @throws Exception
-// */
-// List listDurableMessagesForTopic(String topic,String subscriptionId, String selector) throws Exception;
-//
-// /**
-// * lists non durable messages for a topic for a given subscription
-// * @param topic
-// * @param subscriptionId
-// * @return
-// * @throws Exception
-// */
-// List listNonDurableMessagesForTopic(String topic,String subscriptionId) throws Exception;
-//
-// /**
-// * lists durable messages for a topic for a given subscription and message selector
-// * @param topic
-// * @param subscriptionId
-// * @param selector
-// * @return
-// * @throws Exception
-// */
-// List listNonDurableMessagesForTopic(String topic,String subscriptionId, String selector) throws Exception;
-//
-// List getMessageCountersForTopic(String topic) throws Exception;
-//
-// String showActiveClientsAsHTML() throws Exception;
-//
-// String showPreparedTransactionsAsHTML();
-//
-// String listMessageCountersAsHTML() throws Exception;
+ void startMessageCounter(String queueName, long duration) throws Exception;
+ MessageCounter stopMessageCounter(String queueName) throws Exception;
+ MessageCounter getMessageCounter(String queueName);
+
+ Collection<MessageCounter> getMessageCounters();
+
+ void resetMessageCounter(String queue);
+
+ void resetMessageCounters();
+
+ void resetMessageCounterHistory(String queue);
+
+ void resetMessageCounterHistories();
+
+ List<MessageCounter> stopAllMessageCounters() throws Exception;
+
+ void unregisterAllMessageCounters() throws Exception;
+
+ public int getConsumerCountForQueue(String queue) throws Exception;
+
+ List<ServerConnectionEndpoint> getClients();
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java 2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -52,8 +52,6 @@
// destination related information
private String destName;
- private String destSubscription;
- private boolean destTopic;
private boolean destDurable;
// destination queue
@@ -67,36 +65,31 @@
// per hour day counter history
private int dayCounterMax;
- private ArrayList dayCounter;
+ private ArrayList<DayCounter> dayCounter;
/**
* Get a list of message statistics from a list of message counters
- *
- * @param counter the message counters
+ *
* @return the message statistics
* @throws Exception for any error
*/
- public static List getMessageStatistics(List counters) throws Exception
+ public static List<? extends MessageStatistics> getMessageStatistics(List counters) throws Exception
{
- List list = new ArrayList(counters.size());
-
- Iterator iter = counters.iterator();
-
- while (iter.hasNext())
+ List<MessageStatistics> list = new ArrayList<MessageStatistics>(counters.size());
+
+ for (Object counter1 : counters)
{
- MessageCounter counter = (MessageCounter)iter.next();
-
+ MessageCounter counter = (MessageCounter) counter1;
+
MessageStatistics stats = new MessageStatistics();
stats.setName(counter.getDestinationName());
- stats.setSubscriptionID(counter.getDestinationSubscription());
- stats.setTopic(counter.getDestinationTopic());
stats.setDurable(counter.getDestinationDurable());
stats.setCount(counter.getCount());
stats.setCountDelta(counter.getCountDelta());
stats.setDepth(counter.getMessageCount());
stats.setDepthDelta(counter.getMessageCountDelta());
stats.setTimeLastUpdate(counter.getLastUpdate());
-
+
list.add(stats);
}
return list;
@@ -106,23 +99,17 @@
* Constructor
*
* @param name destination name
- * @param subscription subscription name
* @param queue internal queue object
- * @param topic topic destination flag
* @param durable durable subsciption flag
* @param daycountmax max message history day count
*/
public MessageCounter(String name,
- String subscription,
Queue queue,
- boolean topic,
boolean durable,
int daycountmax)
{
// store destination related information
destName = name;
- destSubscription = subscription;
- destTopic = topic;
destDurable = durable;
destQueue = queue;
@@ -130,7 +117,7 @@
resetCounter();
// initialize message history
- dayCounter = new ArrayList();
+ dayCounter = new ArrayList<DayCounter>();
setHistoryLimit(daycountmax);
}
@@ -148,14 +135,12 @@
/*
* This method is called periodically to update statistics from the queue
*/
- public synchronized void onTimer()
+ public synchronized void sample()
{
int latestMessagesAdded = destQueue.getMessagesAdded();
+
+ countTotal += latestMessagesAdded - lastMessagesAdded;
- int newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
-
- countTotal += newMessagesAdded;
-
lastMessagesAdded = latestMessagesAdded;
//update timestamp
@@ -176,26 +161,6 @@
}
/**
- * Gets the related destination subscription
- *
- * @return String destination name
- */
- public String getDestinationSubscription()
- {
- return destSubscription;
- }
-
- /**
- * Gets the related destination topic flag
- *
- * @return boolean true: topic destination, false: queue destination
- */
- public boolean getDestinationTopic()
- {
- return destTopic;
- }
-
- /**
* Gets the related destination durable subscription flag
*
* @return boolean true : durable subscription,
@@ -291,45 +256,18 @@
{
StringBuffer ret = new StringBuffer();
- // Topic/Queue
- if (destTopic)
- ret.append("Topic,");
- else
- ret.append("Queue,");
-
// name
ret.append(destName).append(",");
- // subscription
- if (destSubscription != null)
- ret.append(destSubscription).append(",");
- else
- ret.append("-,");
-
- // Durable subscription
- if (destTopic)
- {
- // Topic
- if (destDurable)
- ret.append("true,");
- else
- ret.append("false,");
- }
- else
- {
- // Queue
- ret.append("-,");
- }
-
// counter values
- ret.append(getCount()).append(",").append(getCountDelta()).append(",").append(getMessageCount()).append(",").append(getMessageCountDelta()).append(",");
+ ret.append("total = ").append(getCount()).append(",").append("total delta = ").append(getCountDelta()).append(",").append("current = ").append(getMessageCount()).append(",").append("current delta = ").append(getMessageCountDelta()).append(",");
// timestamp last counter update
if (timeLastUpdate > 0)
{
DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
- ret.append(dateFormat.format(new Date(timeLastUpdate)));
+ ret.append("last reset at ").append(dateFormat.format(new Date(timeLastUpdate)));
}
else
{
@@ -414,7 +352,7 @@
// calculate day difference between current date and date of last day counter entry
synchronized (dayCounter)
{
- DayCounter counterLast = (DayCounter) dayCounter.get(dayCounter.size() - 1);
+ DayCounter counterLast = dayCounter.get(dayCounter.size() - 1);
GregorianCalendar calNow = new GregorianCalendar();
GregorianCalendar calLast = counterLast.getDate();
@@ -468,7 +406,7 @@
}
// update last day counter entry
- counterLast = (DayCounter) dayCounter.get(dayCounter.size() - 1);
+ counterLast = dayCounter.get(dayCounter.size() - 1);
counterLast.updateDayCounter(incrementCounter);
}
}
@@ -510,10 +448,8 @@
ret += dayCounter.size() + "\n";
// following lines: day counter data
- for (int i = 0; i < dayCounter.size(); i++)
+ for (DayCounter counter : dayCounter)
{
- DayCounter counter = (DayCounter) dayCounter.get(i);
-
ret += counter.getDayCounterAsString() + "\n";
}
}
@@ -602,7 +538,7 @@
bUpdate = true;
}
- if (bUpdate == true)
+ if (bUpdate)
{
if (counters[i] == -1)
counters[i] = 0;
Modified: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java 2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -189,7 +189,7 @@
{
MessageCounter counter = (MessageCounter)iter.next();
- counter.onTimer();
+ counter.sample();
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java 2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -21,34 +21,43 @@
*/
package org.jboss.messaging.core.impl.server;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.jboss.jms.client.api.ClientConnectionFactory;
import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
-import org.jboss.messaging.core.Binding;
-import org.jboss.messaging.core.MessagingServer;
-import org.jboss.messaging.core.MessagingServerManagement;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.messaging.core.*;
import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.messaging.util.MessagingException;
/**
* This interface describes the properties and operations that comprise the management interface of the
* Messaging Server.
- *
+ * <p/>
* It includes operations to create and destroy queues and provides various statistics measures
* such as message count for queues and topics.
- *
+ *
* @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
* @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
- *
*/
@JMX(name = "jboss.messaging:service=MessagingServerManagement", exposedInterface = MessagingServerManagement.class)
-public class MessagingServerManagementImpl implements MessagingServerManagement
+public class MessagingServerManagementImpl implements MessagingServerManagement, MessagingComponent
{
private MessagingServer messagingServer;
+ private HashMap<String, MessageCounter> currentCounters = new HashMap<String, MessageCounter>();
+
+ private HashMap<String, ScheduledFuture> currentRunningCounters = new HashMap<String, ScheduledFuture>();
+
+ private ScheduledExecutorService scheduler;
+ private int maxMessageCounters = 20;
+
public void setMessagingServer(MessagingServer messagingServer)
{
this.messagingServer = messagingServer;
@@ -63,12 +72,13 @@
// {
// return messagingServer.getConfiguration();
// }
-//
+
+ //
public boolean isStarted()
{
return messagingServer.isStarted();
}
-
+
public void createQueue(String address, String name) throws Exception
{
messagingServer.createQueue(address, name);
@@ -89,10 +99,10 @@
return messagingServer.removeAddress(address);
}
- public ClientConnectionFactory createClientConnectionFactory(boolean strictTck,int prefetchSize)
+ public ClientConnectionFactory createClientConnectionFactory(boolean strictTck, int prefetchSize)
{
return new ClientConnectionFactoryImpl(messagingServer.getConfiguration().getMessagingServerID(),
- messagingServer.getConfiguration().getRemotingConfiguration(), messagingServer.getVersion(), messagingServer.getConfiguration().isStrictTck() || strictTck, prefetchSize);
+ messagingServer.getConfiguration().getRemotingConfiguration(), messagingServer.getVersion(), messagingServer.getConfiguration().isStrictTck() || strictTck, prefetchSize);
}
public void removeAllMessagesForAddress(String address) throws Exception
@@ -104,27 +114,190 @@
{
messagingServer.removeAllMessagesForBinding(name);
}
-
+
+   /**
+    * Collects the queue behind every binding the post office holds for an address.
+    *
+    * @param address the address whose bindings are looked up
+    * @return a new list with one queue per binding, in the order the bindings were returned
+    * @throws Exception if the post office lookup fails
+    */
+   public List<Queue> getQueuesForAddress(String address) throws Exception
+   {
+      List<Queue> result = new ArrayList<Queue>();
+      for (Binding b : messagingServer.getPostOffice().getBindingsForAddress(address))
+      {
+         result.add(b.getQueue());
+      }
+      return result;
+   }
+
public int getMessageCountForQueue(String queue) throws Exception
{
return getQueue(queue).getMessageCount();
}
-
- public List<SubscriptionInfo> listAllSubscriptionsForAddress(String address) throws Exception
+
+ public int getMaxMessageCounters()
{
- return listSubscriptions(address, ListType.ALL);
+ return maxMessageCounters;
}
- public List<SubscriptionInfo> listDurableSubscriptionsForAddress(String address) throws Exception
+ public void setMaxMessageCounters(int maxMessageCounters)
{
- return listSubscriptions(address, ListType.DURABLE);
+ this.maxMessageCounters = maxMessageCounters;
}
- public List<SubscriptionInfo> listNonSubscriptionsForAddress(String address) throws Exception
+ public void registerMessageCounter(final String queueName) throws Exception
{
- return listSubscriptions(address, ListType.NON_DURABLE);
+ if (currentCounters.get(queueName) != null)
+ {
+ throw new IllegalStateException("Message Counter Already Registered");
+ }
+ Binding binding = messagingServer.getPostOffice().getBinding(queueName);
+ if (binding == null)
+ {
+ throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+ }
+ Queue queue = binding.getQueue();
+ currentCounters.put(queueName, new MessageCounter(queue.getName(),queue, queue.isDurable(), queue.getMessageCounterHistoryDayLimit()));
}
+   /**
+    * Forgets the counter kept for a queue and cancels its timed sample, if one is pending.
+    *
+    * @param queueName name the counter is kept under
+    * @throws MessagingException (ILLEGAL_STATE) if no counter is kept under that name
+    */
+   public void unregisterMessageCounter(final String queueName) throws Exception
+   {
+      MessageCounter registered = currentCounters.get(queueName);
+      if (registered == null)
+      {
+         throw new MessagingException(MessagingException.ILLEGAL_STATE, "Counter is not registered");
+      }
+      currentCounters.remove(queueName);
+
+      ScheduledFuture<?> pending = currentRunningCounters.get(queueName);
+      if (pending == null)
+      {
+         return;
+      }
+      pending.cancel(true);
+      currentRunningCounters.remove(queueName);
+   }
+
+   /**
+    * Starts (or restarts) counting messages for a queue.
+    * <p/>
+    * A counter is created on demand when none is kept for the queue yet. The counter is
+    * reset, and when <code>duration</code> is positive a single sample is scheduled to run
+    * after that many seconds.
+    *
+    * @param queueName name of the queue (binding) to count
+    * @param duration  seconds until the timed sample; values &lt;= 0 schedule nothing
+    * @throws MessagingException    (QUEUE_DOES_NOT_EXIST) if a counter must be created but no binding exists
+    * @throws IllegalStateException if a timed run is requested before {@link #start()} created the scheduler
+    */
+   public void startMessageCounter(final String queueName, long duration) throws Exception
+   {
+      // Fail early and clearly instead of with a bare NullPointerException further down.
+      if (duration > 0 && scheduler == null)
+      {
+         throw new IllegalStateException("Cannot schedule message counter for " + queueName + ": not started");
+      }
+
+      MessageCounter existing = currentCounters.get(queueName);
+      if (existing == null)
+      {
+         Binding binding = messagingServer.getPostOffice().getBinding(queueName);
+         if (binding == null)
+         {
+            throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+         }
+         Queue queue = binding.getQueue();
+         existing = new MessageCounter(queue.getName(), queue, queue.isDurable(), queue.getMessageCounterHistoryDayLimit());
+         currentCounters.put(queueName, existing);
+      }
+      final MessageCounter messageCounter = existing;
+
+      // A restart must not leave the earlier timed sample pending: it would still fire and
+      // its future would be silently overwritten in the map.
+      ScheduledFuture<?> previous = currentRunningCounters.remove(queueName);
+      if (previous != null)
+      {
+         previous.cancel(true);
+      }
+
+      messageCounter.resetCounter();
+      if (duration > 0)
+      {
+         ScheduledFuture<?> future = scheduler.schedule(new Runnable()
+         {
+            public void run()
+            {
+               // Use the captured counter; looking it up again by name would throw a
+               // NullPointerException if the counter had been unregistered in the meantime.
+               messageCounter.sample();
+            }
+         }, duration, TimeUnit.SECONDS);
+         currentRunningCounters.put(queueName, future);
+      }
+   }
+
+   /**
+    * Stops the counter kept for a queue and returns it with fresh figures.
+    * Any pending timed sample is cancelled first, then one final sample is taken.
+    *
+    * @param queueName name the counter is kept under
+    * @return the counter for the queue, never null
+    * @throws IllegalArgumentException if no counter is kept under that name
+    */
+   public MessageCounter stopMessageCounter(String queueName) throws Exception
+   {
+      MessageCounter messageCounter = currentCounters.get(queueName);
+      if (messageCounter == null)
+      {
+         // Keep a space between the queue name and the text so the message stays readable.
+         throw new IllegalArgumentException(queueName + " counter not registered");
+      }
+      ScheduledFuture<?> running = currentRunningCounters.remove(queueName);
+      if (running != null)
+      {
+         running.cancel(true);
+      }
+      messageCounter.sample();
+      return messageCounter;
+   }
+
+   /**
+    * Looks up the counter kept for a queue.
+    * The counter is sampled before it is returned unless a timed run is recorded for the queue.
+    *
+    * @param queueName name the counter is kept under
+    * @return the counter, or null when none is kept under that name
+    */
+   public MessageCounter getMessageCounter(String queueName)
+   {
+      MessageCounter counter = currentCounters.get(queueName);
+      if (counter == null)
+      {
+         return null;
+      }
+      boolean timedRunRecorded = currentRunningCounters.get(queueName) != null;
+      if (!timedRunRecorded)
+      {
+         counter.sample();
+      }
+      return counter;
+   }
+
+
+   /**
+    * Returns the counters currently kept, one per queue.
+    * <p/>
+    * The result is a snapshot: handing out the live <code>values()</code> view of the
+    * internal map would let callers remove counters behind this class's back and would
+    * make their iteration fail if a counter is registered or unregistered meanwhile.
+    *
+    * @return a new collection holding the current counters; never null, possibly empty
+    */
+   public Collection<MessageCounter> getMessageCounters()
+   {
+      return new ArrayList<MessageCounter>(currentCounters.values());
+   }
+
+   /**
+    * Resets the counter kept for a queue; does nothing when no counter is kept.
+    *
+    * @param queue name the counter is kept under
+    */
+   public void resetMessageCounter(String queue)
+   {
+      MessageCounter counter = currentCounters.get(queue);
+      if (counter == null)
+      {
+         return;
+      }
+      counter.resetCounter();
+   }
+
+   /**
+    * Resets every counter currently kept, by name, through {@link #resetMessageCounter(String)}.
+    */
+   public void resetMessageCounters()
+   {
+      for (String name : currentCounters.keySet())
+      {
+         resetMessageCounter(name);
+      }
+   }
+
+   /**
+    * Clears the history of the counter kept for a queue; does nothing when no counter is kept.
+    *
+    * @param queue name the counter is kept under
+    */
+   public void resetMessageCounterHistory(String queue)
+   {
+      MessageCounter counter = currentCounters.get(queue);
+      if (counter == null)
+      {
+         return;
+      }
+      counter.resetHistory();
+   }
+
+   /**
+    * Clears the history of every counter currently kept, by name, through
+    * {@link #resetMessageCounterHistory(String)}.
+    */
+   public void resetMessageCounterHistories()
+   {
+      for (String name : currentCounters.keySet())
+      {
+         resetMessageCounterHistory(name);
+      }
+   }
+
+   /**
+    * Stops every counter currently kept via {@link #stopMessageCounter(String)}.
+    *
+    * @return a new list with the stopped counters, in key iteration order
+    * @throws Exception whatever stopping an individual counter throws
+    */
+   public List<MessageCounter> stopAllMessageCounters() throws Exception
+   {
+      List<MessageCounter> stopped = new ArrayList<MessageCounter>();
+      for (String name : currentCounters.keySet())
+      {
+         MessageCounter counter = stopMessageCounter(name);
+         stopped.add(counter);
+      }
+      return stopped;
+   }
+
+   /**
+    * Unregisters every counter currently kept via {@link #unregisterMessageCounter(String)}.
+    *
+    * @throws Exception whatever unregistering an individual counter throws
+    */
+   public void unregisterAllMessageCounters() throws Exception
+   {
+      // Iterate over a copy of the names: unregisterMessageCounter removes entries from
+      // currentCounters, and removing while iterating the live key set makes HashMap's
+      // fail-fast iterator throw ConcurrentModificationException.
+      List<String> counterNames = new ArrayList<String>(currentCounters.keySet());
+      for (String counterName : counterNames)
+      {
+         unregisterMessageCounter(counterName);
+      }
+   }
+
+   /**
+    * Reports how many consumers the named queue has.
+    *
+    * @param queue name of the queue
+    * @return the consumer count reported by the queue
+    * @throws Exception if the queue lookup fails
+    */
+   public int getConsumerCountForQueue(String queue) throws Exception
+   {
+      Queue target = getQueue(queue);
+      return target.getConsumerCount();
+   }
+
+   /**
+    * Lists the connections the server's connection manager reports as active.
+    *
+    * @return the active connection endpoints, exactly as returned by the connection manager
+    */
+   public List<ServerConnectionEndpoint> getClients()
+   {
+      List<ServerConnectionEndpoint> active = messagingServer.getConnectionManager().getActiveConnections();
+      return active;
+   }
//
//// public int getDeliveringCountForQueue(String queue) throws Exception
//// {
@@ -151,21 +324,6 @@
//// return (MessageStatistics)stats.get(0);
//// }
//
-// public int getConsumerCountForQueue(String queue) throws Exception
-// {
-// return getQueue(queue).getConsumerCount();
-// }
-//
-//// public void resetMessageCounterForQueue(String queue) throws Exception
-//// {
-//// getMessageCounterForQueue(queue).resetCounter();
-//// }
-////
-//// public void resetMessageCounterHistoryForQueue(String queue) throws Exception
-//// {
-//// getMessageCounterForQueue(queue).resetHistory();
-//// }
-//
//// public List<Message> listAllMessagesForQueue(String queue) throws Exception
//// {
//// return getQueue(queue).listAllMessages(null);
@@ -221,9 +379,9 @@
// return getNonDurableMessageCountForTopic(topicName);
// }
//
-// public int getAllSubscriptionsCountForTopic(String topicName) throws Exception
+// public int getAllSubscriptionsCount(String topicName) throws Exception
// {
-// return getAllSubscriptionsCountForTopic(topicName);
+// return getAllSubscriptionsCount(topicName);
// }
//
// public int getDurableSubscriptionsCountForTopic(String topicName) throws Exception
@@ -324,7 +482,7 @@
//
// return charArray.toString();
// }
-//
+//
//// public String showPreparedTransactionsAsHTML()
//// {
//// List txs = messagingServer.getTxRepository().getPreparedTransactions();
@@ -526,99 +684,100 @@
// {
// return listMessages(queue, ListType.ALL, selector);
// }
-//
+//
// public List<Message> listDurableMessages(Queue queue, String selector) throws Exception
// {
// return listMessages(queue, ListType.DURABLE, selector);
// }
-//
+//
// public List<Message> listNonDurableMessages(Queue queue, String selector) throws Exception
// {
// return listMessages(queue, ListType.NON_DURABLE, selector);
// }
//
-//
+//
// public List<SubscriptionInfo> listDurableSubscriptions(String topicName) throws Exception
// {
// return listSubscriptions(topicName, ListType.DURABLE);
// }
-//
+//
// public List<SubscriptionInfo> listNonDurableSubscriptions(String topicName) throws Exception
// {
// return listSubscriptions(topicName, ListType.NON_DURABLE);
// }
-//
+//
// public String listAllSubscriptionsAsHTML(String topicName) throws Exception
// {
// return listSubscriptionsAsHTML(topicName, ListType.ALL);
// }
-//
+//
// public String listDurableSubscriptionsAsHTML(String topicName) throws Exception
// {
// return listSubscriptionsAsHTML(topicName, ListType.DURABLE);
// }
-//
+//
// public String listNonDurableSubscriptionsAsHTML(String topicName) throws Exception
// {
// return listSubscriptionsAsHTML(topicName, ListType.NON_DURABLE);
// }
-//
+//
// public List<Message> listDurableMessagesForSubscription(String subId, String selector) throws Exception
// {
// return listMessagesForSubscription(ListType.DURABLE, subId, selector);
// }
-//
+//
// public List<Message> listNonDurableMessagesForSubscription(String subId, String selector) throws Exception
// {
// return listMessagesForSubscription(ListType.NON_DURABLE, subId, selector);
// }
-//
+//
// public List<MessageCounter> getMessageCounters(String topicName) throws Exception
// {
// List<MessageCounter> counters = new ArrayList<MessageCounter>();
-//
+//
// Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
-//
+//
// List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
-//
+//
// for (Binding binding: bindings)
// {
// Queue queue = binding.getQueue();
-//
+//
// //TODO - get message counters
-//
+//
//// String counterName = SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queue.getName();
-////
+////
//// MessageCounter counter = messagingServer.getMessageCounterManager().getMessageCounter(counterName);
-////
+////
//// if (counter == null)
//// {
//// throw new IllegalStateException("Cannot find counter with name " + counterName);
//// }
-////
+////
//// counters.add(counter);
// }
-//
-// return counters;
+//
+// return counters;
// }
-//
-//
+//
+//
//// public void setMessageCounterHistoryDayLimit(String topicName, int limit) throws Exception
//// {
//// Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
-////
+////
//// List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
-////
+////
//// for (Binding binding: bindings)
//// {
//// Queue queue = binding.getQueue();
-////
+////
//// queue.setMessageCounterHistoryDayLimit(limit);
-//// }
+//// }
//// }
-//
+//
// // Private ---------------------------------------------------------------------------
-//
+
+ //
private Queue getQueue(String queueName) throws Exception
{
Binding binding = messagingServer.getPostOffice().getBinding(queueName);
@@ -626,10 +785,10 @@
{
throw new IllegalArgumentException("No queue with name " + queueName);
}
-
+
return binding.getQueue();
}
-//
+//
//
//
// private List<Message> listMessages(Queue queue, ListType type, String jmsSelector) throws Exception
@@ -793,12 +952,10 @@
//
// return ret;
// }
+
+ //
+
//
- private enum ListType
- {
- ALL, DURABLE, NON_DURABLE
- }
-//
//
// private List<Message> listMessagesForSubscription(ListType type, String subId, String jmsSelector) throws Exception
// {
@@ -845,39 +1002,11 @@
//
// return msgs;
// }
-//
- private List<SubscriptionInfo> listSubscriptions(String address, ListType type) throws Exception
- {
- List<SubscriptionInfo> subs = new ArrayList<SubscriptionInfo>();
-
- List<Binding> bindings = messagingServer.getPostOffice().getBindingsForAddress(address);
-
- for (Binding binding: bindings)
- {
- Queue queue = binding.getQueue();
-
- if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable()) || (type == ListType.NON_DURABLE && !queue.isDurable()))
- {
- String subName = null;
- String clientID = null;
-
- if (queue.isDurable())
- {
- MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
- subName = helper.getSubName();
- clientID = helper.getClientId();
- }
-
- SubscriptionInfo info = new SubscriptionInfo(queue.getName(), queue.isDurable(), subName, clientID,
- queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.getMessageCount(), queue.getMaxSize());
-
- subs.add(info);
- }
- }
-
- return subs;
- }
-//
+
+ //
+
+
+//
// private int getMessageCount(String topicName, ListType type) throws Exception
// {
// Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
@@ -974,6 +1103,24 @@
//
// return sb.toString();
// }
-
-
+
+
+   /**
+    * Creates the scheduler used for timed message counter samples.
+    * <p/>
+    * Calling this while a live scheduler already exists is a no-op; replacing it
+    * would orphan the previous thread pool without ever shutting it down.
+    *
+    * @throws Exception declared by the component contract
+    */
+   public void start() throws Exception
+   {
+      if (scheduler != null && !scheduler.isShutdown())
+      {
+         return;
+      }
+      scheduler = Executors.newScheduledThreadPool(maxMessageCounters);
+   }
+
+   /**
+    * Shuts the sample scheduler down and discards the record of pending timed samples.
+    *
+    * @throws Exception declared by the component contract
+    */
+   public void stop() throws Exception
+   {
+      if (scheduler != null)
+      {
+         // shutdown() would leave already-scheduled delayed samples queued, and by default
+         // they still run later; shutdownNow() drops them so nothing fires after stop.
+         scheduler.shutdownNow();
+      }
+      // The futures recorded here can no longer run, so do not keep them around.
+      currentRunningCounters.clear();
+   }
+
+ /**
+  * Finalizer that only delegates to the superclass implementation.
+  * NOTE(review): it performs no cleanup of its own (the scheduler is not touched here);
+  * consider removing the override, since it adds nothing to the inherited behavior.
+  *
+  * @throws Throwable whatever the superclass finalizer throws
+  */
+ protected void finalize() throws Throwable
+ {
+ super.finalize();
+
+ }
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-02-15 14:55:19 UTC (rev 3725)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-02-15 15:59:01 UTC (rev 3726)
@@ -809,7 +809,7 @@
public List listAllSubscriptionsForTopic(String s) throws Exception
{
- return getJMSServerManager().listAllSubscriptionsForTopic(s);
+ return getJMSServerManager().listSubscriptions(s);
}
More information about the jboss-cvs-commits
mailing list