[jboss-cvs] JBoss Messaging SVN: r3570 - in trunk/src/main/org/jboss: messaging/core/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 15 16:24:02 EST 2008
Author: timfox
Date: 2008-01-15 16:24:02 -0500 (Tue, 15 Jan 2008)
New Revision: 3570
Added:
trunk/src/main/org/jboss/jms/server/Configuration.java
trunk/src/main/org/jboss/jms/server/TransactionRepository.java
trunk/src/main/org/jboss/messaging/core/impl/channelfactory/
trunk/src/main/org/jboss/messaging/core/impl/channelfactory/MultiplexerChannelFactory.java
trunk/src/main/org/jboss/messaging/core/impl/channelfactory/XMLChannelFactory.java
trunk/src/main/org/jboss/messaging/core/impl/messagecounter/
trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java
trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java
trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageStatistics.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/
trunk/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/impl/server/
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
trunk/src/main/org/jboss/messaging/core/impl/server/SubscriptionInfo.java
Log:
Persistence refactoring part II
Added: trunk/src/main/org/jboss/jms/server/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/Configuration.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/Configuration.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,576 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server;
+
+import org.jboss.jms.server.security.Role;
+import org.jboss.messaging.util.XMLUtil;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.beans.PropertyChangeListener;
+import java.beans.PropertyChangeSupport;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.HashSet;
+
+/**
+ * This is the JBM configuration. It is used to configure the ServerPeer.
+ * It does this by parsing the jbm-configuration.xml configuration file. It also uses PropertyChangeSupport so users of
+ * this class can be notified on configuration changes.
+ *
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class Configuration implements Serializable
+{
+ private static final String READ_ATTR = "read";
+ private static final String WRITE_ATTR = "write";
+ private static final String CREATE_ATTR = "create";
+ private static final String NAME_ATTR = "name";
+
+ private PropertyChangeSupport propertyChangeSupport;
+ private Integer _serverPeerID = -1;
+ private String _defaultQueueJNDIContext = "";
+ private String _defaultTopicJNDIContext = "";
+ private String _securityDomain;
+ private HashSet<Role> _securityConfig;
+ private String _defaultDLQ;
+ // The default maximum number of delivery attempts before sending to DLQ - can be overridden on
+ // the destination
+ private Integer _defaultMaxDeliveryAttempts = 10;
+ protected String _defaultExpiryQueue;
+
+ private Long _defaultRedeliveryDelay = (long) 0;
+
+ private Long _messageCounterSamplePeriod = (long) 10000;// Default is 1 minute
+ private Long _failoverStartTimeout = (long) (60 * 1000);
+ // Default is 5 minutes
+ private Long _failoverCompleteTimeout = (long) (5 * 60 * 1000);
+
+ private Integer _defaultMessageCounterHistoryDayLimit = 1;
+
+ private String _clusterPullConnectionFactoryName;
+
+
+ private Boolean _useXAForMessagePull = false;
+
+ private Boolean _defaultPreserveOrdering = false;
+
+ private Long _recoverDeliveriesTimeout = (long) (5 * 60 * 1000);
+
+
+ private String _suckerPassword;
+
+ //Global override for strict behaviour
+ private Boolean _strictTck = false;
+
+ //From a system property - this overrides
+ private Boolean _strictTckProperty = false;
+
+ private String _postOfficeName;
+
+ private Boolean _clustered = false;
+
+ private Long _stateTimeout = (long) 5000;
+
+ private Long _castTimeout = (long) 5000;
+
+ private String _groupName;
+
+ private String _controlChannelName;
+
+ private String _dataChannelName;
+
+ private String _channelPartitionName;
+
+ private Integer _maxConcurrentReplications = 25;
+
+ private Boolean _useJGroupsWorkaround = false;
+
+ private Integer _remotingBindAddress;
+
+ private String _remotingTimeout;
+
+ //default confog file location
+ private String configurationUrl = "jbm-configuration.xml";
+
+ public void start() throws Exception
+ {
+ propertyChangeSupport = new PropertyChangeSupport(this);
+
+ _strictTckProperty = "true".equalsIgnoreCase(System.getProperty("jboss.messaging.stricttck"));
+ _useJGroupsWorkaround = "true".equals(System.getProperty("jboss.messaging.usejgroupsworkaround"));
+
+ URL url = getClass().getClassLoader().getResource(configurationUrl);
+ Element e = XMLUtil.urlToElement(url);
+ _serverPeerID = getInteger(e, "server-peer-id", _serverPeerID);
+ _defaultQueueJNDIContext = getString(e, "default-queue-jndi-context", _defaultQueueJNDIContext);
+ _defaultTopicJNDIContext = getString(e, "default-topic-jndi-context", _defaultTopicJNDIContext);
+ _securityDomain = getString(e, "security-domain", _securityDomain);
+ _defaultDLQ = getString(e, "default-dlq", _defaultDLQ);
+ _defaultMaxDeliveryAttempts = getInteger(e, "default-max-delivery-attempts", _defaultMaxDeliveryAttempts);
+ _defaultExpiryQueue = getString(e, "default-expiry-queue", _defaultExpiryQueue);
+ _defaultRedeliveryDelay = getLong(e, "default-redelivery-delay", _defaultRedeliveryDelay);
+ _messageCounterSamplePeriod = getLong(e, "message-counter-sample-period", _messageCounterSamplePeriod);
+ _failoverStartTimeout = getLong(e, "failover-start-timeout", _failoverStartTimeout);
+ _failoverCompleteTimeout = getLong(e, "failover-complete-timeout", _failoverCompleteTimeout);
+ _defaultMessageCounterHistoryDayLimit = getInteger(e, "default-message-counter-history-day-limit", _defaultMessageCounterHistoryDayLimit);
+ _clusterPullConnectionFactoryName = getString(e, "cluster-pull-connection-factory-name", _clusterPullConnectionFactoryName);
+ _useXAForMessagePull = getBoolean(e, "use-xa-for-message-pull", _useXAForMessagePull);
+ _defaultPreserveOrdering = getBoolean(e, "default-preserve-ordering", _defaultPreserveOrdering);
+ _recoverDeliveriesTimeout = getLong(e, "recover-deliveries-timeout", _recoverDeliveriesTimeout);
+ _suckerPassword = getString(e, "sucker-password", _suckerPassword);
+ _strictTck = getBoolean(e, "strict-tck", _strictTck);
+ _postOfficeName = getString(e, "post-office-name", _postOfficeName);
+ _clustered = getBoolean(e, "clustered", _clustered);
+ _stateTimeout = getLong(e, "state-timeout", _stateTimeout);
+ _castTimeout = getLong(e, "cast-timeout", _castTimeout);
+ _groupName = getString(e, "group-name", _groupName);
+ _controlChannelName = getString(e, "control-channel-name", _controlChannelName);
+ _dataChannelName = getString(e, "data-channel-name", _dataChannelName);
+ _channelPartitionName = getString(e, "channel-partition-name", _channelPartitionName);
+ _maxConcurrentReplications = getInteger(e, "max-concurrent-replications", _maxConcurrentReplications);
+ _remotingBindAddress = getInteger(e, "remoting-bind-address", _remotingBindAddress);
+ _remotingTimeout = getString(e, "remoting-timeout", _remotingTimeout);
+ NodeList security = e.getElementsByTagName("default-security-config");
+ if (security.getLength() > 0)
+ {
+ HashSet<Role> securityConfig;
+ securityConfig = new HashSet<Role>();
+ NodeList roles = security.item(0).getChildNodes();
+ for (int k = 0; k < roles.getLength(); k++)
+ {
+ if ("role".equalsIgnoreCase(roles.item(k).getNodeName()))
+ {
+ Boolean read = roles.item(k).getAttributes().getNamedItem(READ_ATTR) != null && Boolean.valueOf(roles.item(k).getAttributes().getNamedItem(READ_ATTR).getNodeValue());
+ Boolean write = roles.item(k).getAttributes().getNamedItem(WRITE_ATTR) != null && Boolean.valueOf(roles.item(k).getAttributes().getNamedItem(WRITE_ATTR).getNodeValue());
+ Boolean create = roles.item(k).getAttributes().getNamedItem(CREATE_ATTR) != null && Boolean.valueOf(roles.item(k).getAttributes().getNamedItem(CREATE_ATTR).getNodeValue());
+ Role role = new Role(roles.item(k).getAttributes().getNamedItem(NAME_ATTR).getNodeValue(),
+ read,
+ write,
+ create);
+ securityConfig.add(role);
+ }
+ }
+ _securityConfig = securityConfig;
+ }
+
+ }
+
+ private Boolean getBoolean(Element e, String name, Boolean def)
+ {
+ NodeList nl = e.getElementsByTagName(name);
+ if (nl.getLength() > 0)
+ {
+ return Boolean.valueOf(nl.item(0).getTextContent().trim());
+ }
+ return def;
+ }
+
+
+ private Integer getInteger(Element e, String name, Integer def)
+ {
+ NodeList nl = e.getElementsByTagName(name);
+ if (nl.getLength() > 0)
+ {
+ return Integer.valueOf(nl.item(0).getTextContent().trim());
+ }
+ return def;
+ }
+
+ private Long getLong(Element e, String name, Long def)
+ {
+ NodeList nl = e.getElementsByTagName(name);
+ if (nl.getLength() > 0)
+ {
+ return Long.valueOf(nl.item(0).getTextContent().trim());
+ }
+ return def;
+ }
+
+ private String getString(Element e, String name, String def)
+ {
+ NodeList nl = e.getElementsByTagName(name);
+ if (nl.getLength() > 0)
+ {
+ return nl.item(0).getTextContent().trim();
+ }
+ return def;
+ }
+
+ public void addPropertyChangeListener(
+ PropertyChangeListener listener)
+ {
+ propertyChangeSupport.addPropertyChangeListener(listener);
+ }
+
+ public Integer getServerPeerID()
+ {
+ return _serverPeerID;
+ }
+
+ public void setServerPeerID(Integer serverPeerID)
+ {
+ _serverPeerID = serverPeerID;
+ }
+
+
+ public String getDefaultQueueJNDIContext()
+ {
+ return _defaultQueueJNDIContext;
+ }
+
+ public void setDefaultQueueJNDIContext(String defaultQueueJNDIContext)
+ {
+ _defaultQueueJNDIContext = defaultQueueJNDIContext;
+ }
+
+
+ public String getDefaultTopicJNDIContext()
+ {
+ return _defaultTopicJNDIContext;
+ }
+
+ public void setDefaultTopicJNDIContext(String defaultTopicJNDIContext)
+ {
+ _defaultTopicJNDIContext = defaultTopicJNDIContext;
+ }
+
+ public void setSecurityDomain(String securityDomain) throws Exception
+ {
+ _securityDomain = securityDomain;
+ }
+
+ public String getSecurityDomain()
+ {
+ return _securityDomain;
+ }
+
+
+ public HashSet<Role> getSecurityConfig()
+ {
+ return _securityConfig;
+ }
+
+ public void setSecurityConfig(HashSet<Role> securityConfig)
+ {
+ propertyChangeSupport.firePropertyChange("securityConfig", _securityConfig, securityConfig);
+ _securityConfig = securityConfig;
+ }
+
+
+ public String getDefaultDLQ()
+ {
+ return _defaultDLQ;
+ }
+
+ public void setDefaultDLQ(String defaultDLQ)
+ {
+ _defaultDLQ = defaultDLQ;
+ }
+
+
+ public Integer getDefaultMaxDeliveryAttempts()
+ {
+ return _defaultMaxDeliveryAttempts;
+ }
+
+ public void setDefaultMaxDeliveryAttempts(Integer defaultMaxDeliveryAttempts)
+ {
+ _defaultMaxDeliveryAttempts = defaultMaxDeliveryAttempts;
+ }
+
+
+ public String getDefaultExpiryQueue()
+ {
+ return _defaultExpiryQueue;
+ }
+
+ public void setDefaultExpiryQueue(String defaultExpiryQueue)
+ {
+ _defaultExpiryQueue = defaultExpiryQueue;
+ }
+
+
+ public long getDefaultRedeliveryDelay()
+ {
+ return _defaultRedeliveryDelay;
+ }
+
+ public void setDefaultRedeliveryDelay(long defaultRedeliveryDelay)
+ {
+ _defaultRedeliveryDelay = defaultRedeliveryDelay;
+ }
+
+
+ public long getMessageCounterSamplePeriod()
+ {
+ return _messageCounterSamplePeriod;
+ }
+
+ public void setMessageCounterSamplePeriod(long messageCounterSamplePeriod)
+ {
+ if (messageCounterSamplePeriod < 1000)
+ {
+ throw new IllegalArgumentException("Cannot set MessageCounterSamplePeriod < 1000 ms");
+ }
+ propertyChangeSupport.firePropertyChange("messageCounterSamplePeriod", _messageCounterSamplePeriod, messageCounterSamplePeriod);
+ _messageCounterSamplePeriod = messageCounterSamplePeriod;
+ }
+
+
+ public Long getFailoverStartTimeout()
+ {
+ return _failoverStartTimeout;
+ }
+
+ public void setFailoverStartTimeout(Long failoverStartTimeout)
+ {
+ _failoverStartTimeout = failoverStartTimeout;
+ }
+
+
+ public Long getFailoverCompleteTimeout()
+ {
+ return _failoverCompleteTimeout;
+ }
+
+ public void setFailoverCompleteTimeout(Long failoverCompleteTimeout)
+ {
+ _failoverCompleteTimeout = failoverCompleteTimeout;
+ }
+
+
+ public Integer getDefaultMessageCounterHistoryDayLimit()
+ {
+ return _defaultMessageCounterHistoryDayLimit;
+ }
+
+ public void setDefaultMessageCounterHistoryDayLimit(Integer defaultMessageCounterHistoryDayLimit)
+ {
+ if (defaultMessageCounterHistoryDayLimit < -1)
+ {
+ defaultMessageCounterHistoryDayLimit = -1;
+ }
+ _defaultMessageCounterHistoryDayLimit = defaultMessageCounterHistoryDayLimit;
+ }
+
+
+ public String getClusterPullConnectionFactoryName()
+ {
+ return _clusterPullConnectionFactoryName;
+ }
+
+ public void setClusterPullConnectionFactoryName(String clusterPullConnectionFactoryName)
+ {
+ _clusterPullConnectionFactoryName = clusterPullConnectionFactoryName;
+ }
+
+
+ public Boolean isUseXAForMessagePull()
+ {
+ return _useXAForMessagePull;
+ }
+
+ public void setUseXAForMessagePull(Boolean useXAForMessagePull)
+ {
+ _useXAForMessagePull = useXAForMessagePull;
+ }
+
+
+ public Boolean isDefaultPreserveOrdering()
+ {
+ return _defaultPreserveOrdering;
+ }
+
+ public void setDefaultPreserveOrdering(Boolean defaultPreserveOrdering)
+ {
+ _defaultPreserveOrdering = defaultPreserveOrdering;
+ }
+
+
+ public Long getRecoverDeliveriesTimeout()
+ {
+ return _recoverDeliveriesTimeout;
+ }
+
+ public void setRecoverDeliveriesTimeout(Long recoverDeliveriesTimeout)
+ {
+ _recoverDeliveriesTimeout = recoverDeliveriesTimeout;
+ }
+
+
+ public String getSuckerPassword()
+ {
+ return _suckerPassword;
+ }
+
+ public void setSuckerPassword(String suckerPassword)
+ {
+ _suckerPassword = suckerPassword;
+ }
+
+
+ public Boolean isStrictTck()
+ {
+ return _strictTck || _strictTckProperty;
+ }
+
+ public void setStrictTck(Boolean strictTck)
+ {
+ _strictTck = strictTck || _strictTckProperty;
+ }
+
+
+ public String getPostOfficeName()
+ {
+ return _postOfficeName;
+ }
+
+ public void setPostOfficeName(String postOfficeName)
+ {
+ _postOfficeName = postOfficeName;
+ }
+
+
+ public Boolean isClustered()
+ {
+ return _clustered;
+ }
+
+ public void setClustered(Boolean clustered)
+ {
+ _clustered = clustered;
+ }
+
+
+ public Long getStateTimeout()
+ {
+ return _stateTimeout;
+ }
+
+ public void setStateTimeout(Long stateTimeout)
+ {
+ _stateTimeout = stateTimeout;
+ }
+
+
+ public Long getCastTimeout()
+ {
+ return _castTimeout;
+ }
+
+ public void setCastTimeout(Long castTimeout)
+ {
+ _castTimeout = castTimeout;
+ }
+
+
+ public String getGroupName()
+ {
+ return _groupName;
+ }
+
+ public void setGroupName(String groupName)
+ {
+ _groupName = groupName;
+ }
+
+
+ public String getControlChannelName()
+ {
+ return _controlChannelName;
+ }
+
+ public void setControlChannelName(String controlChannelName)
+ {
+ _controlChannelName = controlChannelName;
+ }
+
+
+ public String getDataChannelName()
+ {
+ return _dataChannelName;
+ }
+
+ public void setDataChannelName(String dataChannelName)
+ {
+ _dataChannelName = dataChannelName;
+ }
+
+
+ public String getChannelPartitionName()
+ {
+ return _channelPartitionName;
+ }
+
+ public void setChannelPartitionName(String channelPartitionName)
+ {
+ _channelPartitionName = channelPartitionName;
+ }
+
+
+ public Integer getMaxConcurrentReplications()
+ {
+ return _maxConcurrentReplications;
+ }
+
+ public void setMaxConcurrentReplications(Integer maxConcurrentReplications)
+ {
+ _maxConcurrentReplications = maxConcurrentReplications;
+ }
+
+
+ public Boolean isUseJGroupsWorkaround()
+ {
+ return _useJGroupsWorkaround;
+ }
+
+ public void setUseJGroupsWorkaround(Boolean useJGroupsWorkaround)
+ {
+ _useJGroupsWorkaround = useJGroupsWorkaround;
+ }
+
+ public Integer getRemotingBindAddress()
+ {
+ return _remotingBindAddress;
+ }
+
+ public void setRemotingBindAddress(Integer remotingBindAddress)
+ {
+ this._remotingBindAddress = remotingBindAddress;
+ }
+
+ public String getRemotingTimeout()
+ {
+ return _remotingTimeout;
+ }
+
+ public String getConfigurationUrl()
+ {
+ return configurationUrl;
+ }
+
+ public void setConfigurationUrl(String configurationUrl)
+ {
+ this.configurationUrl = configurationUrl;
+ }
+}
Added: trunk/src/main/org/jboss/jms/server/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/TransactionRepository.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/TransactionRepository.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,36 @@
+package org.jboss.jms.server;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.Transaction;
+
+
+//FIXME temp class
+public class TransactionRepository
+{
+ private Map<Xid, Transaction> map;
+
+ public TransactionRepository()
+ {
+ map = new ConcurrentHashMap<Xid, Transaction>();
+ }
+
+ public void addTransaction(Xid xid, Transaction transaction)
+ {
+ map.put(xid, transaction);
+ }
+
+ public Transaction getTransaction(Xid xid)
+ {
+ return map.get(xid);
+ }
+
+ public Transaction removeTransaction(Xid xid)
+ {
+ return map.remove(xid);
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/channelfactory/MultiplexerChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/channelfactory/MultiplexerChannelFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/channelfactory/MultiplexerChannelFactory.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,115 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.impl.channelfactory;
+
+import org.jboss.messaging.core.ChannelFactory;
+import org.jgroups.Channel;
+import org.jgroups.JChannelFactory;
+
+/**
+ * A ChannelFactory that will use the MBean ChannelFactory interface
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @version <tt>$Revision: 3465 $</tt>
+ * $Id: MultiplexerChannelFactory.java 3465 2007-12-10 17:32:22Z ataylor $
+ */
+public class MultiplexerChannelFactory implements ChannelFactory
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+
+ // Attributes -----------------------------------------------------------------------------------
+ JChannelFactory jChannelFactory;
+ String dataStack;
+ String controlStack;
+ String uniqueID;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public MultiplexerChannelFactory(JChannelFactory jChannelFactory,
+ String uniqueID,
+ String controlStack,
+ String dataStack)
+ {
+ this.jChannelFactory = jChannelFactory;
+ this.uniqueID = uniqueID;
+ this.dataStack = dataStack;
+ this.controlStack = controlStack;
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+
+ public String getDataStack()
+ {
+ return dataStack;
+ }
+
+ public void setDataStack(String dataStack)
+ {
+ this.dataStack = dataStack;
+ }
+
+ public String getControlStack()
+ {
+ return controlStack;
+ }
+
+ public void setControlStack(String controlStack)
+ {
+ this.controlStack = controlStack;
+ }
+
+ public String getUniqueID()
+ {
+ return uniqueID;
+ }
+
+ public void setUniqueID(String uniqueID)
+ {
+ this.uniqueID = uniqueID;
+ }
+
+ public Channel createControlChannel() throws Exception
+ {
+ return jChannelFactory.createMultiplexerChannel(controlStack, uniqueID + "-CTRL", Boolean.TRUE, uniqueID);
+ }
+
+ public Channel createDataChannel() throws Exception
+ {
+ return jChannelFactory.createMultiplexerChannel(dataStack, uniqueID + "-DATA", Boolean.TRUE, uniqueID);
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/channelfactory/XMLChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/channelfactory/XMLChannelFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/channelfactory/XMLChannelFactory.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,96 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.impl.channelfactory;
+
+import org.jboss.messaging.core.ChannelFactory;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
+import org.w3c.dom.Element;
+
+/**
+ * A ChannelFactory that will use Elements to create channels.
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:1909 $</tt>
+ * $Id:XMLJChannelFactory.java 1909 2007-01-06 06:08:03Z clebert.suconic at jboss.com $
+ */
+public class XMLChannelFactory implements ChannelFactory
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+ Element controlConfig;
+ Element dataConfig;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public XMLChannelFactory(Element controlConfig, Element dataConfig)
+ {
+ this.controlConfig = controlConfig;
+ this.dataConfig = dataConfig;
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public Element getControlConfig()
+ {
+ return controlConfig;
+ }
+
+ public void setControlConfig(Element controlConfig)
+ {
+ this.controlConfig = controlConfig;
+ }
+
+ public Element getDataConfig()
+ {
+ return dataConfig;
+ }
+
+ public void setDataConfig(Element dataConfig)
+ {
+ this.dataConfig = dataConfig;
+ }
+
+ // implementation of JChannelFactory ------------------------------------------------------------
+ public Channel createControlChannel() throws Exception
+ {
+ return new JChannel(controlConfig);
+ }
+
+ public Channel createDataChannel() throws Exception
+ {
+ return new JChannel(dataConfig);
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounter.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,669 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.messaging.core.impl.messagecounter;
+
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Queue;
+
+/**
+ * This class stores message count informations for a given queue
+ *
+ * At intervals this class samples the queue for message count data
+ *
+ * Note that the underlying queue *does not* update statistics every time a message
+ * is added since that would reall slow things down, instead we *sample* the queues at
+ * regular intervals - this means we are less intrusive on the queue
+ *
+ * @author <a href="mailto:u.schroeter at mobilcom.de">Ulf Schroeter</a>
+ * @author <a href="mailto:s.steinbacher at mobilcom.de">Stephan Steinbacher</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version $Revision: 1.8 $
+ */
+public class MessageCounter
+{
+ protected static final Logger log = Logger.getLogger(MessageCounter.class);
+
+ // destination related information
+ private String destName;
+ private String destSubscription;
+ private boolean destTopic;
+ private boolean destDurable;
+
+ // destination queue
+ private Queue destQueue;
+
+ // counter
+ private int countTotal;
+ private int countTotalLast;
+ private int depthLast;
+ private long timeLastUpdate;
+
+ // per hour day counter history
+ private int dayCounterMax;
+ private ArrayList dayCounter;
+
+ /**
+ * Get a list of message statistics from a list of message counters
+ *
+ * @param counter the message counters
+ * @return the message statistics
+ * @throws Exception for any error
+ */
+ public static List getMessageStatistics(List counters) throws Exception
+ {
+ List list = new ArrayList(counters.size());
+
+ Iterator iter = counters.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageCounter counter = (MessageCounter)iter.next();
+
+ MessageStatistics stats = new MessageStatistics();
+ stats.setName(counter.getDestinationName());
+ stats.setSubscriptionID(counter.getDestinationSubscription());
+ stats.setTopic(counter.getDestinationTopic());
+ stats.setDurable(counter.getDestinationDurable());
+ stats.setCount(counter.getCount());
+ stats.setCountDelta(counter.getCountDelta());
+ stats.setDepth(counter.getMessageCount());
+ stats.setDepthDelta(counter.getMessageCountDelta());
+ stats.setTimeLastUpdate(counter.getLastUpdate());
+
+ list.add(stats);
+ }
+ return list;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param name destination name
+ * @param subscription subscription name
+ * @param queue internal queue object
+ * @param topic topic destination flag
+ * @param durable durable subsciption flag
+ * @param daycountmax max message history day count
+ */
+ public MessageCounter(String name,
+ String subscription,
+ Queue queue,
+ boolean topic,
+ boolean durable,
+ int daycountmax)
+ {
+ // store destination related information
+ destName = name;
+ destSubscription = subscription;
+ destTopic = topic;
+ destDurable = durable;
+ destQueue = queue;
+
+ // initialize counter
+ resetCounter();
+
+ // initialize message history
+ dayCounter = new ArrayList();
+
+ setHistoryLimit(daycountmax);
+ }
+
+ /**
+ * Get string representation
+ */
+ public String toString()
+ {
+ return getCounterAsString();
+ }
+
+ private int lastMessagesAdded;
+
+ /*
+ * This method is called periodically to update statistics from the queue
+ */
+ public synchronized void onTimer()
+ {
+ int latestMessagesAdded = destQueue.getMessagesAdded();
+
+ int newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
+
+ countTotal += newMessagesAdded;
+
+ lastMessagesAdded = latestMessagesAdded;
+
+ //update timestamp
+ timeLastUpdate = System.currentTimeMillis();
+
+ // update message history
+ updateHistory(true);
+ }
+
+ /**
+ * Gets the related destination name
+ *
+ * @return String destination name
+ */
+ public String getDestinationName()
+ {
+ return destName;
+ }
+
+ /**
+ * Gets the related destination subscription
+ *
+ * @return String destination name
+ */
+ public String getDestinationSubscription()
+ {
+ return destSubscription;
+ }
+
+ /**
+ * Gets the related destination topic flag
+ *
+ * @return boolean true: topic destination, false: queue destination
+ */
+ public boolean getDestinationTopic()
+ {
+ return destTopic;
+ }
+
+ /**
+ * Gets the related destination durable subscription flag
+ *
+ * @return boolean true : durable subscription,
+ * false: non-durable subscription
+ */
+ public boolean getDestinationDurable()
+ {
+ return destDurable;
+ }
+
+ /**
+ * Gets the total message count since startup or
+ * last counter reset
+ *
+ * @return int message count
+ */
+ public int getCount()
+ {
+ return countTotal;
+ }
+
+ /**
+ * Gets the message count delta since last method call
+ *
+ * @return int message count delta
+ */
+ public int getCountDelta()
+ {
+ int delta = countTotal - countTotalLast;
+
+ countTotalLast = countTotal;
+
+ return delta;
+ }
+
+ /**
+ * Gets the current message count of pending messages
+ * within the destination waiting for dispatch
+ *
+ * @return int message queue depth
+ */
+ public int getMessageCount()
+ {
+ return destQueue.getMessageCount();
+ }
+
+ /**
+ * Gets the message count delta of pending messages
+ * since last method call. Therefore
+ *
+ * @return int message queue depth delta
+ */
+ public int getMessageCountDelta()
+ {
+ int current = destQueue.getMessageCount();
+ int delta = current - depthLast;
+
+ depthLast = current;
+
+ return delta;
+ }
+
+ /**
+ * Gets the timestamp of the last message add
+ *
+ * @return long system time
+ */
+ public long getLastUpdate()
+ {
+ return timeLastUpdate;
+ }
+
+ /**
+ * Reset message counter values
+ */
+ public void resetCounter()
+ {
+ countTotal = 0;
+ countTotalLast = 0;
+ depthLast = 0;
+ timeLastUpdate = 0;
+ }
+
+ /**
+ * Get message counter data as string in format
+ *
+ * "Topic/Queue, Name, Subscription, Durable, Count, CountDelta,
+ * Depth, DepthDelta, Timestamp Last Increment"
+ *
+ * @return String message counter data string
+ */
+ public String getCounterAsString()
+ {
+ StringBuffer ret = new StringBuffer();
+
+ // Topic/Queue
+ if (destTopic)
+ ret.append("Topic,");
+ else
+ ret.append("Queue,");
+
+ // name
+ ret.append(destName).append(",");
+
+ // subscription
+ if (destSubscription != null)
+ ret.append(destSubscription).append(",");
+ else
+ ret.append("-,");
+
+ // Durable subscription
+ if (destTopic)
+ {
+ // Topic
+ if (destDurable)
+ ret.append("true,");
+ else
+ ret.append("false,");
+ }
+ else
+ {
+ // Queue
+ ret.append("-,");
+ }
+
+ // counter values
+ ret.append(getCount()).append(",").append(getCountDelta()).append(",").append(getMessageCount()).append(",").append(getMessageCountDelta()).append(",");
+
+ // timestamp last counter update
+ if (timeLastUpdate > 0)
+ {
+ DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
+
+ ret.append(dateFormat.format(new Date(timeLastUpdate)));
+ }
+ else
+ {
+ ret.append("-");
+ }
+
+ return ret.toString();
+ }
+
+ /**
+ * Get message counter history day count limit
+ *
+ * <0: unlimited, 0: history disabled, >0: day count
+ */
+ public int getHistoryLimit()
+ {
+ return dayCounterMax;
+ }
+
+ /**
+ * Set message counter history day count limit
+ *
+ * <0: unlimited, 0: history disabled, >0: day count
+ */
+ public void setHistoryLimit(int daycountmax)
+ {
+ boolean bInitialize = false;
+
+ // store new maximum day count
+ dayCounterMax = daycountmax;
+
+ // update day counter array
+ synchronized (dayCounter)
+ {
+ if (dayCounterMax > 0)
+ {
+ // limit day history to specified day count
+ int delta = dayCounter.size() - dayCounterMax;
+
+ for (int i = 0; i < delta; i++)
+ {
+ // reduce array size to requested size by dropping
+ // oldest day counters
+ dayCounter.remove(0);
+ }
+
+ // create initial day counter when empty
+ bInitialize = dayCounter.isEmpty();
+ }
+ else if (dayCounterMax == 0)
+ {
+ // disable history
+ dayCounter.clear();
+ }
+ else
+ {
+ // unlimited day history
+
+ // create initial day counter when empty
+ bInitialize = dayCounter.isEmpty();
+ }
+
+ // optionally initialize first day counter entry
+ if (bInitialize)
+ {
+ dayCounter.add(new DayCounter(new GregorianCalendar(), true));
+ }
+ }
+ }
+
+ /**
+ * Update message counter history
+ */
+ private void updateHistory(boolean incrementCounter)
+ {
+ // check history activation
+ if (dayCounter.isEmpty())
+ {
+ return;
+ }
+
+ // calculate day difference between current date and date of last day counter entry
+ synchronized (dayCounter)
+ {
+ DayCounter counterLast = (DayCounter) dayCounter.get(dayCounter.size() - 1);
+
+ GregorianCalendar calNow = new GregorianCalendar();
+ GregorianCalendar calLast = counterLast.getDate();
+
+ // clip day time part for day delta calulation
+ calNow.clear(Calendar.AM_PM);
+ calNow.clear(Calendar.HOUR);
+ calNow.clear(Calendar.HOUR_OF_DAY);
+ calNow.clear(Calendar.MINUTE);
+ calNow.clear(Calendar.SECOND);
+ calNow.clear(Calendar.MILLISECOND);
+
+ calLast.clear(Calendar.AM_PM);
+ calLast.clear(Calendar.HOUR);
+ calLast.clear(Calendar.HOUR_OF_DAY);
+ calLast.clear(Calendar.MINUTE);
+ calLast.clear(Calendar.SECOND);
+ calLast.clear(Calendar.MILLISECOND);
+
+ long millisPerDay = 86400000; // 24 * 60 * 60 * 1000
+ long millisDelta = calNow.getTime().getTime() - calLast.getTime().getTime();
+
+ int dayDelta = (int) (millisDelta / millisPerDay);
+
+ if (dayDelta > 0)
+ {
+ // finalize last day counter
+ counterLast.finalizeDayCounter();
+
+ // add new intermediate empty day counter entries
+ DayCounter counterNew;
+
+ for (int i = 1; i < dayDelta; i++)
+ {
+ // increment date
+ calLast.add(Calendar.DAY_OF_YEAR, 1);
+
+ counterNew = new DayCounter(calLast, false);
+ counterNew.finalizeDayCounter();
+
+ dayCounter.add(counterNew);
+ }
+
+ // add new day counter entry for current day
+ counterNew = new DayCounter(calNow, false);
+
+ dayCounter.add(counterNew);
+
+ // ensure history day count limit
+ setHistoryLimit(dayCounterMax);
+ }
+
+ // update last day counter entry
+ counterLast = (DayCounter) dayCounter.get(dayCounter.size() - 1);
+ counterLast.updateDayCounter(incrementCounter);
+ }
+ }
+
+ /**
+ * Reset message counter history
+ */
+ public void resetHistory()
+ {
+ int max = dayCounterMax;
+
+ setHistoryLimit(0);
+ setHistoryLimit(max);
+ }
+
+ /**
+ * Get message counter history data as string in format
+ *
+ * "day count\n
+ * Date 1, hour counter 0, hour counter 1, ..., hour counter 23\n
+ * Date 2, hour counter 0, hour counter 1, ..., hour counter 23\n
+ * .....
+ * .....
+ * Date n, hour counter 0, hour counter 1, ..., hour counter 23\n"
+ *
+ * @return String message history data string
+ */
+ public String getHistoryAsString()
+ {
+ String ret = "";
+
+ // ensure history counters are up to date
+ updateHistory(false);
+
+ // compile string
+ synchronized (dayCounter)
+ {
+ // first line: history day count
+ ret += dayCounter.size() + "\n";
+
+ // following lines: day counter data
+ for (int i = 0; i < dayCounter.size(); i++)
+ {
+ DayCounter counter = (DayCounter) dayCounter.get(i);
+
+ ret += counter.getDayCounterAsString() + "\n";
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Internal day counter class for one day hour based counter history
+ */
+ private static class DayCounter
+ {
+ static final int HOURS = 24;
+
+ GregorianCalendar date = null;
+ int[] counters = new int[HOURS];
+
+ /**
+ * Constructor
+ *
+ * @param date day counter date
+ * @param isStartDay true first day counter
+ * false follow up day counter
+ */
+ DayCounter(GregorianCalendar date, boolean isStartDay)
+ {
+ // store internal copy of creation date
+ this.date = (GregorianCalendar) date.clone();
+
+ // initialize the array with '0'- values to current hour (if it is not the
+ // first monitored day) and the rest with default values ('-1')
+ int hour = date.get(Calendar.HOUR_OF_DAY);
+
+ for (int i = 0; i < HOURS; i++)
+ {
+ if (i < hour)
+ {
+ if (isStartDay)
+ counters[i] = -1;
+ else
+ counters[i] = 0;
+ }
+ else
+ {
+ counters[i] = -1;
+ }
+ }
+
+ // set the array element of the current hour to '0'
+ counters[hour] = 0;
+ }
+
+ /**
+ * Gets copy of day counter date
+ *
+ * @return GregorianCalendar day counter date
+ */
+ GregorianCalendar getDate()
+ {
+ return (GregorianCalendar) date.clone();
+ }
+
+ /**
+ * Update day counter hour array elements
+ *
+ * @param incrementCounter update current hour counter
+ */
+ void updateDayCounter(boolean incrementCounter)
+ {
+ // get the current hour of the day
+ GregorianCalendar cal = new GregorianCalendar();
+
+ int currentIndex = cal.get(Calendar.HOUR_OF_DAY);
+
+ // check if the last array update is more than 1 hour ago, if so fill all
+ // array elements between the last index and the current index with '0' values
+ boolean bUpdate = false;
+
+ for (int i = 0; i <= currentIndex; i++)
+ {
+ if (counters[i] > -1)
+ {
+ // found first initialized hour counter
+ // -> set all following uninitialized
+ // counter values to 0
+ bUpdate = true;
+ }
+
+ if (bUpdate == true)
+ {
+ if (counters[i] == -1)
+ counters[i] = 0;
+ }
+ }
+
+ // optionally increment current counter
+ if (incrementCounter)
+ {
+ counters[currentIndex]++;
+ }
+ }
+
+ /**
+ * Finalize day counter hour array elements
+ */
+ void finalizeDayCounter()
+ {
+ // a new day has began, so fill all array elements from index to end with
+ // '0' values
+ boolean bFinalize = false;
+
+ for (int i = 0; i < HOURS; i++)
+ {
+ if (counters[i] > -1)
+ {
+ // found first initialized hour counter
+ // -> finalize all following uninitialized
+ // counter values
+ bFinalize = true;
+ }
+
+ if (bFinalize)
+ {
+ if (counters[i] == -1)
+ counters[i] = 0;
+ }
+ }
+ }
+
+ /**
+ * Return day counter data as string with format
+ * "Date, hour counter 0, hour counter 1, ..., hour counter 23"
+ *
+ * @return String day counter data
+ */
+ String getDayCounterAsString()
+ {
+ // first element day counter date
+ DateFormat dateFormat = DateFormat.getDateInstance(DateFormat.SHORT);
+
+ String strData = dateFormat.format(date.getTime());
+
+ // append 24 comma separated hour counter values
+ for (int i = 0; i < HOURS; i++)
+ {
+ strData += "," + counters[i];
+ }
+
+ return strData;
+ }
+ }
+}
+
Added: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageCounterManager.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,196 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.messagecounter;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.MessagingComponent;
+
+import java.util.*;
+
+/**
+ *
+ * A MessageCounterManager
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 3465 $</tt>
+ *
+ * $Id: MessageCounterManager.java 3465 2007-12-10 17:32:22Z ataylor $
+ *
+ */
+public class MessageCounterManager implements MessagingComponent
+{
+ private static final Logger log = Logger.getLogger(MessageCounterManager.class);
+
+ private Map messageCounters;
+
+ private boolean started;
+
+ private Timer timer;
+
+ private long period;
+
+ private PingMessageCountersTask task;
+
+ public MessageCounterManager(long period)
+ {
+ messageCounters = new HashMap();
+
+ this.period = period;
+ }
+
+ public synchronized void start()
+ {
+ if (started)
+ {
+ return;
+ }
+
+ // Needs to be daemon
+ timer = new Timer(true);
+
+ task = new PingMessageCountersTask();
+
+ timer.schedule(task, 0, period);
+
+ started = true;
+ }
+
+ public synchronized void stop()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ //Wait for timer task to stop
+
+ task.stop();
+
+ timer.cancel();
+
+ timer = null;
+
+ started = false;
+ }
+
+ public synchronized void reschedule(long newPeriod)
+ {
+ boolean wasStarted = this.started;
+
+ if (wasStarted)
+ {
+ stop();
+ }
+
+ period = newPeriod;
+
+ if (wasStarted)
+ {
+ start();
+ }
+ }
+
+ public void registerMessageCounter(String name, MessageCounter counter)
+ {
+ synchronized (messageCounters)
+ {
+ messageCounters.put(name, counter);
+ }
+ }
+
+ public MessageCounter unregisterMessageCounter(String name)
+ {
+ synchronized (messageCounters)
+ {
+ return (MessageCounter)messageCounters.remove(name);
+ }
+ }
+
+ public Set getMessageCounters()
+ {
+ synchronized (messageCounters)
+ {
+ return new HashSet(messageCounters.values());
+ }
+ }
+
+ public MessageCounter getMessageCounter(String name)
+ {
+ synchronized (messageCounters)
+ {
+ return (MessageCounter)messageCounters.get(name);
+ }
+ }
+
+ public void resetAllCounters()
+ {
+ synchronized (messageCounters)
+ {
+ Iterator iter = messageCounters.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageCounter counter = (MessageCounter)iter.next();
+
+ counter.resetCounter();
+ }
+ }
+ }
+
+ public void resetAllCounterHistories()
+ {
+ synchronized (messageCounters)
+ {
+ Iterator iter = messageCounters.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageCounter counter = (MessageCounter)iter.next();
+
+ counter.resetHistory();
+ }
+ }
+ }
+
+ class PingMessageCountersTask extends TimerTask
+ {
+ public synchronized void run()
+ {
+ synchronized (messageCounters)
+ {
+ Iterator iter = messageCounters.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageCounter counter = (MessageCounter)iter.next();
+
+ counter.onTimer();
+ }
+ }
+ }
+
+ synchronized void stop()
+ {
+ cancel();
+ }
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageStatistics.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageStatistics.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/messagecounter/MessageStatistics.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,340 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.messaging.core.impl.messagecounter;
+
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.util.Date;
+
+//FIXME this doesn't belong here
+
+/**
+ * Message statistics
+ *
+ * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @version <tt>$Revision: 1.3 $</tt>
+ */
+public class MessageStatistics implements Serializable
+{
+ // Constants -----------------------------------------------------
+
+ /** The serialVersionUID */
+ static final long serialVersionUID = 8056884098781414022L;
+
+ // Attributes ----------------------------------------------------
+
+ /** Whether we are topic */
+ private boolean topic;
+
+ /** Whether we are durable */
+ private boolean durable;
+
+ /** The name */
+ private String name;
+
+ /** The subscription id */
+ private String subscriptionID;
+
+ /** The message count */
+ private int count;
+
+ /** The message count delta */
+ private int countDelta;
+
+ /** The message depth */
+ private int depth;
+
+ /** The message depth delta */
+ private int depthDelta;
+
+ /** The last update */
+ private long timeLastUpdate;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * Construct a new Message Statistics
+ */
+ public MessageStatistics()
+ {
+ }
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Get the count.
+ *
+ * @return Returns the count.
+ */
+ public int getCount()
+ {
+ return count;
+ }
+
+ /**
+ * Set the count.
+ *
+ * @param count The count to set.
+ */
+ public void setCount(int count)
+ {
+ this.count = count;
+ }
+
+ /**
+ * Get the countDelta.
+ *
+ * @return Returns the countDelta.
+ */
+ public int getCountDelta()
+ {
+ return countDelta;
+ }
+
+ /**
+ * Set the countDelta.
+ *
+ * @param countDelta The countDelta to set.
+ */
+ public void setCountDelta(int countDelta)
+ {
+ this.countDelta = countDelta;
+ }
+
+ /**
+ * Get the depth.
+ *
+ * @return Returns the depth.
+ */
+ public int getDepth()
+ {
+ return depth;
+ }
+
+ /**
+ * Set the depth.
+ *
+ * @param depth The depth to set.
+ */
+ public void setDepth(int depth)
+ {
+ this.depth = depth;
+ }
+
+ /**
+ * Get the depthDelta.
+ *
+ * @return Returns the depthDelta.
+ */
+ public int getDepthDelta()
+ {
+ return depthDelta;
+ }
+
+ /**
+ * Set the depthDelta.
+ *
+ * @param depthDelta The depthDelta to set.
+ */
+ public void setDepthDelta(int depthDelta)
+ {
+ this.depthDelta = depthDelta;
+ }
+
+ /**
+ * Get the durable.
+ *
+ * @return Returns the durable.
+ */
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ /**
+ * Set the durable.
+ *
+ * @param durable The durable to set.
+ */
+ public void setDurable(boolean durable)
+ {
+ this.durable = durable;
+ }
+
+ /**
+ * Get the name.
+ *
+ * @return Returns the name.
+ */
+ public String getName()
+ {
+ return name;
+ }
+
+ /**
+ * Set the name.
+ *
+ * @param name The name to set.
+ */
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * Get the subscriptionID.
+ *
+ * @return Returns the subscriptionID.
+ */
+ public String getSubscriptionID()
+ {
+ return subscriptionID;
+ }
+
+ /**
+ * Set the subscriptionID.
+ *
+ * @param subscriptionID The subscriptionID to set.
+ */
+ public void setSubscriptionID(String subscriptionID)
+ {
+ this.subscriptionID = subscriptionID;
+ }
+
+ /**
+ * Get the timeLastUpdate.
+ *
+ * @return Returns the timeLastUpdate.
+ */
+ public long getTimeLastUpdate()
+ {
+ return timeLastUpdate;
+ }
+
+ /**
+ * Set the timeLastUpdate.
+ *
+ * @param timeLastUpdate The timeLastUpdate to set.
+ */
+ public void setTimeLastUpdate(long timeLastUpdate)
+ {
+ this.timeLastUpdate = timeLastUpdate;
+ }
+
+ /**
+ * Get the topic.
+ *
+ * @return Returns the topic.
+ */
+ public boolean isTopic()
+ {
+ return topic;
+ }
+
+ /**
+ * Set the topic.
+ *
+ * @param topic The topic to set.
+ */
+ public void setTopic(boolean topic)
+ {
+ this.topic = topic;
+ }
+
+ /**
+ * Get message data as string in format
+ *
+ * "Topic/Queue, Name, Subscription, Durable, Count, CountDelta,
+ * Depth, DepthDelta, Timestamp Last Increment"
+ *
+ * @return String data as a string
+ */
+ public String getAsString()
+ {
+ StringBuffer buffer = new StringBuffer(50);
+
+ // Topic/Queue
+ if (topic)
+ buffer.append("Topic,");
+ else
+ buffer.append("Queue,");
+
+ // name
+ buffer.append(name).append(',');
+
+ // subscription
+ if (subscriptionID != null)
+ buffer.append(subscriptionID).append(',');
+ else
+ buffer.append("-,");
+
+ // Durable subscription
+ if (topic)
+ {
+ // Topic
+ if (durable)
+ buffer.append("DURABLE,");
+ else
+ buffer.append("NONDURABLE,");
+ }
+ else
+ {
+ buffer.append("-,");
+ }
+
+ // counter values
+ buffer.append(count).append(',').append(countDelta).append(',').append(depth).append(',').append(depthDelta)
+ .append(',');
+
+ // timestamp last counter update
+ if (timeLastUpdate > 0)
+ {
+ DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
+
+ buffer.append(dateFormat.format(new Date(timeLastUpdate)));
+ }
+ else
+ {
+ buffer.append('-');
+ }
+
+ return buffer.toString();
+ }
+
+ // Object overrides ----------------------------------------------
+
+ public String toString()
+ {
+ return getAsString();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file
Added: trunk/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,113 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.util.Streamable;
+
+/**
+ *
+ * A ClusterRequest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1917 $</tt>
+ *
+ * $Id: ClusterRequest.java 1917 2007-01-08 20:26:12Z clebert.suconic at jboss.com $
+ *
+ */
+public abstract class ClusterRequest implements Streamable
+{
+ public static final int JOIN_CLUSTER_REQUEST = 1;
+
+ public static final int LEAVE_CLUSTER_REQUEST = 2;
+
+ public static final int BIND_REQUEST = 3;
+
+ public static final int UNBIND_REQUEST = 4;
+
+ public static final int MESSAGE_REQUEST = 5;
+
+ protected static final int NULL = 0;
+
+ protected static final int NOT_NULL = 1;
+
+ /*
+ * Factory method
+ */
+ static ClusterRequest createFromStream(DataInputStream dais) throws Exception
+ {
+ byte type = dais.readByte();
+
+ ClusterRequest request = null;
+
+ switch (type)
+ {
+ case MESSAGE_REQUEST:
+ {
+ request = new MessageRequest();
+ break;
+ }
+// case BIND_REQUEST:
+// {
+// request = new BindRequest();
+// break;
+// }
+// case UNBIND_REQUEST:
+// {
+// request = new UnbindRequest();
+// break;
+// }
+// case JOIN_CLUSTER_REQUEST:
+// {
+// request = new JoinClusterRequest();
+// break;
+// }
+// case LEAVE_CLUSTER_REQUEST:
+// {
+// request = new LeaveClusterRequest();
+// break;
+// }
+ default:
+ {
+ throw new IllegalArgumentException("Invalid type: " + type);
+ }
+ }
+
+ request.read(dais);
+
+ return request;
+ }
+
+ public static void writeToStream(DataOutputStream daos, ClusterRequest request) throws Exception
+ {
+ daos.writeByte(request.getType());
+
+ request.write(daos);
+ }
+
+ abstract Object execute(PostOffice office) throws Exception;
+
+ abstract byte getType();
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,130 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.jboss.messaging.core.Condition;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.impl.ConditionImpl;
+import org.jboss.messaging.core.impl.MessageImpl;
+
+/**
+ * A MessageRequest
+ *
+ * Used when sending a single message non reliably across the group
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2202 $</tt>
+ *
+ * $Id: MessageRequest.java 2202 2007-02-08 10:50:26Z timfox $
+ *
+ */
+public class MessageRequest extends ClusterRequest
+{
+ private Condition condition;
+
+ private Message message;
+
+ //private Set queueNames;
+
+ MessageRequest()
+ {
+ }
+
+ MessageRequest(Condition condition, Message message)//, Set queueNames)
+ {
+ this.condition = condition;
+
+ this.message = message;
+
+ //this.queueNames = queueNames;
+ }
+
+ Object execute(PostOffice office) throws Exception
+ {
+ office.routeFromCluster(condition, message);
+
+ return null;
+ }
+
+ byte getType()
+ {
+ return ClusterRequest.MESSAGE_REQUEST;
+ }
+
+ public void read(DataInputStream in) throws Exception
+ {
+ condition = new ConditionImpl();
+
+ condition.read(in);
+
+ message = new MessageImpl();
+
+ message.read(in);
+
+// byte b = in.readByte();
+//
+// if (b != NULL)
+// {
+// int size = in.readInt();
+//
+// queueNames = new HashSet(size);
+//
+// for (int i = 0; i < size; i++)
+// {
+// String queueName = in.readUTF();
+//
+// queueNames.add(queueName);
+// }
+// }
+ }
+
+ public void write(DataOutputStream out) throws Exception
+ {
+ condition.write(out);
+
+ message.write(out);
+
+// if (queueNames == null)
+// {
+// out.writeByte(NULL);
+// }
+// else
+// {
+// out.writeByte(NOT_NULL);
+//
+// out.writeInt(queueNames.size());
+//
+// Iterator iter = queueNames.iterator();
+//
+// while (iter.hasNext())
+// {
+// String queueName = (String)iter.next();
+//
+// out.writeUTF(queueName);
+// }
+// }
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,369 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.TextMessage;
+
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Condition;
+import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.QueueFactory;
+import org.jboss.messaging.core.TransactionSynchronization;
+import org.jboss.messaging.core.impl.BindingImpl;
+import org.jboss.messaging.util.ConcurrentHashSet;
+
+/**
+ *
+ * A PostOfficeImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class PostOfficeImpl implements PostOffice
+{
+ private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
+
+ private int nodeID;
+
+ // private Map<Integer, Map<String, Queue>> queues = new HashMap<Integer, Map<String, Queue>>();
+
+ private Map<Condition, List<Binding>> mappings = new ConcurrentHashMap<Condition, List<Binding>>();
+
+ private Set<Condition> conditions = new ConcurrentHashSet<Condition>();
+
+ private PersistenceManager persistenceManager;
+
+ private QueueFactory queueFactory;
+
+ public PostOfficeImpl(int nodeID, PersistenceManager persistenceManager, QueueFactory queueFactory)
+ {
+ this.nodeID = nodeID;
+
+ this.persistenceManager = persistenceManager;
+
+ this.queueFactory = queueFactory;
+ }
+
+ // MessagingComponent implementation ---------------------------------------
+
+ public void start() throws Exception
+ {
+ loadBindings();
+ }
+
+ public void stop() throws Exception
+ {
+ mappings.clear();
+
+ conditions.clear();
+ }
+
+ // PostOffice implementation -----------------------------------------------
+
+ public Queue addQueue(Condition condition, String name, Filter filter,
+ boolean durable, boolean temporary, boolean allNodes) throws Exception
+ {
+ Binding binding = createBinding(condition, name, filter, durable, temporary, allNodes);
+
+ addBindingInMemory(binding);
+
+ if (durable)
+ {
+ persistenceManager.addBinding(binding);
+ }
+
+ return binding.getQueue();
+ }
+
+ public boolean removeQueue(Condition condition, String name, boolean allNodes) throws Exception
+ {
+ Binding binding = removeQueueInMemory(condition, name);
+
+ if (binding != null)
+ {
+ if (binding.getQueue().isDurable())
+ {
+ persistenceManager.deleteBinding(binding);
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public void addCondition(Condition condition)
+ {
+ conditions.add(condition);
+ }
+
+ public boolean removeCondition(Condition condition)
+ {
+ return conditions.remove(condition);
+ }
+
+ public boolean containsCondition(Condition condition)
+ {
+ return conditions.contains(condition);
+ }
+
+ public void route(Condition condition, Message message) throws Exception
+ {
+ // boolean routeRemote = false;
+
+ List<Binding> bindings = mappings.get(condition);
+
+ if (bindings != null)
+ {
+ for (Binding binding: bindings)
+ {
+ Queue queue = binding.getQueue();
+
+ if (queue.getFilter() == null || queue.getFilter().match(message))
+ {
+ if (binding.getNodeID() == nodeID)
+ {
+ //Local queue
+
+ message.createReference(queue);
+ }
+ else
+ {
+// if (!queue.isDurable())
+// {
+// //Remote queue - we never route to remote durable queues since we will lose atomicity in event
+// //of crash - for moving between durable queues we use message redistribution
+//
+// routeRemote = true;
+// }
+ }
+ }
+ }
+ }
+
+
+// if (routeRemote)
+// {
+// tx.addSynchronization(new CastMessageCallback(new MessageRequest(condition, message)));
+// }
+ }
+
+ public void routeFromCluster(Condition condition, Message message) throws Exception
+ {
+ List<Binding> bindings = mappings.get(condition);
+
+ for (Binding binding: bindings)
+ {
+ Queue queue = binding.getQueue();
+
+ if (binding.getNodeID() == nodeID)
+ {
+ if (queue.getFilter() == null || queue.getFilter().match(message))
+ {
+ MessageReference ref = message.createReference(queue);
+
+ //We never route durably from other nodes - so no need to persist
+
+ queue.addLast(ref);
+ }
+ }
+ }
+ }
+
+ public Map<Condition, List<Binding>> getMappings()
+ {
+ return mappings;
+ }
+
+ public List<Binding> getBindingsForQueueName(String queueName)
+ {
+ List<Binding> list = new ArrayList<Binding>();
+
+ for (List<Binding> bindings: mappings.values())
+ {
+ for (Binding binding: bindings)
+ {
+ if (binding.getQueue().getName().equals(queueName) && binding.getNodeID() == nodeID)
+ {
+ list.add(binding);
+ }
+ }
+ }
+ return list;
+ }
+
+ public List<Binding> getBindingsForCondition(Condition condition)
+ {
+ List<Binding> list = new ArrayList<Binding>();
+
+ List<Binding> bindings = mappings.get(condition);
+
+ if (bindings != null)
+ {
+ for (Binding binding: bindings)
+ {
+ if (binding.getNodeID() == nodeID)
+ {
+ list.add(binding);
+ }
+ }
+ }
+
+ return list;
+ }
+
+ // Private -----------------------------------------------------------------
+
+ private Binding createBinding(Condition condition, String name, Filter filter,
+ boolean durable, boolean temporary, boolean allNodes)
+ {
+ Queue queue = queueFactory.createQueue(-1, name, filter, durable, temporary);
+
+ Binding binding = new BindingImpl(this.nodeID, condition, queue, allNodes);
+
+ return binding;
+ }
+
+ private void addBindingInMemory(Binding binding) throws Exception
+ {
+ List<Binding> bindings = mappings.get(binding.getCondition());
+
+ if (bindings == null)
+ {
+ bindings = new CopyOnWriteArrayList<Binding>();
+
+ mappings.put(binding.getCondition(), bindings);
+ }
+
+ bindings.add(binding);
+
+ // Map<String, Queue> nameMap = queues.get(nodeID);
+ //
+ // if (nameMap == null)
+ // {
+ // nameMap = new HashMap<String, Queue>();
+ //
+ // queues.put(nodeID, nameMap);
+ // }
+ //
+ // nameMap.put(name, queue);
+ }
+
+ private Binding removeQueueInMemory(Condition condition, String name) throws Exception
+ {
+ Binding binding = null;
+
+ // Map<String, Queue> nameMap = queues.get(nodeID);
+ //
+ // nameMap.remove(name);
+ //
+ // if (nameMap.isEmpty())
+ // {
+ // queues.remove(nodeID);
+ // }
+
+ List<Binding> bindings = mappings.get(condition);
+
+ for (Iterator<Binding> iter = bindings.iterator(); iter.hasNext();)
+ {
+ Binding b = iter.next();
+
+ if (b.getQueue().getName().equals(name))
+ {
+ binding = b;
+
+ break;
+ }
+ }
+
+ if (binding != null)
+ {
+ bindings.remove(binding);
+ }
+
+ if (bindings.isEmpty())
+ {
+ mappings.remove(condition);
+ }
+
+ return binding;
+ }
+
+ private void loadBindings() throws Exception
+ {
+ List<Binding> bindings = persistenceManager.loadBindings(queueFactory);
+
+ for (Binding binding: bindings)
+ {
+ addBindingInMemory(binding);
+ }
+ }
+
+ private void deleteMappingsForNode(int theNodeID)
+ {
+
+ }
+
+ private class CastMessageCallback implements TransactionSynchronization
+ {
+ private ClusterRequest request;
+
+ CastMessageCallback(ClusterRequest request)
+ {
+ this.request = request;
+ }
+
+ public void afterCommit() throws Exception
+ {
+ //TODO - cast request
+ }
+
+ public void afterRollback() throws Exception
+ {
+ }
+
+ public void beforeCommit() throws Exception
+ {
+ }
+
+ public void beforeRollback() throws Exception
+ {
+ }
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,480 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.server;
+
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.aop.microcontainer.aspects.jmx.JMX;
+import org.jboss.jms.server.ConnectionFactoryManager;
+import org.jboss.jms.server.ConnectionManager;
+import org.jboss.jms.server.DestinationJNDIMapper;
+import org.jboss.jms.server.DestinationManager;
+import org.jboss.jms.server.MessagingTimeoutFactory;
+import org.jboss.jms.server.SecurityStore;
+import org.jboss.jms.server.TransactionRepository;
+import org.jboss.jms.server.connectionfactory.ConnectionFactoryDeployer;
+import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
+import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
+import org.jboss.jms.server.destination.DestinationDeployer;
+import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
+import org.jboss.jms.server.plugin.contract.JMSUserManager;
+import org.jboss.jms.server.security.SecurityMetadataStore;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Condition;
+import org.jboss.messaging.core.Configuration;
+import org.jboss.messaging.core.DestinationType;
+import org.jboss.messaging.core.MemoryManager;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.impl.ConditionImpl;
+import org.jboss.messaging.core.impl.QueueFactoryImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounterManager;
+import org.jboss.messaging.core.impl.postoffice.PostOfficeImpl;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.newcore.impl.memory.SimpleMemoryManager;
+import org.jboss.messaging.util.ExceptionUtil;
+import org.jboss.messaging.util.Version;
+
+/**
+ * A Messaging Server
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
+ * @author <a href="mailto:aslak at conduct.no">Aslak Knutsen</a>
+ * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
+ * @version <tt>$Revision: 3543 $</tt>
+ * <p/>
+ * $Id: ServerPeer.java 3543 2008-01-07 22:31:58Z clebert.suconic at jboss.com $
+ */
+ at JMX(name = "jboss.messaging:service=MessagingServer", exposedInterface = MessagingServer.class)
+public class MessagingServerImpl implements MessagingServer
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(MessagingServerImpl.class);
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private Version version;
+
+ private boolean started;
+
+ //private boolean supportsFailover = true;
+
+ private Map<String, ServerSessionEndpoint> sessions;
+
+ // wired components
+
+ private DestinationManager destinationJNDIMapper;
+ private SecurityMetadataStore securityStore;
+ private ConnectionFactoryJNDIMapper connFactoryJNDIMapper;
+ private SimpleConnectionManager connectionManager;
+ private MemoryManager memoryManager;
+ private MessageCounterManager messageCounterManager;
+ private TransactionRepository transactionRepository = new TransactionRepository();
+ private PostOffice postOffice;
+ private DestinationDeployer destinationDeployer;
+ private ConnectionFactoryDeployer connectionFactoryDeployer;
+
+ // plugins
+
+ private PersistenceManager persistenceManager;
+
+ private JMSUserManager jmsUserManager;
+
+ private MinaService minaService;
+
+ private Configuration configuration;
+
+ // Constructors ---------------------------------------------------------------------------------
+ public MessagingServerImpl() throws Exception
+ {
+ // Some wired components need to be started here
+
+ version = Version.instance();
+
+ sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
+
+ started = false;
+ }
+
+ // lifecycle methods ----------------------------------------------------------------
+
+ public synchronized void start() throws Exception
+ {
+ try
+ {
+ log.debug("starting MessagingServer");
+
+ if (started)
+ {
+ return;
+ }
+
+ if (configuration.getMessagingServerID() < 0)
+ {
+ throw new IllegalStateException("MessagingServer ID not set");
+ }
+
+ log.debug(this + " starting");
+
+ // Create the wired components
+
+ securityStore = new SecurityMetadataStore(this);
+ destinationJNDIMapper = new DestinationJNDIMapper(this);
+ connFactoryJNDIMapper = new ConnectionFactoryJNDIMapper(this);
+ connectionManager = new SimpleConnectionManager();
+ memoryManager = new SimpleMemoryManager();
+ destinationDeployer = new DestinationDeployer(this);
+ connectionFactoryDeployer = new ConnectionFactoryDeployer(this, minaService);
+ messageCounterManager = new MessageCounterManager(configuration.getMessageCounterSamplePeriod());
+ configuration.addPropertyChangeListener(new PropertyChangeListener()
+ {
+ public void propertyChange(PropertyChangeEvent evt)
+ {
+ if(evt.getPropertyName().equals("messageCounterSamplePeriod"))
+ messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
+ }
+ });
+ postOffice = new PostOfficeImpl(configuration.getMessagingServerID(),
+ persistenceManager, new QueueFactoryImpl());
+
+ // Start the wired components
+
+ destinationJNDIMapper.start();
+ connFactoryJNDIMapper.start();
+ connectionManager.start();
+ memoryManager.start();
+ securityStore.start();
+ connectionFactoryDeployer.start();
+ destinationDeployer.start();
+ postOffice.start();
+
+ started = true;
+ log.info("JBoss Messaging " + getVersion().getProviderVersion() + " server [" +
+ configuration.getMessagingServerID() + "] started");
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
+ }
+ }
+
+ public synchronized void stop() throws Exception
+ {
+ try
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ log.info(this + " is Stopping. NOTE! Stopping the server peer cleanly will NOT cause failover to occur");
+
+ started = false;
+
+ // Stop the wired components
+ destinationDeployer.stop();
+ destinationDeployer = null;
+ connectionFactoryDeployer.stop();
+ connectionFactoryDeployer = null;
+ destinationJNDIMapper.stop();
+ destinationJNDIMapper = null;
+ connFactoryJNDIMapper.stop();
+ connFactoryJNDIMapper = null;
+ connectionManager.stop();
+ connectionManager = null;
+ memoryManager.stop();
+ memoryManager = null;
+ securityStore.stop();
+ messageCounterManager.stop();
+ messageCounterManager = null;
+ postOffice.stop();
+ postOffice = null;
+
+ MessagingTimeoutFactory.instance.reset();
+
+ log.info("JMS " + this + " stopped");
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMXInvocation(t, this + " stopService");
+ }
+ }
+
+
+ // MessagingServer implementation -----------------------------------------------------------
+
+ public Version getVersion()
+ {
+ return version;
+ }
+
+ public Configuration getConfiguration()
+ {
+ return configuration;
+ }
+
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+ public void setConfiguration(Configuration configuration)
+ {
+ this.configuration = configuration;
+ }
+
+ public void setMinaService(MinaService minaService)
+ {
+ this.minaService = minaService;
+ }
+
+ public MinaService getMinaService()
+ {
+ return minaService;
+ }
+
+ public ServerSessionEndpoint getSession(String sessionID)
+ {
+ return (ServerSessionEndpoint) sessions.get(sessionID);
+ }
+
+ public Collection<ServerSessionEndpoint> getSessions()
+ {
+ return sessions.values();
+ }
+
+ public void addSession(String id, ServerSessionEndpoint session)
+ {
+ sessions.put(id, session);
+ }
+
+ public void removeSession(String id)
+ {
+ if (sessions.remove(id) == null)
+ {
+ throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+ }
+ }
+
+ public synchronized Queue getDefaultDLQInstance() throws Exception
+ {
+ if (configuration.getDefaultDLQ() != null)
+ {
+ List<Binding> bindings = postOffice.getBindingsForQueueName(configuration.getDefaultDLQ());
+
+ if (bindings.isEmpty())
+ {
+ throw new IllegalStateException("Cannot find binding for queue " + configuration.getDefaultDLQ());
+ }
+
+ return bindings.get(0).getQueue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public synchronized Queue getDefaultExpiryQueueInstance() throws Exception
+ {
+ if (configuration.getDefaultExpiryQueue() != null)
+ {
+ List<Binding> bindings = postOffice.getBindingsForQueueName(configuration.getDefaultExpiryQueue());
+
+ if (bindings.isEmpty())
+ {
+ throw new IllegalStateException("Cannot find binding for queue " + configuration.getDefaultExpiryQueue());
+ }
+
+ return bindings.get(0).getQueue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void enableMessageCounters()
+ {
+ messageCounterManager.start();
+ }
+
+ public void disableMessageCounters()
+ {
+ messageCounterManager.stop();
+
+ messageCounterManager.resetAllCounters();
+
+ messageCounterManager.resetAllCounterHistories();
+ }
+
+ public void createQueue(String name, String jndiName) throws Exception
+ {
+ destinationDeployer.createQueue(name, jndiName);
+ }
+
+ public void destroyQueue(String name, String jndiName) throws Exception
+ {
+ destinationDeployer.destroyQueue(name, jndiName);
+ }
+
+ public void createTopic(String name, String jndiName) throws Exception
+ {
+ this.destinationDeployer.createTopic(name, jndiName);
+ }
+
+ public void destroyTopic(String name, String jndiName) throws Exception
+ {
+ destinationDeployer.destroyTopic(name, jndiName);
+ }
+
+ public void resetAllMessageCounters()
+ {
+ this.messageCounterManager.resetAllCounters();
+ }
+
+ public void resetAllMessageCounterHistories()
+ {
+ this.messageCounterManager.resetAllCounterHistories();
+ }
+
+ public void removeAllMessagesForQueue(String queueName) throws Exception
+ {
+ Condition condition = new ConditionImpl(DestinationType.QUEUE, queueName);
+
+ List<Binding> bindings = postOffice.getBindingsForCondition(condition);
+
+ if (!bindings.isEmpty())
+ {
+ Queue queue = bindings.get(0).getQueue();
+
+ persistenceManager.deleteAllReferences(queue);
+
+ queue.removeAllReferences();
+ }
+ }
+
+ public void removeAllMessagesForTopic(String queueName) throws Exception
+ {
+ Condition condition = new ConditionImpl(DestinationType.QUEUE, queueName);
+
+ List<Binding> bindings = postOffice.getBindingsForCondition(condition);
+
+ for (Binding binding: bindings)
+ {
+ Queue queue = binding.getQueue();
+
+ if (queue.isDurable())
+ {
+ persistenceManager.deleteAllReferences(queue);
+ }
+
+ queue.removeAllReferences();
+ }
+ }
+
+ public SecurityStore getSecurityManager()
+ {
+ return securityStore;
+ }
+
+ public DestinationManager getDestinationManager()
+ {
+ return destinationJNDIMapper;
+ }
+
+ public ConnectionFactoryManager getConnectionFactoryManager()
+ {
+ return connFactoryJNDIMapper;
+ }
+
+ public ConnectionManager getConnectionManager()
+ {
+ return connectionManager;
+ }
+
+ public MemoryManager getMemoryManager()
+ {
+ return memoryManager;
+ }
+
+ public TransactionRepository getTransactionRepository()
+ {
+ return transactionRepository;
+ }
+
+ public PersistenceManager getPersistenceManager()
+ {
+ return persistenceManager;
+ }
+
+ public void setPersistenceManager(PersistenceManager persistenceManager)
+ {
+ this.persistenceManager = persistenceManager;
+ }
+
+ public JMSUserManager getJmsUserManagerInstance()
+ {
+ return jmsUserManager;
+ }
+
+ public void setJmsUserManager(JMSUserManager jmsUserManager)
+ {
+ this.jmsUserManager = jmsUserManager;
+ }
+
+ public PostOffice getPostOffice()
+ {
+ return postOffice;
+ }
+
+ public void setPostOffice(PostOffice postOffice)
+ {
+ this.postOffice = postOffice;
+ }
+
+ public String toString()
+ {
+ return "MessagingServer[" + configuration.getMessagingServerID() + "]";
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,981 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.server;
+
+import java.io.CharArrayWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.jboss.aop.microcontainer.aspects.jmx.JMX;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.selector.Selector;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Condition;
+import org.jboss.messaging.core.Configuration;
+import org.jboss.messaging.core.DestinationType;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.MessagingServerManagement;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.impl.ConditionImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.messaging.util.MessageQueueNameHelper;
+
+/**
+ * This interface describes the properties and operations that comprise the management interface of the
+ * Messaging Server.
+ *
+ * It includes operations to create and destroy queues and provides various statistics measures
+ * such as message count for queues and topics.
+ *
+ * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
+ * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
+ *
+ */
+ at JMX(name = "jboss.messaging:service=MessagingServerManagement", exposedInterface = MessagingServerManagement.class)
+public class MessagingServerManagementImpl implements MessagingServerManagement
+{
+ private MessagingServer messagingServer;
+
+ public void setMessagingServer(MessagingServer messagingServer)
+ {
+ this.messagingServer = messagingServer;
+ }
+
+ public String getServerVersion()
+ {
+ return messagingServer.getVersion().getProviderVersion();
+ }
+
+ public Configuration getConfiguration()
+ {
+ return messagingServer.getConfiguration();
+ }
+
+ public boolean isStarted()
+ {
+ return messagingServer.isStarted();
+ }
+
+ public void createQueue(String name, String jndiName) throws Exception
+ {
+ messagingServer.createQueue(name, jndiName);
+ }
+
+ public void createTopic(String name, String jndiName) throws Exception
+ {
+ messagingServer.createTopic(name, jndiName);
+ }
+
+ public void destroyQueue(String name, String jndiName) throws Exception
+ {
+ messagingServer.destroyQueue(name, jndiName);
+ }
+
+ public void destroyTopic(String name, String jndiName) throws Exception
+ {
+ messagingServer.destroyTopic(name, jndiName);
+ }
+
+ public void removeAllMessagesForQueue(String queueName) throws Exception
+ {
+ messagingServer.removeAllMessagesForQueue(queueName);
+ }
+
+ public void removeAllMessagesForTopic(String topicName) throws Exception
+ {
+ messagingServer.removeAllMessagesForTopic(topicName);
+ }
+
+ public int getMessageCountForQueue(String queue) throws Exception
+ {
+ return getQueue(queue).getMessageCount();
+ }
+
+ public List<SubscriptionInfo> listAllSubscriptionsForTopic(String topicName) throws Exception
+ {
+ return listAllSubscriptions(topicName);
+ }
+
+// public int getDeliveringCountForQueue(String queue) throws Exception
+// {
+// return getQueue(queue).getDeliveringCount();
+// }
+
+ public int getScheduledMessageCountForQueue(String queue) throws Exception
+ {
+ return getQueue(queue).getScheduledCount();
+ }
+
+// public MessageCounter getMessageCounterForQueue(String queue) throws Exception
+// {
+// return getQueue(queue).getMessageCounter();
+// }
+
+// public MessageStatistics getMessageStatisticsForQueue(String queue) throws Exception
+// {
+// List counters = new ArrayList();
+// counters.add(getQueue(queue).getMessageCounter());
+//
+// List stats = MessageCounter.getMessageStatistics(counters);
+//
+// return (MessageStatistics)stats.get(0);
+// }
+
+ public int getConsumerCountForQueue(String queue) throws Exception
+ {
+ return getQueue(queue).getConsumerCount();
+ }
+
+// public void resetMessageCounterForQueue(String queue) throws Exception
+// {
+// getMessageCounterForQueue(queue).resetCounter();
+// }
+//
+// public void resetMessageCounterHistoryForQueue(String queue) throws Exception
+// {
+// getMessageCounterForQueue(queue).resetHistory();
+// }
+
+// public List<Message> listAllMessagesForQueue(String queue) throws Exception
+// {
+// return getQueue(queue).listAllMessages(null);
+// }
+
+ public List<Message> listAllMessagesForQueue(String queue, String selector) throws Exception
+ {
+ return listAllMessages(getQueue(queue), selector);
+ }
+
+ public List<Message> listDurableMessagesForQueue(String queue) throws Exception
+ {
+ return listDurableMessages(getQueue(queue), null);
+ }
+
+ public List<Message> listDurableMessagesForQueue(String queue, String selector) throws Exception
+ {
+ return listDurableMessages(getQueue(queue), selector);
+ }
+
+ public List<Message> listNonDurableMessagesForQueue(String queue) throws Exception
+ {
+ return listNonDurableMessages(getQueue(queue), null);
+ }
+
+ public List<Message> listNonDurableMessagesForQueue(String queue, String selector) throws Exception
+ {
+ return listNonDurableMessages(getQueue(queue), selector);
+ }
+
+// public String listMessageCounterAsHTMLForQueue(String queue) throws Exception
+// {
+// return listMessageCounterAsHTML(new MessageCounter[] { getMessageCounterForQueue(queue) });
+// }
+//
+// public String listMessageCounterHistoryAsHTMLForQueue(String queue) throws Exception
+// {
+// return listMessageCounterHistoryAsHTML(new MessageCounter[] {getMessageCounterForQueue(queue)});
+// }
+
+ public int getAllMessageCountForTopic(String topicName) throws Exception
+ {
+ return getAllMessageCountForTopic(topicName);
+ }
+
+ public int getDurableMessageCountForTopic(String topicName) throws Exception
+ {
+ return getDurableMessageCountForTopic(topicName);
+ }
+
+ public int getNonDurableMessageCountForTopic(String topicName) throws Exception
+ {
+ return getNonDurableMessageCountForTopic(topicName);
+ }
+
+ public int getAllSubscriptionsCountForTopic(String topicName) throws Exception
+ {
+ return getAllSubscriptionsCountForTopic(topicName);
+ }
+
+ public int getDurableSubscriptionsCountForTopic(String topicName) throws Exception
+ {
+ return getDurableSubscriptionsCountForTopic(topicName);
+ }
+
+ public int getNonDurableSubscriptionsCountForTopic(String topicName) throws Exception
+ {
+ return getNonDurableSubscriptionsCountForTopic(topicName);
+ }
+
+
+
+
+ public List<SubscriptionInfo> listDurableSubscriptionsForTopic(String topicName) throws Exception
+ {
+ return listDurableSubscriptions(topicName);
+ }
+
+ public List<SubscriptionInfo> listNonDurableSubscriptionsForTopic(String topicName) throws Exception
+ {
+ return listNonDurableSubscriptions(topicName);
+ }
+
+ public String listAllSubscriptionsAsHTMLForTopic(String topicName) throws Exception
+ {
+ return listAllSubscriptionsAsHTML(topicName);
+ }
+
+ public String listDurableSubscriptionsAsHTMLForTopic(String topicName) throws Exception
+ {
+ return listDurableSubscriptionsAsHTML(topicName);
+ }
+
+ public String listNonDurableSubscriptionsAsHTMLForTopic(String topicName) throws Exception
+ {
+ return listNonDurableSubscriptionsAsHTML(topicName);
+ }
+
+// public List<Message> listAllMessagesForSubscription(String subscriptionId) throws Exception
+// {
+// return listAllMessagesForSubscription(subscriptionId, null);
+// }
+//
+// public List<Message> listAllMessagesForSubscription(String subscriptionId, String selector) throws Exception
+// {
+// return listAllMessages(subscriptionId, selector);
+// }
+//
+// public List<Message> listDurableMessagesForSubscription(String subscriptionId) throws Exception
+// {
+// return listDurableMessages(subscriptionId, null);
+// }
+//
+// public List<Message> listDurableMessagesForTopic(String topicName, String subscriptionId, String selector) throws Exception
+// {
+// return listDurableMessages(topicName, subscriptionId, selector);
+// }
+//
+// public List<Message> listNonDurableMessagesForTopic(String topicName, String subscriptionId) throws Exception
+// {
+// return listNonDurableMessages(subscriptionId, null);
+// }
+//
+// public List<Message> listNonDurableMessagesForTopic(String topicName, String subscriptionId, String selector) throws Exception
+// {
+// return listNonDurableMessages(subscriptionId, selector);
+// }
+
+ public List<MessageCounter> getMessageCountersForTopic(String topicName) throws Exception
+ {
+ return getMessageCounters(topicName);
+ }
+
+ public String showActiveClientsAsHTML() throws Exception
+ {
+ CharArrayWriter charArray = new CharArrayWriter();
+ PrintWriter out = new PrintWriter(charArray);
+
+ List endpoints = messagingServer.getConnectionManager().getActiveConnections();
+
+ out.println("<table><tr><td>ID</td><td>Host</td><td>User</td><td>#Sessions</td></tr>");
+ for (Iterator iter = endpoints.iterator(); iter.hasNext();)
+ {
+ ServerConnectionEndpoint endpoint = (ServerConnectionEndpoint) iter.next();
+
+ out.println("<tr>");
+ out.println("<td>" + endpoint.toString() + "</td>");
+ // FIXME display URI of client
+ out.println("<td>" + endpoint.getUsername() + "</td>");
+ out.println("<td>" + endpoint.getSessions().size() + "</td>");
+ out.println("</tr>");
+ }
+
+ out.println("</table>");
+
+
+ return charArray.toString();
+ }
+
+// public String showPreparedTransactionsAsHTML()
+// {
+// List txs = messagingServer.getTxRepository().getPreparedTransactions();
+// JBossStringBuilder buffer = new JBossStringBuilder();
+// buffer.append("<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">");
+// buffer.append("<tr><th>Xid</th></tr>");
+// for (Iterator i = txs.iterator(); i.hasNext();)
+// {
+// Xid xid = (Xid) i.next();
+// if (xid != null)
+// {
+// buffer.append("<tr><td>");
+// buffer.append(xid);
+// buffer.append("</td></tr>");
+// }
+// }
+// buffer.append("</table>");
+// return buffer.toString();
+// }
+
+// public String listMessageCountersAsHTML() throws Exception
+// {
+// List counters = messagingServer.getMessageCounters();
+//
+// Collections.sort(counters, new Comparator()
+// {
+// public int compare(Object o1, Object o2)
+// {
+// MessageCounter m1 = (MessageCounter) o1;
+// MessageCounter m2 = (MessageCounter) o2;
+// return m1.getDestinationName().compareTo(m2.getDestinationName());
+// }
+// });
+//
+// String ret =
+// "<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">"
+// + "<tr>"
+// + "<th>Type</th>"
+// + "<th>Name</th>"
+// + "<th>Subscription</th>"
+// + "<th>Durable</th>"
+// + "<th>Count</th>"
+// + "<th>CountDelta</th>"
+// + "<th>Depth</th>"
+// + "<th>DepthDelta</th>"
+// + "<th>Last Add</th>"
+// + "</tr>";
+//
+// String strNameLast = null;
+// String strTypeLast = null;
+// String strDestLast = null;
+//
+// String destData = "";
+// int destCount = 0;
+//
+// int countTotal = 0;
+// int countDeltaTotal = 0;
+// int depthTotal = 0;
+// int depthDeltaTotal = 0;
+//
+// int i = 0; // define outside of for statement, so variable
+// // still exists after for loop, because it is
+// // needed during output of last module data string
+//
+// Iterator iter = counters.iterator();
+//
+// while (iter.hasNext())
+// {
+// MessageCounter counter = (MessageCounter) iter.next();
+//
+// // get counter data
+// StringTokenizer tokens = new StringTokenizer(counter.getCounterAsString(), ",");
+//
+// String strType = tokens.nextToken();
+// String strName = tokens.nextToken();
+// String strSub = tokens.nextToken();
+// String strDurable = tokens.nextToken();
+//
+// String strDest = strType + "-" + strName;
+//
+// String strCount = tokens.nextToken();
+// String strCountDelta = tokens.nextToken();
+// String strDepth = tokens.nextToken();
+// String strDepthDelta = tokens.nextToken();
+// String strDate = tokens.nextToken();
+//
+// // update total count / depth values
+// countTotal += Integer.parseInt(strCount);
+// depthTotal += Integer.parseInt(strDepth);
+//
+// countDeltaTotal += Integer.parseInt(strCountDelta);
+// depthDeltaTotal += Integer.parseInt(strDepthDelta);
+//
+// if (strCountDelta.equalsIgnoreCase("0"))
+// strCountDelta = "-"; // looks better
+//
+// if (strDepthDelta.equalsIgnoreCase("0"))
+// strDepthDelta = "-"; // looks better
+//
+// // output destination counter data as HTML table row
+// // ( for topics with multiple subscriptions output
+// // type + name field as rowspans, looks better )
+// if (strDestLast != null && strDestLast.equals(strDest))
+// {
+// // still same destination -> append destination subscription data
+// destData += "<tr bgcolor=\"#" + ((i % 2) == 0 ? "FFFFFF" : "F0F0F0") + "\">";
+// destCount += 1;
+// }
+// else
+// {
+// // startnew destination data
+// if (strDestLast != null)
+// {
+// // store last destination data string
+// ret += "<tr bgcolor=\"#"
+// + ((i % 2) == 0 ? "FFFFFF" : "F0F0F0")
+// + "\"><td rowspan=\""
+// + destCount
+// + "\">"
+// + strTypeLast
+// + "</td><td rowspan=\""
+// + destCount
+// + "\">"
+// + strNameLast
+// + "</td>"
+// + destData;
+//
+// destData = "";
+// }
+//
+// destCount = 1;
+// }
+//
+// // counter data row
+// destData += "<td>"
+// + strSub
+// + "</td>"
+// + "<td>"
+// + strDurable
+// + "</td>"
+// + "<td>"
+// + strCount
+// + "</td>"
+// + "<td>"
+// + strCountDelta
+// + "</td>"
+// + "<td>"
+// + strDepth
+// + "</td>"
+// + "<td>"
+// + strDepthDelta
+// + "</td>"
+// + "<td>"
+// + strDate
+// + "</td>";
+//
+// // store current destination data for change detection
+// strTypeLast = strType;
+// strNameLast = strName;
+// strDestLast = strDest;
+// }
+//
+// if (strDestLast != null)
+// {
+// // store last module data string
+// ret += "<tr bgcolor=\"#"
+// + ((i % 2) == 0 ? "FFFFFF" : "F0F0F0")
+// + "\"><td rowspan=\""
+// + destCount
+// + "\">"
+// + strTypeLast
+// + "</td><td rowspan=\""
+// + destCount
+// + "\">"
+// + strNameLast
+// + "</td>"
+// + destData;
+// }
+//
+// // append summation info
+// ret += "<tr>"
+// + "<td><![CDATA[ ]]></td><td><![CDATA[ ]]></td>"
+// + "<td><![CDATA[ ]]></td><td><![CDATA[ ]]></td><td>"
+// + countTotal
+// + "</td><td>"
+// + (countDeltaTotal == 0 ? "-" : Integer.toString(countDeltaTotal))
+// + "</td><td>"
+// + depthTotal
+// + "</td><td>"
+// + (depthDeltaTotal == 0 ? "-" : Integer.toString(depthDeltaTotal))
+// + "</td><td>Total</td></tr></table>";
+//
+// return ret;
+// }
+
+
+
+ public List<Message> listAllMessages(Queue queue, String selector) throws Exception
+ {
+ return listMessages(queue, ListType.ALL, selector);
+ }
+
+ public List<Message> listDurableMessages(Queue queue, String selector) throws Exception
+ {
+ return listMessages(queue, ListType.DURABLE, selector);
+ }
+
+ public List<Message> listNonDurableMessages(Queue queue, String selector) throws Exception
+ {
+ return listMessages(queue, ListType.NON_DURABLE, selector);
+ }
+
+ public List<SubscriptionInfo> listAllSubscriptions(String topicName) throws Exception
+ {
+ return listSubscriptions(topicName, ListType.ALL);
+ }
+
+ public List<SubscriptionInfo> listDurableSubscriptions(String topicName) throws Exception
+ {
+ return listSubscriptions(topicName, ListType.DURABLE);
+ }
+
+ public List<SubscriptionInfo> listNonDurableSubscriptions(String topicName) throws Exception
+ {
+ return listSubscriptions(topicName, ListType.NON_DURABLE);
+ }
+
+ public String listAllSubscriptionsAsHTML(String topicName) throws Exception
+ {
+ return listSubscriptionsAsHTML(topicName, ListType.ALL);
+ }
+
+ public String listDurableSubscriptionsAsHTML(String topicName) throws Exception
+ {
+ return listSubscriptionsAsHTML(topicName, ListType.DURABLE);
+ }
+
+ public String listNonDurableSubscriptionsAsHTML(String topicName) throws Exception
+ {
+ return listSubscriptionsAsHTML(topicName, ListType.NON_DURABLE);
+ }
+
+ public List<Message> listDurableMessagesForSubscription(String subId, String selector) throws Exception
+ {
+ return listMessagesForSubscription(ListType.DURABLE, subId, selector);
+ }
+
+ public List<Message> listNonDurableMessagesForSubscription(String subId, String selector) throws Exception
+ {
+ return listMessagesForSubscription(ListType.NON_DURABLE, subId, selector);
+ }
+
+ public List<MessageCounter> getMessageCounters(String topicName) throws Exception
+ {
+ List<MessageCounter> counters = new ArrayList<MessageCounter>();
+
+ Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+
+ List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+
+ for (Binding binding: bindings)
+ {
+ Queue queue = binding.getQueue();
+
+ //TODO - get message counters
+
+// String counterName = SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queue.getName();
+//
+// MessageCounter counter = messagingServer.getMessageCounterManager().getMessageCounter(counterName);
+//
+// if (counter == null)
+// {
+// throw new IllegalStateException("Cannot find counter with name " + counterName);
+// }
+//
+// counters.add(counter);
+ }
+
+ return counters;
+ }
+
+
+// public void setMessageCounterHistoryDayLimit(String topicName, int limit) throws Exception
+// {
+// Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+//
+// List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+//
+// for (Binding binding: bindings)
+// {
+// Queue queue = binding.getQueue();
+//
+// queue.setMessageCounterHistoryDayLimit(limit);
+// }
+// }
+
+ // Private ---------------------------------------------------------------------------
+
+ private Queue getQueue(String queueName) throws Exception
+ {
+ Condition condition = new ConditionImpl(DestinationType.QUEUE, queueName);
+
+ List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+
+ if (bindings.isEmpty())
+ {
+ throw new IllegalArgumentException("No queue with name " + queueName);
+ }
+
+ return bindings.get(0).getQueue();
+ }
+
+
+
+ private List<Message> listMessages(Queue queue, ListType type, String selector) throws Exception
+ {
+ Selector sel = null;
+
+ if (selector != null && "".equals(selector.trim()))
+ {
+ selector = null;
+ }
+
+ if (selector != null)
+ {
+ sel = new Selector(selector);
+ }
+
+ List<Message> msgs = new ArrayList<Message>();
+
+ List<MessageReference> allRefs = queue.list(sel);
+
+ for (MessageReference ref: allRefs)
+ {
+ Message msg = ref.getMessage();
+
+ if (type == ListType.ALL ||
+ (type == ListType.DURABLE && msg.isDurable()) ||
+ (type == ListType.NON_DURABLE && !msg.isDurable()))
+ {
+ msgs.add(msg);
+ }
+ }
+
+ return msgs;
+ }
+
+
+ private String listMessageCounterAsHTML(MessageCounter[] counters)
+ {
+ if (counters == null)
+ return null;
+
+ String ret = "<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">" +
+ "<tr>" +
+ "<th>Type</th>" +
+ "<th>Name</th>" +
+ "<th>Subscription</th>" +
+ "<th>Durable</th>" +
+ "<th>Count</th>" +
+ "<th>CountDelta</th>" +
+ "<th>Depth</th>" +
+ "<th>DepthDelta</th>" +
+ "<th>Last Add</th>" +
+ "</tr>";
+
+ for( int i=0; i<counters.length; i++ )
+ {
+ String data = counters[i].getCounterAsString();
+ StringTokenizer token = new StringTokenizer( data, ",");
+ String value;
+
+ ret += "<tr bgcolor=\"#" + ( (i%2)==0 ? "FFFFFF" : "F0F0F0") + "\">";
+
+ ret += "<td>" + token.nextToken() + "</td>"; // type
+ ret += "<td>" + token.nextToken() + "</td>"; // name
+ ret += "<td>" + token.nextToken() + "</td>"; // subscription
+ ret += "<td>" + token.nextToken() + "</td>"; // durable
+
+ ret += "<td>" + token.nextToken() + "</td>"; // count
+
+ value = token.nextToken(); // countDelta
+
+ if( value.equalsIgnoreCase("0") )
+ value = "-";
+
+ ret += "<td>" + value + "</td>";
+
+ ret += "<td>" + token.nextToken() + "</td>"; // depth
+
+ value = token.nextToken(); // depthDelta
+
+ if( value.equalsIgnoreCase("0") )
+ value = "-";
+
+ ret += "<td>" + value + "</td>";
+
+ ret += "<td>" + token.nextToken() + "</td>"; // date last add
+
+ ret += "</tr>";
+ }
+
+ ret += "</table>";
+
+ return ret;
+ }
+
+ private String listMessageCounterHistoryAsHTML(MessageCounter[] counters)
+ {
+ if (counters == null)
+ return null;
+
+ String ret = "";
+
+ for( int i=0; i<counters.length; i++ )
+ {
+ // destination name
+ ret += ( counters[i].getDestinationTopic() ? "Topic '" : "Queue '" );
+ ret += counters[i].getDestinationName() + "'";
+
+ if( counters[i].getDestinationSubscription() != null )
+ ret += "Subscription '" + counters[i].getDestinationSubscription() + "'";
+
+
+ // table header
+ ret += "<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">" +
+ "<tr>" +
+ "<th>Date</th>";
+
+ for( int j = 0; j < 24; j++ )
+ ret += "<th width=\"4%\">" + j + "</th>";
+
+ ret += "<th>Total</th></tr>";
+
+ // get history data as CSV string
+ StringTokenizer tokens = new StringTokenizer( counters[i].getHistoryAsString(), ",\n");
+
+ // get history day count
+ int days = Integer.parseInt( tokens.nextToken() );
+
+ for( int j=0; j<days; j++ )
+ {
+ // next day counter row
+ ret += "<tr bgcolor=\"#" + ((j%2)==0 ? "FFFFFF" : "F0F0F0") + "\">";
+
+ // date
+ ret += "<td>" + tokens.nextToken() + "</td>";
+
+ // 24 hour counters
+ int total = 0;
+
+ for( int k=0; k<24; k++ )
+ {
+ int value = Integer.parseInt( tokens.nextToken().trim() );
+
+ if( value == -1 )
+ {
+ ret += "<td></td>";
+ }
+ else
+ {
+ ret += "<td>" + value + "</td>";
+
+ total += value;
+ }
+ }
+
+ ret += "<td>" + total + "</td></tr>";
+ }
+
+ ret += "</table><br><br>";
+ }
+
+ return ret;
+ }
+
+ private enum ListType
+ {
+ ALL, DURABLE, NON_DURABLE;
+ }
+
+
+ private List<Message> listMessagesForSubscription(ListType type, String subId, String selector) throws Exception
+ {
+ List<Message> msgs = new ArrayList<Message>();
+
+ if (subId == null || "".equals(subId.trim()))
+ {
+ return msgs;
+ }
+
+ List<Binding> bindings = messagingServer.getPostOffice().getBindingsForQueueName(subId);
+
+ if (bindings.isEmpty())
+ {
+ throw new IllegalArgumentException("Cannot find subscription with id " + subId);
+ }
+
+ Selector sel = null;
+
+ if (selector != null && "".equals(selector.trim()))
+ {
+ selector = null;
+ }
+
+ if (selector != null)
+ {
+ sel = new Selector(selector);
+ }
+
+ Binding binding = bindings.get(0);
+
+ List<MessageReference> allRefs = binding.getQueue().list(sel);
+
+ for (MessageReference ref: allRefs)
+ {
+ Message msg = ref.getMessage();
+
+ if (type == ListType.ALL || (type == ListType.DURABLE && msg.isDurable()) || (type == ListType.NON_DURABLE && !msg.isDurable()))
+ {
+ msgs.add(msg);
+ }
+ }
+
+ return msgs;
+ }
+
+ private List<SubscriptionInfo> listSubscriptions(String topicName, ListType type) throws Exception
+ {
+ List<SubscriptionInfo> subs = new ArrayList<SubscriptionInfo>();
+
+ Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+
+ List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+
+ for (Binding binding: bindings)
+ {
+ Queue queue = binding.getQueue();
+
+ if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable()) || (type == ListType.NON_DURABLE && !queue.isDurable()))
+ {
+ String subName = null;
+ String clientID = null;
+
+ if (queue.isDurable())
+ {
+ MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
+ subName = helper.getSubName();
+ clientID = helper.getClientId();
+ }
+
+ SubscriptionInfo info = new SubscriptionInfo(queue.getName(), queue.isDurable(), subName, clientID,
+ queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.getMessageCount(), queue.getMaxSize());
+
+ subs.add(info);
+ }
+ }
+
+ return subs;
+ }
+
+ private int getMessageCount(String topicName, ListType type) throws Exception
+ {
+ Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+
+ List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+
+ int count = 0;
+
+ for (Binding binding: bindings)
+ {
+ Queue queue = binding.getQueue();
+
+ if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable())
+ || (type == ListType.NON_DURABLE && !queue.isDurable()))
+ {
+ count += queue.getMessageCount();
+ }
+ }
+
+ return count;
+ }
+
+ private int getSubscriptionsCount(String topicName, boolean durable) throws Exception
+ {
+ Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+
+ List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+
+ int count = 0;
+
+ for (Binding binding: bindings)
+ {
+ Queue queue = binding.getQueue();
+
+ if ((queue.isDurable() && durable) || (!queue.isDurable() && !durable))
+ {
+ count++;
+ }
+ }
+
+ return count;
+ }
+
+
+ private String listSubscriptionsAsHTML(String topicName, ListType type) throws Exception
+ {
+ Condition condition = new ConditionImpl(DestinationType.TOPIC, topicName);
+
+ List<Binding> bindings = messagingServer.getPostOffice().getBindingsForCondition(condition);
+
+ StringBuffer sb = new StringBuffer();
+
+ sb.append("<table width=\"100%\" border=\"1\" cellpadding=\"1\" cellspacing=\"1\">" +
+ "<tr>" +
+ "<th>Id</th>" +
+ "<th>Durable</th>" +
+ "<th>Subscription Name</th>" +
+ "<th>Client ID</th>" +
+ "<th>Selector</th>" +
+ "<th>Message Count</th>" +
+ "<th>Max Size</th>" +
+ "</tr>");
+
+ for (Binding binding: bindings)
+ {
+ Queue queue = binding.getQueue();
+
+ if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable())
+ || (type == ListType.NON_DURABLE && !queue.isDurable()))
+ {
+ String filterString = queue.getFilter() != null ? queue.getFilter().getFilterString() : null;
+
+ String subName = null;
+ String clientID = null;
+
+ if (queue.isDurable())
+ {
+ MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
+ subName = helper.getSubName();
+ clientID = helper.getClientId();
+ }
+
+ sb.append("<tr><td>").append(queue.getName()).append("</td>");
+ sb.append("<td>").append(queue.isDurable() ? "Durable" : "Non Durable").append("</td>");
+ sb.append("<td>").append(subName != null ? subName : "").append("</td>");
+ sb.append("<td>").append(clientID != null ? clientID : "").append("</td>");
+ sb.append("<td>").append(filterString != null ? filterString : "").append("</td>");
+ sb.append("<td>").append(queue.getMessageCount()).append("</td>");
+ sb.append("<td>").append(queue.getMaxSize()).append("</td>");
+ sb.append("</tr>");
+ }
+ }
+ sb.append("</table>");
+
+ return sb.toString();
+ }
+
+
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/server/SubscriptionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/SubscriptionInfo.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/SubscriptionInfo.java 2008-01-15 21:24:02 UTC (rev 3570)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.server;
+
+import java.io.Serializable;
+
+/**
+ * A SubscriptionInfo
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1935 $</tt>
+ *
+ * $Id: SubscriptionInfo.java 1935 2007-01-09 23:29:20Z clebert.suconic at jboss.com $
+ *
+ */
+public class SubscriptionInfo implements Serializable
+{
+ private static final long serialVersionUID = -38689006079435295L;
+
+ private String id;
+
+ private boolean durable;
+
+ private String name;
+
+ private String clientID;
+
+ private String selector;
+
+ private int messageCount;
+
+ private int maxSize;
+
+ public SubscriptionInfo(String id, boolean durable, String name, String clientID, String selector, int messageCount, int maxSize)
+ {
+ this.id = id;
+ this.durable = durable;
+ this.name = name;
+ this.clientID = clientID;
+ this.selector = selector;
+ this.messageCount = messageCount;
+ this.maxSize = maxSize;
+ }
+
+ public String getId()
+ {
+ return id;
+ }
+
+ public String getClientID()
+ {
+ return clientID;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public int getMaxSize()
+ {
+ return maxSize;
+ }
+
+ public int getMessageCount()
+ {
+ return messageCount;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getSelector()
+ {
+ return selector;
+ }
+
+}
More information about the jboss-cvs-commits
mailing list