[jboss-cvs] JBoss Messaging SVN: r3570 - in trunk/src/main/org/jboss: messaging/core/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jan 15 16:24:02 EST 2008


Author: timfox
Date: 2008-01-15 16:24:02 -0500 (Tue, 15 Jan 2008)
New Revision: 3570

Added:
   trunk/src/main/org/jboss/jms/server/Configuration.java
   trunk/src/main/org/jboss/jms/server/TransactionRepository.java
   trunk/src/main/org/jboss/messaging/core/impl/channelfactory/
   trunk/src/main/org/jboss/messaging/core/impl/channelfactory/MultiplexerChannelFactory.java
   trunk/src/main/org/jboss/messaging/core/impl/channelfactory/XMLChannelFactory.java
   trunk/src/main/org/jboss/messaging/core/impl/messagecounter/
   trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java
   trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java
   trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageStatistics.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/server/
   trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/server/SubscriptionInfo.java
Log:
Persistence refactoring part II


Added: trunk/src/main/org/jboss/jms/server/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/Configuration.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/Configuration.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,576 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.jms.server;
+
+import org.jboss.jms.server.security.Role;
+import org.jboss.messaging.util.XMLUtil;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.beans.PropertyChangeListener;
+import java.beans.PropertyChangeSupport;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.HashSet;
+
+/**
+ * This is the JBM configuration. It is used to configure the ServerPeer.
+ * It does this by parsing the jbm-configuration.xml configuration file. It also uses PropertyChangeSupport so users of
+ * this class can be notified on configuration changes.
+ *
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class Configuration implements Serializable
+{
+   private static final String READ_ATTR = "read";
+   private static final String WRITE_ATTR = "write";
+   private static final String CREATE_ATTR = "create";
+   private static final String NAME_ATTR = "name";
+
+   private  PropertyChangeSupport propertyChangeSupport;
+   private  Integer _serverPeerID = -1;
+   private  String _defaultQueueJNDIContext = "";
+   private  String _defaultTopicJNDIContext = "";
+   private  String _securityDomain;
+   private  HashSet<Role> _securityConfig;
+   private  String _defaultDLQ;
+   // The default maximum number of delivery attempts before sending to DLQ - can be overridden on
+   // the destination
+   private  Integer _defaultMaxDeliveryAttempts = 10;
+   protected  String _defaultExpiryQueue;
+
+   private  Long _defaultRedeliveryDelay = (long) 0;
+
+   private  Long _messageCounterSamplePeriod = (long) 10000;// Default is 1 minute
+   private  Long _failoverStartTimeout = (long) (60 * 1000);
+   // Default is 5 minutes
+   private  Long _failoverCompleteTimeout = (long) (5 * 60 * 1000);
+
+   private  Integer _defaultMessageCounterHistoryDayLimit = 1;
+
+   private  String _clusterPullConnectionFactoryName;
+
+
+   private  Boolean _useXAForMessagePull = false;
+
+   private  Boolean _defaultPreserveOrdering = false;
+
+   private  Long _recoverDeliveriesTimeout = (long) (5 * 60 * 1000);
+
+
+   private  String _suckerPassword;
+
+   //Global override for strict behaviour
+   private  Boolean _strictTck = false;
+
+   //From a system property - this overrides
+   private  Boolean _strictTckProperty = false;
+
+   private  String _postOfficeName;
+
+   private  Boolean _clustered = false;
+
+   private  Long _stateTimeout = (long) 5000;
+
+   private  Long _castTimeout = (long) 5000;
+
+   private  String _groupName;
+
+   private  String _controlChannelName;
+
+   private  String _dataChannelName;
+
+   private  String _channelPartitionName;
+
+   private  Integer _maxConcurrentReplications = 25;
+
+   private  Boolean _useJGroupsWorkaround = false;
+
+   private Integer _remotingBindAddress;
+
+   private String _remotingTimeout;
+
+   //default confog file location
+   private String configurationUrl = "jbm-configuration.xml";
+
+   public void start() throws Exception
+   {
+      propertyChangeSupport = new PropertyChangeSupport(this);
+
+      _strictTckProperty = "true".equalsIgnoreCase(System.getProperty("jboss.messaging.stricttck"));
+      _useJGroupsWorkaround = "true".equals(System.getProperty("jboss.messaging.usejgroupsworkaround"));
+
+      URL url = getClass().getClassLoader().getResource(configurationUrl);
+      Element e = XMLUtil.urlToElement(url);
+      _serverPeerID = getInteger(e, "server-peer-id", _serverPeerID);
+      _defaultQueueJNDIContext = getString(e, "default-queue-jndi-context", _defaultQueueJNDIContext);
+      _defaultTopicJNDIContext = getString(e, "default-topic-jndi-context", _defaultTopicJNDIContext);
+      _securityDomain = getString(e, "security-domain", _securityDomain);
+      _defaultDLQ = getString(e, "default-dlq", _defaultDLQ);
+      _defaultMaxDeliveryAttempts = getInteger(e, "default-max-delivery-attempts", _defaultMaxDeliveryAttempts);
+      _defaultExpiryQueue = getString(e, "default-expiry-queue", _defaultExpiryQueue);
+      _defaultRedeliveryDelay = getLong(e, "default-redelivery-delay", _defaultRedeliveryDelay);
+      _messageCounterSamplePeriod = getLong(e, "message-counter-sample-period", _messageCounterSamplePeriod);
+      _failoverStartTimeout = getLong(e, "failover-start-timeout", _failoverStartTimeout);
+      _failoverCompleteTimeout = getLong(e, "failover-complete-timeout", _failoverCompleteTimeout);
+      _defaultMessageCounterHistoryDayLimit = getInteger(e, "default-message-counter-history-day-limit", _defaultMessageCounterHistoryDayLimit);
+      _clusterPullConnectionFactoryName = getString(e, "cluster-pull-connection-factory-name", _clusterPullConnectionFactoryName);
+      _useXAForMessagePull = getBoolean(e, "use-xa-for-message-pull", _useXAForMessagePull);
+      _defaultPreserveOrdering = getBoolean(e, "default-preserve-ordering", _defaultPreserveOrdering);
+      _recoverDeliveriesTimeout = getLong(e, "recover-deliveries-timeout", _recoverDeliveriesTimeout);
+      _suckerPassword = getString(e, "sucker-password", _suckerPassword);
+      _strictTck = getBoolean(e, "strict-tck", _strictTck);
+      _postOfficeName = getString(e, "post-office-name", _postOfficeName);
+      _clustered = getBoolean(e, "clustered", _clustered);
+      _stateTimeout = getLong(e, "state-timeout", _stateTimeout);
+      _castTimeout = getLong(e, "cast-timeout", _castTimeout);
+      _groupName = getString(e, "group-name", _groupName);
+      _controlChannelName = getString(e, "control-channel-name", _controlChannelName);
+      _dataChannelName = getString(e, "data-channel-name", _dataChannelName);
+      _channelPartitionName = getString(e, "channel-partition-name", _channelPartitionName);
+      _maxConcurrentReplications = getInteger(e, "max-concurrent-replications", _maxConcurrentReplications);
+      _remotingBindAddress = getInteger(e, "remoting-bind-address", _remotingBindAddress);
+      _remotingTimeout = getString(e, "remoting-timeout", _remotingTimeout);
+      NodeList security = e.getElementsByTagName("default-security-config");
+      if (security.getLength() > 0)
+      {
+         HashSet<Role> securityConfig;
+         securityConfig = new HashSet<Role>();
+         NodeList roles = security.item(0).getChildNodes();
+         for (int k = 0; k < roles.getLength(); k++)
+         {
+            if ("role".equalsIgnoreCase(roles.item(k).getNodeName()))
+            {
+               Boolean read = roles.item(k).getAttributes().getNamedItem(READ_ATTR) != null && Boolean.valueOf(roles.item(k).getAttributes().getNamedItem(READ_ATTR).getNodeValue());
+               Boolean write = roles.item(k).getAttributes().getNamedItem(WRITE_ATTR) != null && Boolean.valueOf(roles.item(k).getAttributes().getNamedItem(WRITE_ATTR).getNodeValue());
+               Boolean create = roles.item(k).getAttributes().getNamedItem(CREATE_ATTR) != null && Boolean.valueOf(roles.item(k).getAttributes().getNamedItem(CREATE_ATTR).getNodeValue());
+               Role role = new Role(roles.item(k).getAttributes().getNamedItem(NAME_ATTR).getNodeValue(),
+                       read,
+                       write,
+                       create);
+               securityConfig.add(role);
+            }
+         }
+         _securityConfig = securityConfig;
+      }
+
+   }
+
+   private  Boolean getBoolean(Element e, String name, Boolean def)
+   {
+      NodeList nl = e.getElementsByTagName(name);
+      if (nl.getLength() > 0)
+      {
+         return Boolean.valueOf(nl.item(0).getTextContent().trim());
+      }
+      return def;
+   }
+
+
+   private  Integer getInteger(Element e, String name, Integer def)
+   {
+      NodeList nl = e.getElementsByTagName(name);
+      if (nl.getLength() > 0)
+      {
+         return Integer.valueOf(nl.item(0).getTextContent().trim());
+      }
+      return def;
+   }
+
+   private  Long getLong(Element e, String name, Long def)
+   {
+      NodeList nl = e.getElementsByTagName(name);
+      if (nl.getLength() > 0)
+      {
+         return Long.valueOf(nl.item(0).getTextContent().trim());
+      }
+      return def;
+   }
+
+   private  String getString(Element e, String name, String def)
+   {
+      NodeList nl = e.getElementsByTagName(name);
+      if (nl.getLength() > 0)
+      {
+         return nl.item(0).getTextContent().trim();
+      }
+      return def;
+   }
+
+   public  void addPropertyChangeListener(
+           PropertyChangeListener listener)
+   {
+      propertyChangeSupport.addPropertyChangeListener(listener);
+   }
+
+   public  Integer getServerPeerID()
+   {
+      return _serverPeerID;
+   }
+
+   public  void setServerPeerID(Integer serverPeerID)
+   {
+      _serverPeerID = serverPeerID;
+   }
+
+
+   public  String getDefaultQueueJNDIContext()
+   {
+      return _defaultQueueJNDIContext;
+   }
+
+   public  void setDefaultQueueJNDIContext(String defaultQueueJNDIContext)
+   {
+      _defaultQueueJNDIContext = defaultQueueJNDIContext;
+   }
+
+
+   public  String getDefaultTopicJNDIContext()
+   {
+      return _defaultTopicJNDIContext;
+   }
+
+   public  void setDefaultTopicJNDIContext(String defaultTopicJNDIContext)
+   {
+      _defaultTopicJNDIContext = defaultTopicJNDIContext;
+   }
+
+   public  void setSecurityDomain(String securityDomain) throws Exception
+   {
+      _securityDomain = securityDomain;
+   }
+
+   public  String getSecurityDomain()
+   {
+      return _securityDomain;
+   }
+
+
+   public  HashSet<Role> getSecurityConfig()
+   {
+      return _securityConfig;
+   }
+
+   public  void setSecurityConfig(HashSet<Role> securityConfig)
+   {
+      propertyChangeSupport.firePropertyChange("securityConfig", _securityConfig, securityConfig);
+      _securityConfig = securityConfig;
+   }
+
+
+   public  String getDefaultDLQ()
+   {
+      return _defaultDLQ;
+   }
+
+   public  void setDefaultDLQ(String defaultDLQ)
+   {
+      _defaultDLQ = defaultDLQ;
+   }
+
+
+   public  Integer getDefaultMaxDeliveryAttempts()
+   {
+      return _defaultMaxDeliveryAttempts;
+   }
+
+   public  void setDefaultMaxDeliveryAttempts(Integer defaultMaxDeliveryAttempts)
+   {
+      _defaultMaxDeliveryAttempts = defaultMaxDeliveryAttempts;
+   }
+
+
+   public  String getDefaultExpiryQueue()
+   {
+      return _defaultExpiryQueue;
+   }
+
+   public  void setDefaultExpiryQueue(String defaultExpiryQueue)
+   {
+      _defaultExpiryQueue = defaultExpiryQueue;
+   }
+
+
+   public  long getDefaultRedeliveryDelay()
+   {
+      return _defaultRedeliveryDelay;
+   }
+
+   public  void setDefaultRedeliveryDelay(long defaultRedeliveryDelay)
+   {
+      _defaultRedeliveryDelay = defaultRedeliveryDelay;
+   }
+
+
+   public  long getMessageCounterSamplePeriod()
+   {
+      return _messageCounterSamplePeriod;
+   }
+
+   public  void setMessageCounterSamplePeriod(long messageCounterSamplePeriod)
+   {
+      if (messageCounterSamplePeriod < 1000)
+      {
+         throw new IllegalArgumentException("Cannot set MessageCounterSamplePeriod < 1000 ms");
+      }
+      propertyChangeSupport.firePropertyChange("messageCounterSamplePeriod", _messageCounterSamplePeriod, messageCounterSamplePeriod);
+      _messageCounterSamplePeriod = messageCounterSamplePeriod;
+   }
+
+
+   public  Long getFailoverStartTimeout()
+   {
+      return _failoverStartTimeout;
+   }
+
+   public  void setFailoverStartTimeout(Long failoverStartTimeout)
+   {
+      _failoverStartTimeout = failoverStartTimeout;
+   }
+
+
+   public  Long getFailoverCompleteTimeout()
+   {
+      return _failoverCompleteTimeout;
+   }
+
+   public  void setFailoverCompleteTimeout(Long failoverCompleteTimeout)
+   {
+      _failoverCompleteTimeout = failoverCompleteTimeout;
+   }
+
+
+   public  Integer getDefaultMessageCounterHistoryDayLimit()
+   {
+      return _defaultMessageCounterHistoryDayLimit;
+   }
+
+   public  void setDefaultMessageCounterHistoryDayLimit(Integer defaultMessageCounterHistoryDayLimit)
+   {
+      if (defaultMessageCounterHistoryDayLimit < -1)
+      {
+         defaultMessageCounterHistoryDayLimit = -1;
+      }
+      _defaultMessageCounterHistoryDayLimit = defaultMessageCounterHistoryDayLimit;
+   }
+
+
+   public  String getClusterPullConnectionFactoryName()
+   {
+      return _clusterPullConnectionFactoryName;
+   }
+
+   public  void setClusterPullConnectionFactoryName(String clusterPullConnectionFactoryName)
+   {
+      _clusterPullConnectionFactoryName = clusterPullConnectionFactoryName;
+   }
+
+
+   public  Boolean isUseXAForMessagePull()
+   {
+      return _useXAForMessagePull;
+   }
+
+   public  void setUseXAForMessagePull(Boolean useXAForMessagePull)
+   {
+      _useXAForMessagePull = useXAForMessagePull;
+   }
+
+
+   public  Boolean isDefaultPreserveOrdering()
+   {
+      return _defaultPreserveOrdering;
+   }
+
+   public  void setDefaultPreserveOrdering(Boolean defaultPreserveOrdering)
+   {
+      _defaultPreserveOrdering = defaultPreserveOrdering;
+   }
+
+
+   public  Long getRecoverDeliveriesTimeout()
+   {
+      return _recoverDeliveriesTimeout;
+   }
+
+   public  void setRecoverDeliveriesTimeout(Long recoverDeliveriesTimeout)
+   {
+      _recoverDeliveriesTimeout = recoverDeliveriesTimeout;
+   }
+
+
+   public  String getSuckerPassword()
+   {
+      return _suckerPassword;
+   }
+
+   public  void setSuckerPassword(String suckerPassword)
+   {
+      _suckerPassword = suckerPassword;
+   }
+
+
+   public  Boolean isStrictTck()
+   {
+      return _strictTck || _strictTckProperty;
+   }
+
+   public  void setStrictTck(Boolean strictTck)
+   {
+      _strictTck = strictTck || _strictTckProperty;
+   }
+
+
+   public  String getPostOfficeName()
+   {
+      return _postOfficeName;
+   }
+
+   public  void setPostOfficeName(String postOfficeName)
+   {
+      _postOfficeName = postOfficeName;
+   }
+
+
+   public  Boolean isClustered()
+   {
+      return _clustered;
+   }
+
+   public  void setClustered(Boolean clustered)
+   {
+      _clustered = clustered;
+   }
+
+
+   public  Long getStateTimeout()
+   {
+      return _stateTimeout;
+   }
+
+   public  void setStateTimeout(Long stateTimeout)
+   {
+      _stateTimeout = stateTimeout;
+   }
+
+
+   public  Long getCastTimeout()
+   {
+      return _castTimeout;
+   }
+
+   public  void setCastTimeout(Long castTimeout)
+   {
+      _castTimeout = castTimeout;
+   }
+
+
+   public  String getGroupName()
+   {
+      return _groupName;
+   }
+
+   public  void setGroupName(String groupName)
+   {
+      _groupName = groupName;
+   }
+
+
+   public  String getControlChannelName()
+   {
+      return _controlChannelName;
+   }
+
+   public  void setControlChannelName(String controlChannelName)
+   {
+      _controlChannelName = controlChannelName;
+   }
+
+
+   public  String getDataChannelName()
+   {
+      return _dataChannelName;
+   }
+
+   public  void setDataChannelName(String dataChannelName)
+   {
+      _dataChannelName = dataChannelName;
+   }
+
+
+   public  String getChannelPartitionName()
+   {
+      return _channelPartitionName;
+   }
+
+   public  void setChannelPartitionName(String channelPartitionName)
+   {
+      _channelPartitionName = channelPartitionName;
+   }
+
+
+   public  Integer getMaxConcurrentReplications()
+   {
+      return _maxConcurrentReplications;
+   }
+
+   public  void setMaxConcurrentReplications(Integer maxConcurrentReplications)
+   {
+      _maxConcurrentReplications = maxConcurrentReplications;
+   }
+
+
+   public  Boolean isUseJGroupsWorkaround()
+   {
+      return _useJGroupsWorkaround;
+   }
+
+   public  void setUseJGroupsWorkaround(Boolean useJGroupsWorkaround)
+   {
+      _useJGroupsWorkaround = useJGroupsWorkaround;
+   }
+
+   public Integer getRemotingBindAddress()
+   {
+      return _remotingBindAddress;
+   }
+   
+   public void setRemotingBindAddress(Integer remotingBindAddress)
+   {
+      this._remotingBindAddress = remotingBindAddress;
+   }
+
+   public String getRemotingTimeout()
+   {
+      return _remotingTimeout;
+   }
+   
+   public String getConfigurationUrl()
+   {
+      return configurationUrl;
+   }
+
+   public void setConfigurationUrl(String configurationUrl)
+   {
+      this.configurationUrl = configurationUrl;
+   }
+}

Added: trunk/src/main/org/jboss/jms/server/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/TransactionRepository.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/TransactionRepository.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,36 @@
+package org.jboss.jms.server;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.Transaction;
+
+
+//FIXME temp class
+public class TransactionRepository
+{
+   private Map<Xid, Transaction> map;
+   
+   public TransactionRepository()
+   {
+      map = new ConcurrentHashMap<Xid, Transaction>();
+   }
+   
+   public void addTransaction(Xid xid, Transaction transaction)
+   {
+      map.put(xid, transaction);
+   }
+   
+   public Transaction getTransaction(Xid xid)
+   {
+      return map.get(xid);
+   }
+   
+   public Transaction removeTransaction(Xid xid)
+   {
+      return map.remove(xid);
+   }
+      
+}

Added: trunk/src/main/org/jboss/messaging/core/impl/channelfactory/MultiplexerChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/channelfactory/MultiplexerChannelFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/channelfactory/MultiplexerChannelFactory.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,115 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.messaging.core.impl.channelfactory;
+
+import org.jboss.messaging.core.ChannelFactory;
+import org.jgroups.Channel;
+import org.jgroups.JChannelFactory;
+
+/**
+ * A ChannelFactory that will use the MBean ChannelFactory interface
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @version <tt>$Revision: 3465 $</tt>
+ * $Id: MultiplexerChannelFactory.java 3465 2007-12-10 17:32:22Z ataylor $
+ */
+public class MultiplexerChannelFactory implements ChannelFactory
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+
+   // Attributes -----------------------------------------------------------------------------------
+   JChannelFactory jChannelFactory;
+   String dataStack;
+   String controlStack;
+   String uniqueID;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public MultiplexerChannelFactory(JChannelFactory jChannelFactory,
+                                    String uniqueID,
+                                    String controlStack,
+                                    String dataStack)
+   {
+      this.jChannelFactory = jChannelFactory;
+      this.uniqueID = uniqueID;
+      this.dataStack = dataStack;
+      this.controlStack = controlStack;
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+
+   public String getDataStack()
+   {
+      return dataStack;
+   }
+
+   public void setDataStack(String dataStack)
+   {
+      this.dataStack = dataStack;
+   }
+
+   public String getControlStack()
+   {
+      return controlStack;
+   }
+
+   public void setControlStack(String controlStack)
+   {
+      this.controlStack = controlStack;
+   }
+
+   public String getUniqueID()
+   {
+      return uniqueID;
+   }
+
+   public void setUniqueID(String uniqueID)
+   {
+      this.uniqueID = uniqueID;
+   }
+
+   public Channel createControlChannel() throws Exception
+   {
+      return jChannelFactory.createMultiplexerChannel(controlStack, uniqueID + "-CTRL", Boolean.TRUE, uniqueID);
+   }
+
+   public Channel createDataChannel() throws Exception
+   {
+      return jChannelFactory.createMultiplexerChannel(dataStack, uniqueID + "-DATA", Boolean.TRUE, uniqueID);
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private  -------------------------------------------------------------------------------------
+
+   // Inner classes  -------------------------------------------------------------------------------
+
+}

Added: trunk/src/main/org/jboss/messaging/core/impl/channelfactory/XMLChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/channelfactory/XMLChannelFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/channelfactory/XMLChannelFactory.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,96 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.messaging.core.impl.channelfactory;
+
+import org.jboss.messaging.core.ChannelFactory;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
+import org.w3c.dom.Element;
+
+/**
+ * A ChannelFactory that will use Elements to create channels.
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:1909 $</tt>
+ * $Id:XMLJChannelFactory.java 1909 2007-01-06 06:08:03Z clebert.suconic at jboss.com $
+ */
+public class XMLChannelFactory implements ChannelFactory
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+   Element controlConfig;
+   Element dataConfig;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public XMLChannelFactory(Element controlConfig, Element dataConfig)
+   {
+      this.controlConfig = controlConfig;
+      this.dataConfig = dataConfig;
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public Element getControlConfig()
+   {
+      return controlConfig;
+   }
+
+   public void setControlConfig(Element controlConfig)
+   {
+      this.controlConfig = controlConfig;
+   }
+
+   public Element getDataConfig()
+   {
+      return dataConfig;
+   }
+
+   public void setDataConfig(Element dataConfig)
+   {
+      this.dataConfig = dataConfig;
+   }
+
+   // implementation of JChannelFactory ------------------------------------------------------------
+   public Channel createControlChannel() throws Exception
+   {
+      return new JChannel(controlConfig);
+   }
+
+   public Channel createDataChannel() throws Exception
+   {
+      return new JChannel(dataConfig);
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}

Added: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,669 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.messaging.core.impl.messagecounter;
+
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Queue;
+
+/**
+ * This class stores message count informations for a given queue
+ * 
+ * At intervals this class samples the queue for message count data
+ * 
+ * Note that the underlying queue *does not* update statistics every time a message
+ * is added since that would reall slow things down, instead we *sample* the queues at
+ * regular intervals - this means we are less intrusive on the queue
+ *
+ * @author <a href="mailto:u.schroeter at mobilcom.de">Ulf Schroeter</a>
+ * @author <a href="mailto:s.steinbacher at mobilcom.de">Stephan Steinbacher</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version $Revision: 1.8 $
+ */
+public class MessageCounter
+{
+   protected static final Logger log = Logger.getLogger(MessageCounter.class);
+
+   // destination related information
+   private String destName;
+   private String destSubscription;
+   private boolean destTopic;
+   private boolean destDurable;
+
+   // destination queue
+   private Queue destQueue;
+
+   // counter
+   private int countTotal;
+   private int countTotalLast;
+   private int depthLast;
+   private long timeLastUpdate;
+
+   // per hour day counter history
+   private int dayCounterMax;
+   private ArrayList dayCounter;
+   
+   /**
+    * Get a list of message statistics from a list of message counters
+    * 
+    * @param counter the message counters
+    * @return the message statistics
+    * @throws Exception for any error
+    */
+   public static List getMessageStatistics(List counters) throws Exception
+   {
+      List list = new ArrayList(counters.size());
+      
+      Iterator iter = counters.iterator();
+      
+      while (iter.hasNext())
+      {
+         MessageCounter counter = (MessageCounter)iter.next();
+         
+         MessageStatistics stats = new MessageStatistics();
+         stats.setName(counter.getDestinationName());
+         stats.setSubscriptionID(counter.getDestinationSubscription());
+         stats.setTopic(counter.getDestinationTopic());
+         stats.setDurable(counter.getDestinationDurable());
+         stats.setCount(counter.getCount());
+         stats.setCountDelta(counter.getCountDelta());
+         stats.setDepth(counter.getMessageCount());
+         stats.setDepthDelta(counter.getMessageCountDelta());
+         stats.setTimeLastUpdate(counter.getLastUpdate());
+         
+         list.add(stats);
+      }
+      return list;
+   }
+
+   /**
+    *    Constructor
+    *
+    * @param name             destination name
+    * @param subscription     subscription name
+    * @param queue            internal queue object
+    * @param topic            topic destination flag
+    * @param durable          durable subsciption flag
+    * @param daycountmax      max message history day count
+    */
+   public MessageCounter(String name,
+                         String subscription,
+                         Queue queue,
+                         boolean topic,
+                         boolean durable,
+                         int daycountmax)
+   {
+      // store destination related information
+      destName = name;
+      destSubscription = subscription;
+      destTopic = topic;
+      destDurable = durable;
+      destQueue = queue;      
+
+      // initialize counter
+      resetCounter();
+
+      // initialize message history
+      dayCounter = new ArrayList();
+
+      setHistoryLimit(daycountmax);
+   }
+
+   /**
+   * Get string representation
+   */
+   public String toString()
+   {
+      return getCounterAsString();
+   }
+
+   private int lastMessagesAdded;
+   
+   /*
+    * This method is called periodically to update statistics from the queue
+    */
+   public synchronized void onTimer()
+   {
+      int latestMessagesAdded = destQueue.getMessagesAdded();
+      
+      int newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
+      
+      countTotal += newMessagesAdded;
+      
+      lastMessagesAdded = latestMessagesAdded;
+      
+      //update timestamp
+      timeLastUpdate = System.currentTimeMillis();
+      
+      // update message history
+      updateHistory(true);
+   }
+
+   /**
+    * Gets the related destination name
+    *
+    * @return String    destination name
+    */
+   public String getDestinationName()
+   {
+      return destName;
+   }
+
+   /**
+    * Gets the related destination subscription
+    *
+    * @return String    destination name
+    */
+   public String getDestinationSubscription()
+   {
+      return destSubscription;
+   }
+
+   /**
+    * Gets the related destination topic flag
+    *
+    * @return boolean    true: topic destination, false: queue destination
+    */
+   public boolean getDestinationTopic()
+   {
+      return destTopic;
+   }
+
+   /**
+    * Gets the related destination durable subscription flag
+    *
+    * @return boolean   true : durable subscription,
+    *                   false: non-durable subscription
+    */
+   public boolean getDestinationDurable()
+   {
+      return destDurable;
+   }
+
+   /**
+    * Gets the total message count since startup or
+    * last counter reset
+    *
+    * @return int    message count
+    */
+   public int getCount()
+   {
+      return countTotal;
+   }
+
+   /**
+    * Gets the message count delta since last method call
+    *
+    * @return int    message count delta
+    */
+   public int getCountDelta()
+   {
+      int delta = countTotal - countTotalLast;
+
+      countTotalLast = countTotal;
+
+      return delta;
+   }
+
+   /**
+    * Gets the current message count of pending messages
+    * within the destination waiting for dispatch
+    *
+    * @return int message queue depth
+    */
+   public int getMessageCount()
+   {
+      return destQueue.getMessageCount();
+   }
+
+   /**
+    * Gets the message count delta of pending messages
+    * since last method call. Therefore
+    *
+    * @return int message queue depth delta
+    */
+   public int getMessageCountDelta()
+   {
+      int current = destQueue.getMessageCount();
+      int delta = current - depthLast;
+
+      depthLast = current;
+
+      return delta;
+   }
+
+   /**
+    * Gets the timestamp of the last message add
+    *
+    * @return long      system time
+    */
+   public long getLastUpdate()
+   {
+      return timeLastUpdate;
+   }
+
+   /**
+    * Reset message counter values
+    */
+   public void resetCounter()
+   {
+      countTotal = 0;
+      countTotalLast = 0;
+      depthLast = 0;
+      timeLastUpdate = 0;
+   }
+
+   /**
+    * Get message counter data as string in format
+    *
+    *  "Topic/Queue, Name, Subscription, Durable, Count, CountDelta,
+    *  Depth, DepthDelta, Timestamp Last Increment"  
+    *
+    * @return  String   message counter data string
+    */
+   public String getCounterAsString()
+   {
+      StringBuffer ret = new StringBuffer();
+
+      // Topic/Queue
+      if (destTopic)
+         ret.append("Topic,");
+      else
+         ret.append("Queue,");
+
+      // name 
+      ret.append(destName).append(",");
+
+      // subscription
+      if (destSubscription != null)
+         ret.append(destSubscription).append(",");
+      else
+         ret.append("-,");
+
+      // Durable subscription
+      if (destTopic)
+      {
+         // Topic
+         if (destDurable)
+            ret.append("true,");
+         else
+            ret.append("false,");
+      }
+      else
+      {
+         // Queue
+         ret.append("-,");
+      }
+
+      // counter values
+      ret.append(getCount()).append(",").append(getCountDelta()).append(",").append(getMessageCount()).append(",").append(getMessageCountDelta()).append(",");
+
+      // timestamp last counter update
+      if (timeLastUpdate > 0)
+      {
+         DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
+
+         ret.append(dateFormat.format(new Date(timeLastUpdate)));
+      }
+      else
+      {
+         ret.append("-");
+      }
+
+      return ret.toString();
+   }
+
+   /**
+    * Get message counter history day count limit
+    *
+    * <0: unlimited, 0: history disabled, >0: day count
+    */
+   public int getHistoryLimit()
+   {
+      return dayCounterMax;
+   }
+
+   /**
+    * Set message counter history day count limit
+    *
+    * <0: unlimited, 0: history disabled, >0: day count
+    */
+   public void setHistoryLimit(int daycountmax)
+   {
+      boolean bInitialize = false;
+
+      // store new maximum day count
+      dayCounterMax = daycountmax;
+
+      // update day counter array
+      synchronized (dayCounter)
+      {
+         if (dayCounterMax > 0)
+         {
+            // limit day history to specified day count
+            int delta = dayCounter.size() - dayCounterMax;
+
+            for (int i = 0; i < delta; i++)
+            {
+               // reduce array size to requested size by dropping
+               // oldest day counters
+               dayCounter.remove(0);
+            }
+
+            // create initial day counter when empty
+            bInitialize = dayCounter.isEmpty();
+         }
+         else if (dayCounterMax == 0)
+         {
+            // disable history
+            dayCounter.clear();
+         }
+         else
+         {
+            // unlimited day history
+
+            // create initial day counter when empty
+            bInitialize = dayCounter.isEmpty();
+         }
+
+         // optionally initialize first day counter entry
+         if (bInitialize)
+         {
+            dayCounter.add(new DayCounter(new GregorianCalendar(), true));
+         }
+      }
+   }
+
+   /**
+    * Update message counter history
+    */
+   private void updateHistory(boolean incrementCounter)
+   {
+      // check history activation
+      if (dayCounter.isEmpty())
+      {
+         return;
+      }
+
+      // calculate day difference between current date and date of last day counter entry
+      synchronized (dayCounter)
+      {
+         DayCounter counterLast = (DayCounter) dayCounter.get(dayCounter.size() - 1);
+
+         GregorianCalendar calNow = new GregorianCalendar();
+         GregorianCalendar calLast = counterLast.getDate();
+
+         // clip day time part for day delta calulation
+         calNow.clear(Calendar.AM_PM);
+         calNow.clear(Calendar.HOUR);
+         calNow.clear(Calendar.HOUR_OF_DAY);
+         calNow.clear(Calendar.MINUTE);
+         calNow.clear(Calendar.SECOND);
+         calNow.clear(Calendar.MILLISECOND);
+
+         calLast.clear(Calendar.AM_PM);
+         calLast.clear(Calendar.HOUR);
+         calLast.clear(Calendar.HOUR_OF_DAY);
+         calLast.clear(Calendar.MINUTE);
+         calLast.clear(Calendar.SECOND);
+         calLast.clear(Calendar.MILLISECOND);
+
+         long millisPerDay = 86400000; // 24 * 60 * 60 * 1000
+         long millisDelta = calNow.getTime().getTime() - calLast.getTime().getTime();
+
+         int dayDelta = (int) (millisDelta / millisPerDay);
+
+         if (dayDelta > 0)
+         {
+            // finalize last day counter
+            counterLast.finalizeDayCounter();
+
+            // add new intermediate empty day counter entries
+            DayCounter counterNew;
+
+            for (int i = 1; i < dayDelta; i++)
+            {
+               // increment date
+               calLast.add(Calendar.DAY_OF_YEAR, 1);
+
+               counterNew = new DayCounter(calLast, false);
+               counterNew.finalizeDayCounter();
+
+               dayCounter.add(counterNew);
+            }
+
+            // add new day counter entry for current day
+            counterNew = new DayCounter(calNow, false);
+
+            dayCounter.add(counterNew);
+
+            // ensure history day count limit
+            setHistoryLimit(dayCounterMax);
+         }
+
+         // update last day counter entry
+         counterLast = (DayCounter) dayCounter.get(dayCounter.size() - 1);
+         counterLast.updateDayCounter(incrementCounter);
+      }
+   }
+
+   /**
+    * Reset message counter history
+    */
+   public void resetHistory()
+   {
+      int max = dayCounterMax;
+
+      setHistoryLimit(0);
+      setHistoryLimit(max);
+   }
+
+   /**
+    * Get message counter history data as string in format
+    * 
+    * "day count\n  
+    *  Date 1, hour counter 0, hour counter 1, ..., hour counter 23\n
+    *  Date 2, hour counter 0, hour counter 1, ..., hour counter 23\n
+    *  .....
+    *  .....
+    *  Date n, hour counter 0, hour counter 1, ..., hour counter 23\n"
+    *
+    * @return  String   message history data string
+    */
+   public String getHistoryAsString()
+   {
+      String ret = "";
+
+      // ensure history counters are up to date
+      updateHistory(false);
+
+      // compile string       
+      synchronized (dayCounter)
+      {
+         // first line: history day count  
+         ret += dayCounter.size() + "\n";
+
+         // following lines: day counter data
+         for (int i = 0; i < dayCounter.size(); i++)
+         {
+            DayCounter counter = (DayCounter) dayCounter.get(i);
+
+            ret += counter.getDayCounterAsString() + "\n";
+         }
+      }
+
+      return ret;
+   }
+
+   /**
+    * Internal day counter class for one day hour based counter history
+    */
+   private static class DayCounter
+   {
+      static final int HOURS = 24;
+
+      GregorianCalendar date = null;
+      int[] counters = new int[HOURS];
+
+      /**
+       *    Constructor
+       *
+       * @param date          day counter date
+       * @param isStartDay    true  first day counter
+       *                      false follow up day counter
+       */
+      DayCounter(GregorianCalendar date, boolean isStartDay)
+      {
+         // store internal copy of creation date
+         this.date = (GregorianCalendar) date.clone();
+
+         // initialize the array with '0'- values to current hour (if it is not the
+         // first monitored day) and the rest with default values ('-1')
+         int hour = date.get(Calendar.HOUR_OF_DAY);
+
+         for (int i = 0; i < HOURS; i++)
+         {
+            if (i < hour)
+            {
+               if (isStartDay)
+                  counters[i] = -1;
+               else
+                  counters[i] = 0;
+            }
+            else
+            {
+               counters[i] = -1;
+            }
+         }
+
+         // set the array element of the current hour to '0'
+         counters[hour] = 0;
+      }
+
+      /**
+       * Gets copy of day counter date
+       *
+       * @return GregorianCalendar        day counter date
+       */
+      GregorianCalendar getDate()
+      {
+         return (GregorianCalendar) date.clone();
+      }
+
+      /**
+       * Update day counter hour array elements  
+       *
+       * @param incrementCounter      update current hour counter
+       */
+      void updateDayCounter(boolean incrementCounter)
+      {
+         // get the current hour of the day
+         GregorianCalendar cal = new GregorianCalendar();
+
+         int currentIndex = cal.get(Calendar.HOUR_OF_DAY);
+
+         // check if the last array update is more than 1 hour ago, if so fill all
+         // array elements between the last index and the current index with '0' values
+         boolean bUpdate = false;
+
+         for (int i = 0; i <= currentIndex; i++)
+         {
+            if (counters[i] > -1)
+            {
+               // found first initialized hour counter
+               // -> set all following uninitialized
+               //    counter values to 0
+               bUpdate = true;
+            }
+
+            if (bUpdate == true)
+            {
+               if (counters[i] == -1)
+                  counters[i] = 0;
+            }
+         }
+
+         // optionally increment current counter
+         if (incrementCounter)
+         {
+            counters[currentIndex]++;
+         }
+      }
+
+      /**
+       * Finalize day counter hour array elements  
+       */
+      void finalizeDayCounter()
+      {
+         // a new day has began, so fill all array elements from index to end with
+         // '0' values
+         boolean bFinalize = false;
+
+         for (int i = 0; i < HOURS; i++)
+         {
+            if (counters[i] > -1)
+            {
+               // found first initialized hour counter
+               // -> finalize all following uninitialized
+               //    counter values
+               bFinalize = true;
+            }
+
+            if (bFinalize)
+            {
+               if (counters[i] == -1)
+                  counters[i] = 0;
+            }
+         }
+      }
+
+      /**
+       * Return day counter data as string with format
+       * "Date, hour counter 0, hour counter 1, ..., hour counter 23"
+       * 
+       * @return  String   day counter data
+       */
+      String getDayCounterAsString()
+      {
+         // first element day counter date
+         DateFormat dateFormat = DateFormat.getDateInstance(DateFormat.SHORT);
+
+         String strData = dateFormat.format(date.getTime());
+
+         // append 24 comma separated hour counter values           
+         for (int i = 0; i < HOURS; i++)
+         {
+            strData += "," + counters[i];
+         }
+
+         return strData;
+      }
+   }
+}
+

Added: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,196 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.messagecounter;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.MessagingComponent;
+
+import java.util.*;
+
+/**
+ * 
+ * A MessageCounterManager
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 3465 $</tt>
+ *
+ * $Id: MessageCounterManager.java 3465 2007-12-10 17:32:22Z ataylor $
+ *
+ */
+public class MessageCounterManager implements MessagingComponent
+{
+   private static final Logger log = Logger.getLogger(MessageCounterManager.class);
+   
+   private Map messageCounters;
+   
+   private boolean started;
+   
+   private Timer timer;
+   
+   private long period;
+   
+   private PingMessageCountersTask task;
+          
+   public MessageCounterManager(long period)
+   {
+      messageCounters = new HashMap();
+      
+      this.period = period;
+   }
+
+   public synchronized void start()
+   {
+      if (started)
+      {  
+         return;
+      }
+      
+      // Needs to be daemon
+      timer = new Timer(true);
+      
+      task = new PingMessageCountersTask();
+            
+      timer.schedule(task, 0, period);      
+      
+      started = true;      
+   }
+
+   public synchronized void stop()
+   { 
+      if (!started)
+      {
+         return;
+      }
+      
+      //Wait for timer task to stop
+      
+      task.stop();
+      
+      timer.cancel();
+      
+      timer = null;
+      
+      started = false;
+   }
+   
+   public synchronized void reschedule(long newPeriod)
+   {
+      boolean wasStarted = this.started;
+      
+      if (wasStarted)
+      {
+         stop();
+      }
+      
+      period = newPeriod;
+      
+      if (wasStarted)
+      {
+         start();
+      }
+   }
+   
+   public void registerMessageCounter(String name, MessageCounter counter)
+   {
+      synchronized (messageCounters)
+      {
+         messageCounters.put(name, counter);
+      }
+   }
+   
+   public MessageCounter unregisterMessageCounter(String name)
+   {
+      synchronized (messageCounters)
+      {
+         return (MessageCounter)messageCounters.remove(name);
+      }
+   }
+   
+   public Set getMessageCounters()
+   {
+      synchronized (messageCounters)
+      {
+         return new HashSet(messageCounters.values());
+      }
+   }
+   
+   public MessageCounter getMessageCounter(String name)
+   {
+      synchronized (messageCounters)
+      {
+         return (MessageCounter)messageCounters.get(name);
+      }
+   }
+   
+   public void resetAllCounters()
+   {
+      synchronized (messageCounters)
+      {
+         Iterator iter = messageCounters.values().iterator();
+         
+         while (iter.hasNext())
+         {
+            MessageCounter counter = (MessageCounter)iter.next();
+            
+            counter.resetCounter();
+         }
+      }
+   }
+   
+   public void resetAllCounterHistories()
+   {
+      synchronized (messageCounters)
+      {
+         Iterator iter = messageCounters.values().iterator();
+         
+         while (iter.hasNext())
+         {
+            MessageCounter counter = (MessageCounter)iter.next();
+            
+            counter.resetHistory();
+         }
+      }
+   }
+   
+   class PingMessageCountersTask extends TimerTask
+   {
+      public synchronized void run()
+      {
+         synchronized (messageCounters)
+         {
+            Iterator iter = messageCounters.values().iterator();
+            
+            while (iter.hasNext())
+            {
+               MessageCounter counter = (MessageCounter)iter.next();
+               
+               counter.onTimer();
+            }
+         }
+      }  
+                        
+      synchronized void stop()
+      {
+         cancel();
+      }
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageStatistics.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageStatistics.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageStatistics.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,340 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.messaging.core.impl.messagecounter;
+
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.util.Date;
+
+//FIXME this doesn't belong here
+
+/**
+ * Message statistics
+ * 
+ * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @version <tt>$Revision: 1.3 $</tt>
+ */
+public class MessageStatistics implements Serializable
+{
+   // Constants -----------------------------------------------------
+
+   /** The serialVersionUID */
+   static final long serialVersionUID = 8056884098781414022L;
+
+   // Attributes ----------------------------------------------------
+
+   /** Whether we are topic */
+   private boolean topic;
+
+   /** Whether we are durable */
+   private boolean durable;
+
+   /** The name */
+   private String name;
+
+   /** The subscription id */
+   private String subscriptionID;
+
+   /** The message count */
+   private int count;
+
+   /** The message count delta */
+   private int countDelta;
+
+   /** The message depth */
+   private int depth;
+
+   /** The message depth delta */
+   private int depthDelta;
+
+   /** The last update */
+   private long timeLastUpdate;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Construct a new Message Statistics
+    */
+   public MessageStatistics()
+   {
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Get the count.
+    * 
+    * @return Returns the count.
+    */
+   public int getCount()
+   {
+      return count;
+   }
+
+   /**
+    * Set the count.
+    * 
+    * @param count The count to set.
+    */
+   public void setCount(int count)
+   {
+      this.count = count;
+   }
+
+   /**
+    * Get the countDelta.
+    * 
+    * @return Returns the countDelta.
+    */
+   public int getCountDelta()
+   {
+      return countDelta;
+   }
+
+   /**
+    * Set the countDelta.
+    * 
+    * @param countDelta The countDelta to set.
+    */
+   public void setCountDelta(int countDelta)
+   {
+      this.countDelta = countDelta;
+   }
+
+   /**
+    * Get the depth.
+    * 
+    * @return Returns the depth.
+    */
+   public int getDepth()
+   {
+      return depth;
+   }
+
+   /**
+    * Set the depth.
+    * 
+    * @param depth The depth to set.
+    */
+   public void setDepth(int depth)
+   {
+      this.depth = depth;
+   }
+
+   /**
+    * Get the depthDelta.
+    * 
+    * @return Returns the depthDelta.
+    */
+   public int getDepthDelta()
+   {
+      return depthDelta;
+   }
+
+   /**
+    * Set the depthDelta.
+    * 
+    * @param depthDelta The depthDelta to set.
+    */
+   public void setDepthDelta(int depthDelta)
+   {
+      this.depthDelta = depthDelta;
+   }
+
+   /**
+    * Get the durable.
+    * 
+    * @return Returns the durable.
+    */
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   /**
+    * Set the durable.
+    * 
+    * @param durable The durable to set.
+    */
+   public void setDurable(boolean durable)
+   {
+      this.durable = durable;
+   }
+
+   /**
+    * Get the name.
+    * 
+    * @return Returns the name.
+    */
+   public String getName()
+   {
+      return name;
+   }
+
+   /**
+    * Set the name.
+    * 
+    * @param name The name to set.
+    */
+   public void setName(String name)
+   {
+      this.name = name;
+   }
+
+   /**
+    * Get the subscriptionID.
+    * 
+    * @return Returns the subscriptionID.
+    */
+   public String getSubscriptionID()
+   {
+      return subscriptionID;
+   }
+
+   /**
+    * Set the subscriptionID.
+    * 
+    * @param subscriptionID The subscriptionID to set.
+    */
+   public void setSubscriptionID(String subscriptionID)
+   {
+      this.subscriptionID = subscriptionID;
+   }
+
+   /**
+    * Get the timeLastUpdate.
+    * 
+    * @return Returns the timeLastUpdate.
+    */
+   public long getTimeLastUpdate()
+   {
+      return timeLastUpdate;
+   }
+
+   /**
+    * Set the timeLastUpdate.
+    * 
+    * @param timeLastUpdate The timeLastUpdate to set.
+    */
+   public void setTimeLastUpdate(long timeLastUpdate)
+   {
+      this.timeLastUpdate = timeLastUpdate;
+   }
+
+   /**
+    * Get the topic.
+    * 
+    * @return Returns the topic.
+    */
+   public boolean isTopic()
+   {
+      return topic;
+   }
+
+   /**
+    * Set the topic.
+    * 
+    * @param topic The topic to set.
+    */
+   public void setTopic(boolean topic)
+   {
+      this.topic = topic;
+   }
+
+   /**
+    * Get message data as string in format
+    *
+    *  "Topic/Queue, Name, Subscription, Durable, Count, CountDelta,
+    *  Depth, DepthDelta, Timestamp Last Increment"  
+    *
+    * @return  String data as a string
+    */
+   public String getAsString()
+   {
+      StringBuffer buffer = new StringBuffer(50);
+
+      // Topic/Queue
+      if (topic)
+         buffer.append("Topic,");
+      else
+         buffer.append("Queue,");
+
+      // name 
+      buffer.append(name).append(',');
+
+      // subscription
+      if (subscriptionID != null)
+         buffer.append(subscriptionID).append(',');
+      else
+         buffer.append("-,");
+
+      // Durable subscription
+      if (topic)
+      {
+         // Topic
+         if (durable)
+            buffer.append("DURABLE,");
+         else
+            buffer.append("NONDURABLE,");
+      }
+      else
+      {
+         buffer.append("-,");
+      }
+
+      // counter values
+      buffer.append(count).append(',').append(countDelta).append(',').append(depth).append(',').append(depthDelta)
+            .append(',');
+
+      // timestamp last counter update
+      if (timeLastUpdate > 0)
+      {
+         DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
+
+         buffer.append(dateFormat.format(new Date(timeLastUpdate)));
+      }
+      else
+      {
+         buffer.append('-');
+      }
+
+      return buffer.toString();
+   }
+
+   // Object overrides ----------------------------------------------
+
+   public String toString()
+   {
+      return getAsString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file

Added: trunk/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,113 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.util.Streamable;
+
+/**
+ * 
+ * A ClusterRequest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1917 $</tt>
+ *
+ * $Id: ClusterRequest.java 1917 2007-01-08 20:26:12Z clebert.suconic at jboss.com $
+ *
+ */
+public abstract class ClusterRequest implements Streamable
+{    
+	public static final int JOIN_CLUSTER_REQUEST = 1;
+	
+	public static final int LEAVE_CLUSTER_REQUEST = 2;
+		
+	public static final int BIND_REQUEST = 3;
+	
+	public static final int UNBIND_REQUEST = 4;
+		
+	public static final int MESSAGE_REQUEST = 5;
+	
+	protected static final int NULL = 0;
+	
+	protected static final int NOT_NULL = 1;
+	
+   /*
+    * Factory method
+    */
+   static ClusterRequest createFromStream(DataInputStream dais) throws Exception
+   {
+      byte type = dais.readByte();
+       
+      ClusterRequest request = null;
+      
+      switch (type)
+      {
+	      case MESSAGE_REQUEST:
+	      {
+	         request = new MessageRequest();
+	         break;
+	      }      
+//         case BIND_REQUEST:
+//         {
+//            request =  new BindRequest();
+//            break;
+//         }
+//         case UNBIND_REQUEST:
+//         {
+//            request = new UnbindRequest();
+//            break;
+//         }
+//         case JOIN_CLUSTER_REQUEST:
+//         {
+//         	request = new JoinClusterRequest();
+//         	break;
+//         }
+//         case LEAVE_CLUSTER_REQUEST:
+//         {
+//            request = new LeaveClusterRequest();
+//            break;
+//         }         
+         default:
+         {
+            throw new IllegalArgumentException("Invalid type: " + type);
+         }
+      }
+      
+      request.read(dais);
+      
+      return request;
+   }
+   
+   public static void writeToStream(DataOutputStream daos, ClusterRequest request) throws Exception
+   {
+      daos.writeByte(request.getType());
+      
+      request.write(daos);
+   }
+   
+   abstract Object execute(PostOffice office) throws Exception;
+   
+   abstract byte getType();
+}

Added: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,130 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.jboss.messaging.core.Condition;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.impl.ConditionImpl;
+import org.jboss.messaging.core.impl.MessageImpl;
+
+/**
+ * A MessageRequest
+ * 
+ * Used when sending a single message non reliably across the group
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2202 $</tt>
+ *
+ * $Id: MessageRequest.java 2202 2007-02-08 10:50:26Z timfox $
+ *
+ */
+public class MessageRequest extends ClusterRequest
+{
+   private Condition condition;   
+   
+   private Message message;
+   
+   //private Set queueNames;
+   
+   MessageRequest()
+   {      
+   }
+   
+   MessageRequest(Condition condition, Message message)//, Set queueNames)
+   {
+      this.condition = condition;
+      
+      this.message = message;
+      
+      //this.queueNames = queueNames;
+   }
+   
+   Object execute(PostOffice office) throws Exception
+   {
+      office.routeFromCluster(condition, message);    
+      
+      return null;
+   }  
+   
+   byte getType()
+   {
+      return ClusterRequest.MESSAGE_REQUEST;
+   }
+   
+   public void read(DataInputStream in) throws Exception
+   {
+      condition = new ConditionImpl();
+      
+      condition.read(in);
+      
+      message = new MessageImpl();
+      
+      message.read(in);  
+      
+//      byte b = in.readByte();
+//      
+//      if (b != NULL)
+//      {
+//      	int size = in.readInt();
+//      	
+//      	queueNames = new HashSet(size);
+//      	
+//      	for (int i = 0; i < size; i++)
+//      	{
+//      		String queueName = in.readUTF();
+//      		
+//      		queueNames.add(queueName);
+//      	}
+//      }
+   }
+   
+   public void write(DataOutputStream out) throws Exception
+   {
+      condition.write(out);
+      
+      message.write(out);
+      
+//      if (queueNames == null)
+//      {
+//      	out.writeByte(NULL);
+//      }
+//      else
+//      {
+//      	out.writeByte(NOT_NULL);
+//      	
+//      	out.writeInt(queueNames.size());
+//      	
+//      	Iterator iter = queueNames.iterator();
+//      	
+//      	while (iter.hasNext())
+//      	{
+//      		String queueName = (String)iter.next();
+//      		
+//      		out.writeUTF(queueName);
+//      	}      	     
+//      }
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,369 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.TextMessage;
+
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Condition;
+import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.QueueFactory;
+import org.jboss.messaging.core.TransactionSynchronization;
+import org.jboss.messaging.core.impl.BindingImpl;
+import org.jboss.messaging.util.ConcurrentHashSet;
+
+/**
+ * 
+ * A PostOfficeImpl
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class PostOfficeImpl implements PostOffice
+{  
+   private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
+   
+   private int nodeID;
+   
+  // private Map<Integer, Map<String, Queue>> queues = new HashMap<Integer, Map<String, Queue>>();
+   
+   private Map<Condition, List<Binding>> mappings = new ConcurrentHashMap<Condition, List<Binding>>();
+   
+   private Set<Condition> conditions = new ConcurrentHashSet<Condition>();
+   
+   private PersistenceManager persistenceManager;
+   
+   private QueueFactory queueFactory;
+    
+   public PostOfficeImpl(int nodeID, PersistenceManager persistenceManager, QueueFactory queueFactory)
+   {
+      this.nodeID = nodeID;
+      
+      this.persistenceManager = persistenceManager;
+      
+      this.queueFactory = queueFactory;
+   }
+      
+   // MessagingComponent implementation ---------------------------------------
+   
+   public void start() throws Exception
+   {
+      loadBindings();
+   }
+
+   public void stop() throws Exception
+   {
+      mappings.clear();
+      
+      conditions.clear();
+   }
+   
+   // PostOffice implementation -----------------------------------------------
+
+   public Queue addQueue(Condition condition, String name, Filter filter, 
+                         boolean durable, boolean temporary, boolean allNodes) throws Exception
+   {
+      Binding binding = createBinding(condition, name, filter, durable, temporary, allNodes);
+      
+      addBindingInMemory(binding);
+       
+      if (durable)
+      {
+         persistenceManager.addBinding(binding);
+      }
+      
+      return binding.getQueue();
+   }
+         
+   public boolean removeQueue(Condition condition, String name, boolean allNodes) throws Exception
+   {
+      Binding binding = removeQueueInMemory(condition, name);
+      
+      if (binding != null)
+      {
+         if (binding.getQueue().isDurable())
+         {
+            persistenceManager.deleteBinding(binding);
+         }
+         
+         return true;
+      }
+      else
+      {
+         return false;
+      }            
+   }
+   
+   public void addCondition(Condition condition)
+   {      
+      conditions.add(condition);
+   }
+   
+   public boolean removeCondition(Condition condition)
+   {      
+      return conditions.remove(condition);
+   }
+   
+   public boolean containsCondition(Condition condition)
+   {
+      return conditions.contains(condition);
+   }
+    
+   public void route(Condition condition, Message message) throws Exception
+   {
+     // boolean routeRemote = false;
+           
+      List<Binding> bindings = mappings.get(condition);
+      
+      if (bindings != null)
+      {
+         for (Binding binding: bindings)
+         {
+            Queue queue = binding.getQueue();
+            
+            if (queue.getFilter() == null || queue.getFilter().match(message))
+            {         
+               if (binding.getNodeID() == nodeID)
+               {
+                  //Local queue
+                                 
+                  message.createReference(queue);              
+               }
+               else
+               {
+//                  if (!queue.isDurable())
+//                  {
+//                     //Remote queue - we never route to remote durable queues since we will lose atomicity in event
+//                     //of crash - for moving between durable queues we use message redistribution
+//                     
+//                     routeRemote = true;                  
+//                  }               
+               }
+            }
+         }
+      }
+
+      
+//      if (routeRemote)
+//      {
+//         tx.addSynchronization(new CastMessageCallback(new MessageRequest(condition, message)));
+//      }
+   }
+   
+   public void routeFromCluster(Condition condition, Message message) throws Exception
+   {     
+      List<Binding> bindings = mappings.get(condition);
+      
+      for (Binding binding: bindings)
+      {
+         Queue queue = binding.getQueue();
+         
+         if (binding.getNodeID() == nodeID)
+         {         
+            if (queue.getFilter() == null || queue.getFilter().match(message))
+            {         
+               MessageReference ref = message.createReference(queue);
+
+               //We never route durably from other nodes - so no need to persist
+
+               queue.addLast(ref);             
+            }
+         }
+      }
+   }
+
+   public Map<Condition, List<Binding>> getMappings()
+   {
+      return mappings;
+   }
+   
+   public List<Binding> getBindingsForQueueName(String queueName)
+   {
+      List<Binding> list = new ArrayList<Binding>();
+      
+      for (List<Binding> bindings: mappings.values())
+      {
+         for (Binding binding: bindings)
+         {
+            if (binding.getQueue().getName().equals(queueName) && binding.getNodeID() == nodeID)
+            {
+               list.add(binding);
+            }
+         }
+      }
+      return list;
+   }
+   
+   public List<Binding> getBindingsForCondition(Condition condition)
+   {
+      List<Binding> list = new ArrayList<Binding>();
+      
+      List<Binding> bindings = mappings.get(condition);
+      
+      if (bindings != null)
+      {
+         for (Binding binding: bindings)
+         {
+            if (binding.getNodeID() == nodeID)
+            {
+               list.add(binding);
+            }
+         }
+      }         
+         
+      return list;
+   }
+   
+   // Private -----------------------------------------------------------------
+   
+   private Binding createBinding(Condition condition, String name, Filter filter,
+                                 boolean durable, boolean temporary, boolean allNodes)
+   {
+      Queue queue = queueFactory.createQueue(-1, name, filter, durable, temporary);
+      
+      Binding binding = new BindingImpl(this.nodeID, condition, queue, allNodes);
+      
+      return binding;
+   }
+   
+   private void addBindingInMemory(Binding binding) throws Exception
+   {      
+      List<Binding> bindings = mappings.get(binding.getCondition());
+      
+      if (bindings == null)
+      {
+         bindings = new CopyOnWriteArrayList<Binding>();
+         
+         mappings.put(binding.getCondition(), bindings);
+      }
+      
+      bindings.add(binding);
+      
+   //      Map<String, Queue> nameMap = queues.get(nodeID);
+   //      
+   //      if (nameMap == null)
+   //      {
+   //         nameMap = new HashMap<String, Queue>();
+   //         
+   //         queues.put(nodeID, nameMap);
+   //      }
+   //      
+   //      nameMap.put(name, queue);
+   }
+   
+   private Binding removeQueueInMemory(Condition condition, String name) throws Exception
+   {
+      Binding binding = null;
+      
+   //      Map<String, Queue> nameMap = queues.get(nodeID);
+   //      
+   //      nameMap.remove(name);
+   //      
+   //      if (nameMap.isEmpty())
+   //      {
+   //         queues.remove(nodeID);
+   //      }
+             
+      List<Binding> bindings = mappings.get(condition);
+                  
+      for (Iterator<Binding> iter = bindings.iterator(); iter.hasNext();)
+      {
+         Binding b = iter.next();
+         
+         if (b.getQueue().getName().equals(name))
+         {
+            binding = b;
+                                          
+            break;
+         }
+      }
+      
+      if (binding != null)
+      {
+         bindings.remove(binding);
+      }
+      
+      if (bindings.isEmpty())
+      {
+         mappings.remove(condition);
+      }
+         
+      return binding;
+   }
+   
+   private void loadBindings() throws Exception
+   {
+      List<Binding> bindings = persistenceManager.loadBindings(queueFactory);
+      
+      for (Binding binding: bindings)
+      {
+         addBindingInMemory(binding);                    
+      }
+   }
+   
+   private void deleteMappingsForNode(int theNodeID)
+   {
+      
+   }
+   
+   private class CastMessageCallback implements TransactionSynchronization
+   {
+      private ClusterRequest request;
+      
+      CastMessageCallback(ClusterRequest request)
+      {
+         this.request = request;
+      }
+      
+      public void afterCommit() throws Exception
+      {
+         //TODO - cast request
+      }
+
+      public void afterRollback() throws Exception
+      { 
+      }
+
+      public void beforeCommit() throws Exception
+      {
+      }
+
+      public void beforeRollback() throws Exception
+      {
+      } 
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,480 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.impl.server;
+
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.aop.microcontainer.aspects.jmx.JMX;
+import org.jboss.jms.server.ConnectionFactoryManager;
+import org.jboss.jms.server.ConnectionManager;
+import org.jboss.jms.server.DestinationJNDIMapper;
+import org.jboss.jms.server.DestinationManager;
+import org.jboss.jms.server.MessagingTimeoutFactory;
+import org.jboss.jms.server.SecurityStore;
+import org.jboss.jms.server.TransactionRepository;
+import org.jboss.jms.server.connectionfactory.ConnectionFactoryDeployer;
+import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
+import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
+import org.jboss.jms.server.destination.DestinationDeployer;
+import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
+import org.jboss.jms.server.plugin.contract.JMSUserManager;
+import org.jboss.jms.server.security.SecurityMetadataStore;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Condition;
+import org.jboss.messaging.core.Configuration;
+import org.jboss.messaging.core.DestinationType;
+import org.jboss.messaging.core.MemoryManager;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.impl.ConditionImpl;
+import org.jboss.messaging.core.impl.QueueFactoryImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounterManager;
+import org.jboss.messaging.core.impl.postoffice.PostOfficeImpl;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.newcore.impl.memory.SimpleMemoryManager;
+import org.jboss.messaging.util.ExceptionUtil;
+import org.jboss.messaging.util.Version;
+
+/**
+ * A Messaging Server
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
+ * @author <a href="mailto:aslak at conduct.no">Aslak Knutsen</a>
+ * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
+ * @version <tt>$Revision: 3543 $</tt>
+ *          <p/>
+ *          $Id: ServerPeer.java 3543 2008-01-07 22:31:58Z clebert.suconic at jboss.com $
+ */
+ at JMX(name = "jboss.messaging:service=MessagingServer", exposedInterface = MessagingServer.class)
+public class MessagingServerImpl implements MessagingServer
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(MessagingServerImpl.class);
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private Version version;
+
+   private boolean started;
+
+   //private boolean supportsFailover = true;
+
+   private Map<String, ServerSessionEndpoint> sessions;
+
+   // wired components
+
+   private DestinationManager destinationJNDIMapper;
+   private SecurityMetadataStore securityStore;
+   private ConnectionFactoryJNDIMapper connFactoryJNDIMapper;
+   private SimpleConnectionManager connectionManager;
+   private MemoryManager memoryManager;
+   private MessageCounterManager messageCounterManager;
+   private TransactionRepository transactionRepository = new TransactionRepository();
+   private PostOffice postOffice;
+   private DestinationDeployer destinationDeployer;
+   private ConnectionFactoryDeployer connectionFactoryDeployer;
+
+   // plugins
+
+   private PersistenceManager persistenceManager;
+
+   private JMSUserManager jmsUserManager;
+      
+   private MinaService minaService;
+   
+   private Configuration configuration;
+
+   // Constructors ---------------------------------------------------------------------------------
+   public MessagingServerImpl() throws Exception
+   {
+      // Some wired components need to be started here
+
+      version = Version.instance();
+
+      sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
+
+      started = false;
+   }
+
+   // lifecycle methods ----------------------------------------------------------------
+
+   public synchronized void start() throws Exception
+   {
+      try
+      {
+         log.debug("starting MessagingServer");
+
+         if (started)
+         {
+            return;
+         }
+
+         if (configuration.getMessagingServerID() < 0)
+         {
+            throw new IllegalStateException("MessagingServer ID not set");
+         }
+
+         log.debug(this + " starting");
+
+         // Create the wired components
+
+         securityStore = new SecurityMetadataStore(this);         
+         destinationJNDIMapper = new DestinationJNDIMapper(this);
+         connFactoryJNDIMapper = new ConnectionFactoryJNDIMapper(this);
+         connectionManager = new SimpleConnectionManager();
+         memoryManager = new SimpleMemoryManager();
+         destinationDeployer = new DestinationDeployer(this);
+         connectionFactoryDeployer = new ConnectionFactoryDeployer(this, minaService);
+         messageCounterManager = new MessageCounterManager(configuration.getMessageCounterSamplePeriod());
+         configuration.addPropertyChangeListener(new PropertyChangeListener()
+         {
+            public void propertyChange(PropertyChangeEvent evt)
+            {
+               if(evt.getPropertyName().equals("messageCounterSamplePeriod"))
+                  messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
+            }
+         });
+         postOffice = new PostOfficeImpl(configuration.getMessagingServerID(), 
+                                         persistenceManager, new QueueFactoryImpl());
+
+         // Start the wired components
+
+         destinationJNDIMapper.start();
+         connFactoryJNDIMapper.start();
+         connectionManager.start();
+         memoryManager.start();         
+         securityStore.start();
+         connectionFactoryDeployer.start();
+         destinationDeployer.start();
+         postOffice.start();
+         
+         started = true;
+         log.info("JBoss Messaging " + getVersion().getProviderVersion() + " server [" +
+                 configuration.getMessagingServerID() + "] started");
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
+      }
+   }
+
+   public synchronized void stop() throws Exception
+   {
+      try
+      {
+         if (!started)
+         {
+            return;
+         }
+
+         log.info(this + " is Stopping. NOTE! Stopping the server peer cleanly will NOT cause failover to occur");
+
+         started = false;
+
+         // Stop the wired components
+         destinationDeployer.stop();
+         destinationDeployer = null;
+         connectionFactoryDeployer.stop();
+         connectionFactoryDeployer = null;
+         destinationJNDIMapper.stop();
+         destinationJNDIMapper = null;
+         connFactoryJNDIMapper.stop();
+         connFactoryJNDIMapper = null;
+         connectionManager.stop();
+         connectionManager = null;
+         memoryManager.stop();
+         memoryManager = null;
+         securityStore.stop();
+         messageCounterManager.stop();
+         messageCounterManager = null;
+         postOffice.stop();
+         postOffice = null;
+
+         MessagingTimeoutFactory.instance.reset();
+
+         log.info("JMS " + this + " stopped");
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMXInvocation(t, this + " stopService");
+      }
+   }
+   
+
+   // MessagingServer implementation -----------------------------------------------------------
+   
+   public Version getVersion()
+   {
+      return version;
+   }
+   
+   public Configuration getConfiguration()
+   {
+      return configuration;
+   }
+   
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   public void setConfiguration(Configuration configuration)
+   {
+      this.configuration = configuration;
+   }
+   
+   public void setMinaService(MinaService minaService)
+   {
+      this.minaService = minaService;
+   }
+   
+   public MinaService getMinaService()
+   {
+      return minaService;
+   }   
+
+   public ServerSessionEndpoint getSession(String sessionID)
+   {
+      return (ServerSessionEndpoint) sessions.get(sessionID);
+   }
+
+   public Collection<ServerSessionEndpoint> getSessions()
+   {
+      return sessions.values();
+   }
+
+   public void addSession(String id, ServerSessionEndpoint session)
+   {
+      sessions.put(id, session);
+   }
+
+   public void removeSession(String id)
+   {
+      if (sessions.remove(id) == null)
+      {
+         throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+      }
+   }
+
+   public synchronized Queue getDefaultDLQInstance() throws Exception
+   {
+      if (configuration.getDefaultDLQ() != null)
+      {
+         List<Binding> bindings = postOffice.getBindingsForQueueName(configuration.getDefaultDLQ());
+         
+         if (bindings.isEmpty())
+         {
+            throw new IllegalStateException("Cannot find binding for queue " + configuration.getDefaultDLQ());
+         }
+
+         return bindings.get(0).getQueue();
+      }
+      else
+      {
+         return null;
+      }   
+   }
+
+   public synchronized Queue getDefaultExpiryQueueInstance() throws Exception
+   {
+      if (configuration.getDefaultExpiryQueue() != null)
+      {
+         List<Binding> bindings = postOffice.getBindingsForQueueName(configuration.getDefaultExpiryQueue());
+         
+         if (bindings.isEmpty())
+         {
+            throw new IllegalStateException("Cannot find binding for queue " + configuration.getDefaultExpiryQueue());
+         }
+
+         return bindings.get(0).getQueue();
+      }
+      else
+      {
+         return null;
+      }  
+   }
+   
+   public void enableMessageCounters()
+   {
+      messageCounterManager.start();
+   }
+
+   public void disableMessageCounters()
+   {
+      messageCounterManager.stop();
+
+      messageCounterManager.resetAllCounters();
+
+      messageCounterManager.resetAllCounterHistories();
+   }
+
+   public void createQueue(String name, String jndiName) throws Exception
+   {
+      destinationDeployer.createQueue(name, jndiName);
+   }
+   
+   public void destroyQueue(String name, String jndiName) throws Exception
+   {
+      destinationDeployer.destroyQueue(name, jndiName);
+   }
+   
+   public void createTopic(String name, String jndiName) throws Exception
+   {
+      this.destinationDeployer.createTopic(name, jndiName);
+   }
+
+   public void destroyTopic(String name, String jndiName) throws Exception
+   {
+      destinationDeployer.destroyTopic(name, jndiName);
+   }
+
+   public void resetAllMessageCounters()
+   {
+      this.messageCounterManager.resetAllCounters();
+   }
+
+   public void resetAllMessageCounterHistories()
+   {
+      this.messageCounterManager.resetAllCounterHistories();
+   }
+
+   public void removeAllMessagesForQueue(String queueName) throws Exception
+   {
+      Condition condition = new ConditionImpl(DestinationType.QUEUE, queueName);
+      
+      List<Binding> bindings = postOffice.getBindingsForCondition(condition);
+      
+      if (!bindings.isEmpty())
+      {
+         Queue queue = bindings.get(0).getQueue();
+         
+         persistenceManager.deleteAllReferences(queue);
+         
+         queue.removeAllReferences();
+      }
+   }
+
+   public void removeAllMessagesForTopic(String queueName) throws Exception
+   {
+      Condition condition = new ConditionImpl(DestinationType.QUEUE, queueName);
+      
+      List<Binding> bindings = postOffice.getBindingsForCondition(condition);
+      
+      for (Binding binding: bindings)
+      {
+         Queue queue = binding.getQueue();
+         
+         if (queue.isDurable())
+         {
+            persistenceManager.deleteAllReferences(queue);
+         }
+         
+         queue.removeAllReferences();
+      }
+   }
+  
+   public SecurityStore getSecurityManager()
+   {
+      return securityStore;
+   }
+
+   public DestinationManager getDestinationManager()
+   {
+      return destinationJNDIMapper;
+   }
+
+   public ConnectionFactoryManager getConnectionFactoryManager()
+   {
+      return connFactoryJNDIMapper;
+   }
+
+   public ConnectionManager getConnectionManager()
+   {
+      return connectionManager;
+   }
+
+   public MemoryManager getMemoryManager()
+   {
+      return memoryManager;
+   }
+
+   public TransactionRepository getTransactionRepository()
+   {
+      return transactionRepository;
+   }
+   
+   public PersistenceManager getPersistenceManager()
+   {
+      return persistenceManager;
+   }
+
+   public void setPersistenceManager(PersistenceManager persistenceManager)
+   {
+      this.persistenceManager = persistenceManager;
+   }
+
+   public JMSUserManager getJmsUserManagerInstance()
+   {
+      return jmsUserManager;
+   }
+
+   public void setJmsUserManager(JMSUserManager jmsUserManager)
+   {
+      this.jmsUserManager = jmsUserManager;
+   }
+
+   public PostOffice getPostOffice()
+   {
+      return postOffice;
+   }
+
+   public void setPostOffice(PostOffice postOffice)
+   {
+      this.postOffice = postOffice;
+   }
+
+   public String toString()
+   {
+      return "MessagingServer[" + configuration.getMessagingServerID() + "]";
+   }
+   
+   // Public ---------------------------------------------------------------------------------------
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private -------------------------------------------------------------------------------------- 
+}

Added: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,981 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.messaging.core.impl.server;
+
+import java.io.CharArrayWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.jboss.aop.microcontainer.aspects.jmx.JMX;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.selector.Selector;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Condition;
+import org.jboss.messaging.core.Configuration;
+import org.jboss.messaging.core.DestinationType;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.MessagingServerManagement;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.impl.ConditionImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.messaging.util.MessageQueueNameHelper;
+
+/**
+ * This interface describes the properties and operations that comprise the management interface of the
+ * Messaging Server.
+ * 
+ * It includes operations to create and destroy queues and provides various statistics measures
+ * such as message count for queues and topics.
+ * 
+ * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
+ * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
+ * 
+ */
+ at JMX(name = "jboss.messaging:service=MessagingServerManagement", exposedInterface = MessagingServerManagement.class)
+public class MessagingServerManagementImpl implements MessagingServerManagement
+{
+   private MessagingServer messagingServer;
+
+   public void setMessagingServer(MessagingServer messagingServer)
+   {
+      this.messagingServer = messagingServer;
+   }
+
+   public String getServerVersion()
+   {      
+      return messagingServer.getVersion().getProviderVersion();
+   }
+
+   public Configuration getConfiguration()
+   {
+      return messagingServer.getConfiguration();
+   }
+   
+   public boolean isStarted()
+   {
+      return messagingServer.isStarted();
+   }
+   
+   public void createQueue(String name, String jndiName) throws Exception
+   {
+      messagingServer.createQueue(name, jndiName);
+   }
+   
+   public void createTopic(String name, String jndiName) throws Exception
+   {
+      messagingServer.createTopic(name, jndiName);
+   }
+   
+   public void destroyQueue(String name, String jndiName) throws Exception
+   {
+      messagingServer.destroyQueue(name, jndiName);
+   }
+   
+   public void destroyTopic(String name, String jndiName) throws Exception
+   {
+      messagingServer.destroyTopic(name, jndiName);
+   }
+
+   public void removeAllMessagesForQueue(String queueName) throws Exception
+   {
+      messagingServer.removeAllMessagesForQueue(queueName);
+   }
+
+   public void removeAllMessagesForTopic(String topicName) throws Exception
+   {
+      messagingServer.removeAllMessagesForTopic(topicName);
+   }
+   
+   public int getMessageCountForQueue(String queue) throws Exception
+   {
+      return getQueue(queue).getMessageCount();
+   }
+  
+   public List<SubscriptionInfo> listAllSubscriptionsForTopic(String topicName) throws Exception
+   {
+      return listAllSubscriptions(topicName);
+   }
+
+//   public int getDeliveringCountForQueue(String queue) throws Exception
+//   {
+//      return getQueue(queue).getDeliveringCount();
+//   }
+
+   public int getScheduledMessageCountForQueue(String queue) throws Exception
+   {
+      return getQueue(queue).getScheduledCount();
+   }
+
+//   public MessageCounter getMessageCounterForQueue(String queue) throws Exception
+//   {
+//      return getQueue(queue).getMessageCounter();
+//   }
+
+//   public MessageStatistics getMessageStatisticsForQueue(String queue) throws Exception
+//   {
+//      List counters = new ArrayList();
+//      counters.add(getQueue(queue).getMessageCounter());
+//
+//      List stats = MessageCounter.getMessageStatistics(counters);
+//
+//      return (MessageStatistics)stats.get(0);
+//   }
+
+   public int getConsumerCountForQueue(String queue) throws Exception
+   {
+      return getQueue(queue).getConsumerCount();
+   }
+
+//   public void resetMessageCounterForQueue(String queue) throws Exception
+//   {
+//      getMessageCounterForQueue(queue).resetCounter();
+//   }
+//
+//   public void resetMessageCounterHistoryForQueue(String queue) throws Exception
+//   {
+//      getMessageCounterForQueue(queue).resetHistory();
+//   }
+
+//   public List<Message> listAllMessagesForQueue(String queue) throws Exception
+//   {
+//      return getQueue(queue).listAllMessages(null);
+//   }
+
+   public List<Message> listAllMessagesForQueue(String queue, String selector) throws Exception
+   {
+      return listAllMessages(getQueue(queue), selector);
+   }
+
+   public List<Message> listDurableMessagesForQueue(String queue) throws Exception
+   {
+      return listDurableMessages(getQueue(queue), null);
+   }
+
+   public List<Message> listDurableMessagesForQueue(String queue, String selector) throws Exception
+   {
+      return listDurableMessages(getQueue(queue), selector);
+   }
+
+   public List<Message> listNonDurableMessagesForQueue(String queue) throws Exception
+   {
+      return listNonDurableMessages(getQueue(queue), null);
+   }
+
+   public List<Message> listNonDurableMessagesForQueue(String queue, String selector) throws Exception
+   {
+      return listNonDurableMessages(getQueue(queue), selector);
+   }
+
+//   public String listMessageCounterAsHTMLForQueue(String queue) throws Exception
+//   {
+//      return listMessageCounterAsHTML(new MessageCounter[] { getMessageCounterForQueue(queue) });
+//   }
+//
+//   public String listMessageCounterHistoryAsHTMLForQueue(String queue) throws Exception
+//   {
+//      return listMessageCounterHistoryAsHTML(new MessageCounter[] {getMessageCounterForQueue(queue)});
+//   }
+
+   public int getAllMessageCountForTopic(String topicName) throws Exception
+   {
+      return getAllMessageCountForTopic(topicName);
+   }
+
+   public int getDurableMessageCountForTopic(String topicName) throws Exception
+   {
+      return getDurableMessageCountForTopic(topicName);
+   }
+
+   public int getNonDurableMessageCountForTopic(String topicName) throws Exception
+   {
+      return getNonDurableMessageCountForTopic(topicName);
+   }
+
+   public int getAllSubscriptionsCountForTopic(String topicName) throws Exception
+   {
+      return getAllSubscriptionsCountForTopic(topicName);
+   }
+
+   public int getDurableSubscriptionsCountForTopic(String topicName) throws Exception
+   {
+      return getDurableSubscriptionsCountForTopic(topicName);
+   }
+
+   public int getNonDurableSubscriptionsCountForTopic(String topicName) throws Exception
+   {
+      return getNonDurableSubscriptionsCountForTopic(topicName);
+   }
+
+
+
+
+   public List<SubscriptionInfo> listDurableSubscriptionsForTopic(String topicName) throws Exception
+   {
+      return listDurableSubscriptions(topicName);
+   }
+
+   public List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(String topicName) throws Exception
+   {
+      return listNonDurableSubscriptions(topicName);
+   }
+
+   public String listAllSubscriptionsAsHTMLForTopic(String topicName) throws Exception
+   {
+      return listAllSubscriptionsAsHTML(topicName);
+   }
+
+   public String listDurableSubscriptionsAsHTMLForTopic(String topicName) throws Exception
+   {
+      return listDurableSubscriptionsAsHTML(topicName);
+   }
+
+   public String listNonDurableSubscriptionsAsHTMLForTopic(String topicName) throws Exception
+   {
+      return listNonDurableSubscriptionsAsHTML(topicName);
+   }
+
+//   public List<Message> listAllMessagesForSubscription(String subscriptionId) throws Exception
+//   {
+//      return listAllMessagesForSubscription(subscriptionId, null);
+//   }
+//
+//   public List<Message> listAllMessagesForSubscription(String subscriptionId, String selector) throws Exception
+//   {
+//      return listAllMessages(subscriptionId, selector);
+//   }
+//
+//   public List<Message> listDurableMessagesForSubscription(String subscriptionId) throws Exception
+//   {
+//      return listDurableMessages(subscriptionId, null);
+//   }
+//
+//   public List<Message> listDurableMessagesForTopic(String topicName, String subscriptionId, String selector) throws Exception
+//   {
+//      return listDurableMessages(topicName, subscriptionId, selector);
+//   }
+//
+//   public List<Message> listNonDurableMessagesForTopic(String topicName, String subscriptionId) throws Exception
+//   {
+//      return listNonDurableMessages(subscriptionId, null);
+//   }
+//
+//   public List<Message> listNonDurableMessagesForTopic(String topicName, String subscriptionId, String selector) throws Exception
+//   {
+//      return listNonDurableMessages(subscriptionId, selector);
+//   }
+
+   public List<MessageCounter> getMessageCountersForTopic(String topicName) throws Exception
+   {
+      return getMessageCounters(topicName);
+   }
+
+   public String showActiveClientsAsHTML() throws Exception
+   {
+      CharArrayWriter charArray = new CharArrayWriter();
+      PrintWriter out = new PrintWriter(charArray);
+
+      List endpoints = messagingServer.getConnectionManager().getActiveConnections();
+
+      out.println("<table><tr><td>ID</td><td>Host</td><td>User</td><td>#Sessions</td></tr>");
+      for (Iterator iter = endpoints.iterator(); iter.hasNext();)
+      {
+         ServerConnectionEndpoint endpoint = (ServerConnectionEndpoint) iter.next();
+
+         out.println("<tr>");
+         out.println("<td>" + endpoint.toString() + "</td>");
+         // FIXME display URI of client
+         out.println("<td>" + endpoint.getUsername() + "</td>");
+         out.println("<td>" + endpoint.getSessions().size() + "</td>");
+         out.println("</tr>");
+      }
+
+      out.println("</table>");
+
+
+      return charArray.toString();
+   }
+   
+//   public String showPreparedTransactionsAsHTML()
+//   {
+//      List txs = messagingServer.getTxRepository().getPreparedTransactions();
+//      JBossStringBuilder buffer = new JBossStringBuilder();
+//      buffer.append("<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">");
+//      buffer.append("<tr><th>Xid</th></tr>");
+//      for (Iterator i = txs.iterator(); i.hasNext();)
+//      {
+//         Xid xid = (Xid) i.next();
+//         if (xid != null)
+//         {
+//            buffer.append("<tr><td>");
+//            buffer.append(xid);
+//            buffer.append("</td></tr>");
+//         }
+//      }
+//      buffer.append("</table>");
+//      return buffer.toString();
+//   }
+
+//   public String listMessageCountersAsHTML() throws Exception
+//   {
+//      List counters = messagingServer.getMessageCounters();
+//
+//      Collections.sort(counters, new Comparator()
+//      {
+//         public int compare(Object o1, Object o2)
+//         {
+//            MessageCounter m1 = (MessageCounter) o1;
+//            MessageCounter m2 = (MessageCounter) o2;
+//            return m1.getDestinationName().compareTo(m2.getDestinationName());
+//         }
+//      });
+//
+//      String ret =
+//              "<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">"
+//                      + "<tr>"
+//                      + "<th>Type</th>"
+//                      + "<th>Name</th>"
+//                      + "<th>Subscription</th>"
+//                      + "<th>Durable</th>"
+//                      + "<th>Count</th>"
+//                      + "<th>CountDelta</th>"
+//                      + "<th>Depth</th>"
+//                      + "<th>DepthDelta</th>"
+//                      + "<th>Last Add</th>"
+//                      + "</tr>";
+//
+//      String strNameLast = null;
+//      String strTypeLast = null;
+//      String strDestLast = null;
+//
+//      String destData = "";
+//      int destCount = 0;
+//
+//      int countTotal = 0;
+//      int countDeltaTotal = 0;
+//      int depthTotal = 0;
+//      int depthDeltaTotal = 0;
+//
+//      int i = 0; // define outside of for statement, so variable
+//      // still exists after for loop, because it is
+//      // needed during output of last module data string
+//
+//      Iterator iter = counters.iterator();
+//
+//      while (iter.hasNext())
+//      {
+//         MessageCounter counter = (MessageCounter) iter.next();
+//
+//         // get counter data
+//         StringTokenizer tokens = new StringTokenizer(counter.getCounterAsString(), ",");
+//
+//         String strType = tokens.nextToken();
+//         String strName = tokens.nextToken();
+//         String strSub = tokens.nextToken();
+//         String strDurable = tokens.nextToken();
+//
+//         String strDest = strType + "-" + strName;
+//
+//         String strCount = tokens.nextToken();
+//         String strCountDelta = tokens.nextToken();
+//         String strDepth = tokens.nextToken();
+//         String strDepthDelta = tokens.nextToken();
+//         String strDate = tokens.nextToken();
+//
+//         // update total count / depth values
+//         countTotal += Integer.parseInt(strCount);
+//         depthTotal += Integer.parseInt(strDepth);
+//
+//         countDeltaTotal += Integer.parseInt(strCountDelta);
+//         depthDeltaTotal += Integer.parseInt(strDepthDelta);
+//
+//         if (strCountDelta.equalsIgnoreCase("0"))
+//            strCountDelta = "-"; // looks better
+//
+//         if (strDepthDelta.equalsIgnoreCase("0"))
+//            strDepthDelta = "-"; // looks better
+//
+//         // output destination counter data as HTML table row
+//         // ( for topics with multiple subscriptions output
+//         //   type + name field as rowspans, looks better )
+//         if (strDestLast != null && strDestLast.equals(strDest))
+//         {
+//            // still same destination -> append destination subscription data
+//            destData += "<tr bgcolor=\"#" + ((i % 2) == 0 ? "FFFFFF" : "F0F0F0") + "\">";
+//            destCount += 1;
+//         }
+//         else
+//         {
+//            // startnew destination data
+//            if (strDestLast != null)
+//            {
+//               // store last destination data string
+//               ret += "<tr bgcolor=\"#"
+//                       + ((i % 2) == 0 ? "FFFFFF" : "F0F0F0")
+//                       + "\"><td rowspan=\""
+//                       + destCount
+//                       + "\">"
+//                       + strTypeLast
+//                       + "</td><td rowspan=\""
+//                       + destCount
+//                       + "\">"
+//                       + strNameLast
+//                       + "</td>"
+//                       + destData;
+//
+//               destData = "";
+//            }
+//
+//            destCount = 1;
+//         }
+//
+//         // counter data row
+//         destData += "<td>"
+//                 + strSub
+//                 + "</td>"
+//                 + "<td>"
+//                 + strDurable
+//                 + "</td>"
+//                 + "<td>"
+//                 + strCount
+//                 + "</td>"
+//                 + "<td>"
+//                 + strCountDelta
+//                 + "</td>"
+//                 + "<td>"
+//                 + strDepth
+//                 + "</td>"
+//                 + "<td>"
+//                 + strDepthDelta
+//                 + "</td>"
+//                 + "<td>"
+//                 + strDate
+//                 + "</td>";
+//
+//         // store current destination data for change detection
+//         strTypeLast = strType;
+//         strNameLast = strName;
+//         strDestLast = strDest;
+//      }
+//
+//      if (strDestLast != null)
+//      {
+//         // store last module data string
+//         ret += "<tr bgcolor=\"#"
+//                 + ((i % 2) == 0 ? "FFFFFF" : "F0F0F0")
+//                 + "\"><td rowspan=\""
+//                 + destCount
+//                 + "\">"
+//                 + strTypeLast
+//                 + "</td><td rowspan=\""
+//                 + destCount
+//                 + "\">"
+//                 + strNameLast
+//                 + "</td>"
+//                 + destData;
+//      }
+//
+//      // append summation info
+//      ret += "<tr>"
+//              + "<td><![CDATA[ ]]></td><td><![CDATA[ ]]></td>"
+//              + "<td><![CDATA[ ]]></td><td><![CDATA[ ]]></td><td>"
+//              + countTotal
+//              + "</td><td>"
+//              + (countDeltaTotal == 0 ? "-" : Integer.toString(countDeltaTotal))
+//              + "</td><td>"
+//              + depthTotal
+//              + "</td><td>"
+//              + (depthDeltaTotal == 0 ? "-" : Integer.toString(depthDeltaTotal))
+//              + "</td><td>Total</td></tr></table>";
+//
+//      return ret;
+//   }
+
+
+
+   public List<Message> listAllMessages(Queue queue, String selector) throws Exception
+   {
+      return listMessages(queue, ListType.ALL, selector);
+   }
+   
+   public List<Message> listDurableMessages(Queue queue, String selector) throws Exception
+   {
+      return listMessages(queue, ListType.DURABLE, selector);
+   }
+   
+   public List<Message> listNonDurableMessages(Queue queue, String selector) throws Exception
+   {
+      return listMessages(queue, ListType.NON_DURABLE, selector);
+   }
+         
+   public List<SubscriptionInfo> listAllSubscriptions(String topicName) throws Exception
+   {
+      return listSubscriptions(topicName, ListType.ALL);
+   }
+   
+   public List<SubscriptionInfo> listDurableSubscriptions(String topicName) throws Exception
+   {
+      return listSubscriptions(topicName, ListType.DURABLE);
+   }
+   
+   public List<SubscriptionInfo> listNonDurableSubscriptions(String topicName) throws Exception
+   {
+      return listSubscriptions(topicName, ListType.NON_DURABLE);
+   }
+   
+   public String listAllSubscriptionsAsHTML(String topicName) throws Exception
+   {
+      return listSubscriptionsAsHTML(topicName, ListType.ALL);
+   }
+   
+   public String listDurableSubscriptionsAsHTML(String topicName) throws Exception
+   {
+      return listSubscriptionsAsHTML(topicName, ListType.DURABLE);
+   }
+   
+   public String listNonDurableSubscriptionsAsHTML(String topicName) throws Exception
+   {
+      return listSubscriptionsAsHTML(topicName, ListType.NON_DURABLE);
+   }
+      
+   public List<Message> listDurableMessagesForSubscription(String subId, String selector) throws Exception
+   {
+      return listMessagesForSubscription(ListType.DURABLE, subId, selector);
+   }
+   
+   public List<Message> listNonDurableMessagesForSubscription(String subId, String selector) throws Exception
+   {
+      return listMessagesForSubscription(ListType.NON_DURABLE, subId, selector);
+   }
+   
+   public List<MessageCounter> getMessageCounters(String topicName) throws Exception
+   {
+      List<MessageCounter> counters = new ArrayList<MessageCounter>();
+      
+      Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+      
+      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+      
+      for (Binding binding: bindings)
+      {
+         Queue queue = binding.getQueue();
+         
+         //TODO - get message counters
+         
+//         String counterName = SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queue.getName();
+//         
+//         MessageCounter counter = messagingServer.getMessageCounterManager().getMessageCounter(counterName);
+//         
+//         if (counter == null)
+//         {
+//            throw new IllegalStateException("Cannot find counter with name " + counterName);
+//         }
+//         
+//         counters.add(counter);
+      }
+      
+      return counters; 
+   }
+   
+   
+//   public void setMessageCounterHistoryDayLimit(String topicName, int limit) throws Exception
+//   {
+//      Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+//      
+//      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+//         
+//      for (Binding binding: bindings)
+//      {
+//         Queue queue = binding.getQueue();
+//         
+//         queue.setMessageCounterHistoryDayLimit(limit);
+//      }      
+//   }
+   
+   // Private ---------------------------------------------------------------------------
+   
+   private Queue getQueue(String queueName) throws Exception
+   {
+      Condition condition = new ConditionImpl(DestinationType.QUEUE, queueName);
+      
+      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+      
+      if (bindings.isEmpty())
+      {
+         throw new IllegalArgumentException("No queue with name " + queueName);
+      }
+      
+      return bindings.get(0).getQueue();
+   }
+   
+    
+   
+   private List<Message> listMessages(Queue queue, ListType type, String selector) throws Exception
+   {
+      Selector sel = null;
+                        
+      if (selector != null && "".equals(selector.trim()))
+      {
+         selector = null;
+      }
+      
+      if (selector != null)
+      {        
+         sel = new Selector(selector);
+      }
+      
+      List<Message> msgs = new ArrayList<Message>();
+      
+      List<MessageReference> allRefs = queue.list(sel);
+        
+      for (MessageReference ref: allRefs)
+      {
+         Message msg = ref.getMessage();
+         
+         if (type == ListType.ALL ||
+            (type == ListType.DURABLE && msg.isDurable()) ||
+            (type == ListType.NON_DURABLE && !msg.isDurable()))
+         {
+            msgs.add(msg);
+         }
+      }
+      
+      return msgs;
+   }
+   
+   
+   private String listMessageCounterAsHTML(MessageCounter[] counters)
+   {
+      if (counters == null)
+         return null;
+
+      String ret = "<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">"  +
+                   "<tr>"                  +
+                   "<th>Type</th>"         +
+                   "<th>Name</th>"         +
+                   "<th>Subscription</th>" +
+                   "<th>Durable</th>"      +
+                   "<th>Count</th>"        +
+                   "<th>CountDelta</th>"   +
+                   "<th>Depth</th>"        +
+                   "<th>DepthDelta</th>"   +
+                   "<th>Last Add</th>"     +
+                   "</tr>";
+
+      for( int i=0; i<counters.length; i++ )
+      {
+         String            data = counters[i].getCounterAsString();
+         StringTokenizer token = new StringTokenizer( data, ",");
+         String            value;
+
+         ret += "<tr bgcolor=\"#" + ( (i%2)==0 ? "FFFFFF" : "F0F0F0") + "\">";
+
+         ret += "<td>" + token.nextToken() + "</td>"; // type
+         ret += "<td>" + token.nextToken() + "</td>"; // name
+         ret += "<td>" + token.nextToken() + "</td>"; // subscription
+         ret += "<td>" + token.nextToken() + "</td>"; // durable
+
+         ret += "<td>" + token.nextToken() + "</td>"; // count
+
+         value = token.nextToken(); // countDelta
+
+         if( value.equalsIgnoreCase("0") )
+             value = "-";
+
+         ret += "<td>" + value + "</td>";
+
+         ret += "<td>" + token.nextToken() + "</td>"; // depth
+
+         value = token.nextToken(); // depthDelta
+
+         if( value.equalsIgnoreCase("0") )
+             value = "-";
+
+         ret += "<td>" + value + "</td>";
+
+         ret += "<td>" + token.nextToken() + "</td>"; // date last add
+
+         ret += "</tr>";
+      }
+
+      ret += "</table>";
+
+      return ret;
+   }
+
+   private String listMessageCounterHistoryAsHTML(MessageCounter[] counters)
+   {
+      if (counters == null)
+         return null;
+
+      String           ret = "";
+
+      for( int i=0; i<counters.length; i++ )
+      {
+         // destination name
+         ret += ( counters[i].getDestinationTopic() ? "Topic '" : "Queue '" );
+         ret += counters[i].getDestinationName() + "'";
+
+         if( counters[i].getDestinationSubscription() != null )
+            ret += "Subscription '" + counters[i].getDestinationSubscription() + "'";
+
+
+         // table header
+         ret += "<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">"  +
+                "<tr>"                  +
+                "<th>Date</th>";
+
+         for( int j = 0; j < 24; j++ )
+            ret += "<th width=\"4%\">" + j + "</th>";
+
+         ret += "<th>Total</th></tr>";
+
+         // get history data as CSV string
+         StringTokenizer tokens = new StringTokenizer( counters[i].getHistoryAsString(), ",\n");
+
+         // get history day count
+         int days = Integer.parseInt( tokens.nextToken() );
+
+         for( int j=0; j<days; j++ )
+         {
+            // next day counter row
+            ret += "<tr bgcolor=\"#" + ((j%2)==0 ? "FFFFFF" : "F0F0F0") + "\">";
+
+            // date
+            ret += "<td>" + tokens.nextToken() + "</td>";
+
+            // 24 hour counters
+            int total = 0;
+
+            for( int k=0; k<24; k++ )
+            {
+               int value = Integer.parseInt( tokens.nextToken().trim() );
+
+               if( value == -1 )
+               {
+                    ret += "<td></td>";
+               }
+               else
+               {
+                    ret += "<td>" + value + "</td>";
+
+                    total += value;
+               }
+            }
+
+            ret += "<td>" + total + "</td></tr>";
+         }
+
+         ret += "</table><br><br>";
+      }
+
+      return ret;
+   }
+   
+   private enum ListType
+   {
+      ALL, DURABLE, NON_DURABLE;
+   }
+   
+  
+   private List<Message> listMessagesForSubscription(ListType type, String subId, String selector) throws Exception
+   { 
+      List<Message> msgs = new ArrayList<Message>();
+      
+      if (subId == null || "".equals(subId.trim()))
+      {
+         return msgs;
+      }
+      
+      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForQueueName(subId);
+      
+      if (bindings.isEmpty())
+      {
+         throw new IllegalArgumentException("Cannot find subscription with id " + subId);
+      }
+      
+      Selector sel = null;
+      
+      if (selector != null && "".equals(selector.trim()))
+      {
+         selector = null;
+      }
+         
+      if (selector != null)
+      {  
+         sel = new Selector(selector);
+      }
+      
+      Binding binding = bindings.get(0);
+      
+      List<MessageReference> allRefs = binding.getQueue().list(sel);
+         
+      for (MessageReference ref: allRefs)
+      {
+         Message msg = ref.getMessage();
+     
+         if (type == ListType.ALL || (type == ListType.DURABLE && msg.isDurable()) || (type == ListType.NON_DURABLE && !msg.isDurable()))
+         {
+            msgs.add(msg);
+         }
+      }
+      
+      return msgs;
+   }
+   
+   private List<SubscriptionInfo> listSubscriptions(String topicName, ListType type) throws Exception
+   {      
+      List<SubscriptionInfo> subs = new ArrayList<SubscriptionInfo>();
+   
+      Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+      
+      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+      
+      for (Binding binding: bindings)
+      {
+         Queue queue = binding.getQueue();
+         
+         if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable()) || (type == ListType.NON_DURABLE && !queue.isDurable()))
+         {         
+            String subName = null;
+            String clientID = null;
+            
+            if (queue.isDurable())
+            {
+               MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
+               subName = helper.getSubName();
+               clientID = helper.getClientId();
+            }
+            
+            SubscriptionInfo info = new SubscriptionInfo(queue.getName(), queue.isDurable(), subName, clientID,
+                     queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.getMessageCount(), queue.getMaxSize());
+            
+            subs.add(info);
+         }
+      }
+      
+      return subs;
+   }
+   
+   private int getMessageCount(String topicName, ListType type) throws Exception
+   {
+      Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+      
+      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+      
+      int count = 0;
+      
+      for (Binding binding: bindings)
+      {
+         Queue queue = binding.getQueue();
+         
+         if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable())
+             || (type == ListType.NON_DURABLE && !queue.isDurable()))
+         {            
+            count += queue.getMessageCount();
+         }
+      }
+
+      return count;
+   }  
+   
+   private int getSubscriptionsCount(String topicName, boolean durable) throws Exception
+   {
+      Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+      
+      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+      
+      int count = 0;
+      
+      for (Binding binding: bindings)
+      {
+         Queue queue = binding.getQueue();
+                  
+         if ((queue.isDurable() && durable) || (!queue.isDurable() && !durable))
+         {
+            count++;
+         }
+      }
+
+      return count;
+   }
+   
+   
+   private String listSubscriptionsAsHTML(String topicName, ListType type) throws Exception
+   {
+      Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+      
+      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+      
+      StringBuffer sb = new StringBuffer();
+      
+      sb.append("<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">"  +
+                  "<tr>"                  +
+                  "<th>Id</th>"         +
+                  "<th>Durable</th>" +
+                  "<th>Subscription Name</th>"      +
+                  "<th>Client ID</th>"        +
+                  "<th>Selector</th>"   +
+                  "<th>Message Count</th>"        +
+                  "<th>Max Size</th>"   +
+                  "</tr>");
+      
+      for (Binding binding: bindings)
+      {
+         Queue queue = binding.getQueue();
+
+         if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable())
+                  || (type == ListType.NON_DURABLE && !queue.isDurable()))
+         {            
+            String filterString = queue.getFilter() != null ? queue.getFilter().getFilterString() : null;
+                     
+            String subName = null;
+            String clientID = null;
+            
+            if (queue.isDurable())
+            {
+               MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
+               subName = helper.getSubName();
+               clientID = helper.getClientId();
+            }
+            
+            sb.append("<tr><td>").append(queue.getName()).append("</td>");
+            sb.append("<td>").append(queue.isDurable() ? "Durable" : "Non Durable").append("</td>");
+            sb.append("<td>").append(subName != null ? subName : "").append("</td>");
+            sb.append("<td>").append(clientID != null ? clientID : "").append("</td>");
+            sb.append("<td>").append(filterString != null ? filterString : "").append("</td>");
+            sb.append("<td>").append(queue.getMessageCount()).append("</td>");
+            sb.append("<td>").append(queue.getMaxSize()).append("</td>");
+            sb.append("</tr>");
+         }
+      }
+      sb.append("</table>");
+      
+      return sb.toString();                                
+   }
+   
+   
+}

Added: trunk/src/main/org/jboss/messaging/core/impl/server/SubscriptionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/SubscriptionInfo.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/SubscriptionInfo.java	2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.server;
+
+import java.io.Serializable;
+
+/**
+ * A SubscriptionInfo
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1935 $</tt>
+ *
+ * $Id: SubscriptionInfo.java 1935 2007-01-09 23:29:20Z clebert.suconic at jboss.com $
+ *
+ */
+public class SubscriptionInfo implements Serializable
+{
+   private static final long serialVersionUID = -38689006079435295L;
+
+   private String id;
+   
+   private boolean durable;
+   
+   private String name;
+   
+   private String clientID;
+   
+   private String selector;
+   
+   private int messageCount;
+   
+   private int maxSize;
+   
+   public SubscriptionInfo(String id, boolean durable, String name, String clientID, String selector, int messageCount, int maxSize)
+   {
+      this.id = id;
+      this.durable = durable;
+      this.name = name;
+      this.clientID = clientID;
+      this.selector = selector;
+      this.messageCount = messageCount;
+      this.maxSize = maxSize;
+   }
+   
+   public String getId()
+   {
+      return id;
+   }
+
+   public String getClientID()
+   {
+      return clientID;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   public int getMaxSize()
+   {
+      return maxSize;
+   }
+
+   public int getMessageCount()
+   {
+      return messageCount;
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public String getSelector()
+   {
+      return selector;
+   }
+
+}




More information about the jboss-cvs-commits mailing list