[jboss-cvs] JBoss Messaging SVN: r3789 - in trunk: docs/examples/embedded/src/org/jboss/example/embedded and 14 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Feb 25 10:34:18 EST 2008
Author: ataylor
Date: 2008-02-25 10:34:18 -0500 (Mon, 25 Feb 2008)
New Revision: 3789
Added:
trunk/docs/examples/embedded/build.xml
trunk/docs/examples/embedded/runjconsole.sh
trunk/src/main/org/jboss/jms/server/ConnectionInfo.java
trunk/src/main/org/jboss/jms/server/JMSServerDeployer.java
trunk/src/main/org/jboss/jms/server/SessionInfo.java
Modified:
trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java
trunk/src/etc/server/default/deploy/jbm-beans.xml
trunk/src/etc/server/default/deploy/jbm-jndi.xml
trunk/src/etc/server/default/deploy/queues.xml
trunk/src/main/org/jboss/jms/destination/JBossQueue.java
trunk/src/main/org/jboss/jms/destination/JBossTopic.java
trunk/src/main/org/jboss/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/messaging/core/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
trunk/src/main/org/jboss/messaging/core/PostOffice.java
trunk/src/main/org/jboss/messaging/core/Queue.java
trunk/src/main/org/jboss/messaging/core/QueueSettings.java
trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
trunk/src/main/org/jboss/messaging/deployers/queue/QueueSettingsDeployer.java
trunk/src/main/org/jboss/messaging/util/HierarchicalObjectRepository.java
trunk/tests/src/org/jboss/messaging/core/test/unit/QueueSettingsTest.java
trunk/tests/src/org/jboss/messaging/deployers/queue/tests/unit/QueueSettingsDeployerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java
trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
Log:
more refactoring of management interfaces and changed queueimpl to use queuerepository and added example
Added: trunk/docs/examples/embedded/build.xml
===================================================================
--- trunk/docs/examples/embedded/build.xml (rev 0)
+++ trunk/docs/examples/embedded/build.xml 2008-02-25 15:34:18 UTC (rev 3789)
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+
+ To run the example, set JBOSS_HOME and run ant (with no parameters)
+
+ $Id: build.xml 3140 2007-09-26 08:44:19Z ataylor $
+
+ -->
+
+<project name="EmbeddedExample" default="run">
+ <property environment="ENV"/>
+
+ <!-- These properties may be overridden by the calling Ant script when this example is used in a smoke test -->
+ <property file="../examples.properties"/>
+ <property name="remoting.path" value="../config"/>
+ <property name="messaging.client.jar.path" value="../../"/>
+ <property name="messaging.client.jar.name" value="jboss-messaging-client.jar"/>
+ <property name="messaging.jar.name" value="jboss-messaging.jar"/>
+ <property name="jboss.home" value="${ENV.JBOSS_HOME}"/>
+ <property name="jboss.configuration" value="messaging"/>
+ <property name="example.queue.name" value="testQueue"/>
+
+ <path id="common.compilation.classpath">
+ <fileset file="${jboss.home}/client/jboss-j2ee.jar"/>
+ <fileset file="${jboss.home}/server/${jboss.configuration}/lib/jnpserver.jar"/>
+ <fileset file="${messaging.client.jar.path}/${messaging.client.jar.name}"/>
+ <fileset file="${messaging.client.jar.path}/${messaging.jar.name}"/>
+ </path>
+
+ <path id="example.compilation.classpath">
+ <path refid="common.compilation.classpath"/>
+ <pathelement path="../common/output/classes"/>
+ </path>
+
+ <path id="execution.classpath">
+ <pathelement path="./etc"/>
+ <pathelement path="../common/output/classes"/>
+ <pathelement path="./output/classes"/>
+ <fileset file="${jboss.home}/server/${jboss.configuration}/lib/jnpserver.jar"/>
+ <fileset file="${messaging.client.jar.path}/${messaging.client.jar.name}"/>
+ <fileset file="${messaging.client.jar.path}/${messaging.jar.name}"/>
+ <fileset dir="${jboss.home}/server/${jboss.configuration}/deploy/messaging.sar">
+ <include name="**/*.jar"/>
+ </fileset>
+ <fileset file="${jboss.home}/client/jbossall-client.jar"/>
+ <fileset file="${jboss.home}/server/${jboss.configuration}/lib/log4j.jar"/>
+ <fileset file="${jboss.home}/lib/javassist.jar"/>
+ <fileset file="${jboss.home}/lib/jboss-aop-jdk50.jar"/>
+ <fileset file="${jboss.home}/lib/trove.jar"/>
+ <fileset file="${jboss.home}/lib/jboss-container.jar"/>
+ </path>
+
+ <target name="identify">
+ <echo message="############################################################################"/>
+ <echo message="# Running the Embedded example #"/>
+ <echo message="############################################################################"/>
+ <echo message="The queue: ${example.queue.name}"/>
+ <echo message="The client jar: ${messaging.client.jar.path}/${messaging.client.jar.name}"/>
+ </target>
+
+ <target name="sanity-check" depends="identify">
+ <available property="client.jar.present" file="${messaging.client.jar.path}/${messaging.client.jar.name}"/>
+ <fail message="Could not find client jar ${messaging.client.jar.path}/${messaging.client.jar.name}"
+ unless="client.jar.present"/>
+ </target>
+
+ <target name="init" depends="sanity-check">
+ <mkdir dir="./output/classes"/>
+ <mkdir dir="../common/output/classes"/>
+ </target>
+
+ <target name="compile" depends="init">
+ <javac destdir="../common/output/classes" debug="on" debuglevel="lines,vars,source">
+ <src path="../common/src"/>
+ <include name="**/*EmbeddedManagementExample.java"/>
+ <exclude name="**/*EmbeddedExample.java"/>
+ <classpath refid="common.compilation.classpath"/>
+ </javac>
+ <javac destdir="./output/classes" debug="on" debuglevel="lines,vars,source">
+ <src path="./src"/>
+ <include name="**/*EmbeddedManagementExample.java"/>
+ <exclude name="**/*EmbeddedExample.java"/>
+ <classpath refid="example.compilation.classpath"/>
+ </javac>
+ </target>
+
+   <!--
+      Launches the example in a forked JVM. Depends on "compile" because "run" is the
+      project's default target and the usage note at the top of this file says to invoke
+      plain "ant"; without the dependency a fresh checkout would start with no classes built.
+   -->
+   <target name="run" depends="compile">
+      <java classname="org.jboss.example.embedded.EmbeddedManagementExample"
+            classpathref="execution.classpath" fork="true">
+
+         <!-- Opens a remote JMX connector on port 5401 with authentication and SSL switched off so that
+              jconsole can attach; suitable for this local example only. -->
+         <jvmarg line="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5401"/>
+      </java>
+   </target>
+</project>
\ No newline at end of file
Added: trunk/docs/examples/embedded/runjconsole.sh
===================================================================
--- trunk/docs/examples/embedded/runjconsole.sh (rev 0)
+++ trunk/docs/examples/embedded/runjconsole.sh 2008-02-25 15:34:18 UTC (rev 3789)
@@ -0,0 +1,3 @@
+#!/bin/sh
+# Starts jconsole with the JBoss Messaging jar on its class path so that the
+# management types exposed by the embedded example can be displayed.
+# Run from docs/examples/embedded: the messaging jar path below is relative.
+
+# Honour a JAVA_HOME supplied by the caller; fall back to the original default otherwise.
+JAVA_HOME="${JAVA_HOME:-/usr/lib/java-1.5.0/jdk1.5.0_12}"
+export JAVA_HOME
+CLASSPATH="$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/jconsole.jar:../../../output/lib/jboss-messaging.jar"
+# Use the jconsole that belongs to the same JDK whose jars are on the class path.
+"$JAVA_HOME/bin/jconsole" "-J-Djava.class.path=$CLASSPATH"
\ No newline at end of file
Modified: trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java
===================================================================
--- trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedManagementExample.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -24,20 +24,17 @@
import org.jboss.messaging.core.remoting.RemotingConfiguration;
import static org.jboss.messaging.core.remoting.TransportType.TCP;
import org.jboss.messaging.core.MessagingServer;
-import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessagingServerManagement;
import org.jboss.messaging.core.impl.server.MessagingServerImpl;
import org.jboss.messaging.core.impl.server.MessagingServerManagementImpl;
-import org.jboss.messaging.core.impl.MessageImpl;
-import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
-import org.jboss.jms.client.api.*;
-import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
-import org.jboss.jms.message.JBossTextMessage;
+import org.jboss.jms.server.JMSServerManager;
+import org.jboss.jms.server.JMSServerManagerImpl;
+import org.jnp.server.NamingBeanImpl;
+import org.jnp.server.Main;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import javax.management.StandardMBean;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
/**
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -46,6 +43,18 @@
{
public static void main(String args[]) throws Exception
{
+ System.setProperty("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
+ System.setProperty("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
+
+ NamingBeanImpl namingBean = new NamingBeanImpl();
+ namingBean.start();
+ Main mainMBean = new Main();
+ mainMBean.setPort(1099);
+ mainMBean.setBindAddress("localhost");
+ mainMBean.setRmiPort(1098);
+ mainMBean.setRmiBindAddress("localhost");
+ mainMBean.setNamingInfo(namingBean);
+ mainMBean.start();
RemotingConfiguration remotingConf = new RemotingConfiguration(TCP, "localhost", 5400);
remotingConf.setInvmDisabled(true);
MessagingServer messagingServer = new MessagingServerImpl(remotingConf);
@@ -53,79 +62,22 @@
MessagingServerManagementImpl messagingServerManagement = new MessagingServerManagementImpl();
messagingServerManagement.setMessagingServer(messagingServer);
messagingServerManagement.start();
- ClientConnectionFactory cf = new ClientConnectionFactoryImpl(remotingConf);
- ClientConnection clientConnection = cf.createConnection(null, null);
- ClientSession clientSession = clientConnection.createClientSession(false, true, true, 0, false);
- String queue = "Queue1";
- clientSession.createQueue(queue, queue, null, false, false);
- ClientProducer clientProducer = clientSession.createProducer("Queue1");
-
- clientConnection.start();
-
- messagingServerManagement.registerMessageCounter(queue);
- Message message = new MessageImpl(JBossTextMessage.TYPE, true, 0, System.currentTimeMillis(), (byte) 1);
- messagingServerManagement.startMessageCounter(queue, 0);
- for (int i = 0; i < 1000; i++)
- {
- clientProducer.send(message);
- }
-
- MessageCounter messageCounter = messagingServerManagement.getMessageCounter(queue);
- System.out.println("messageCounter = " + messageCounter);
- for (int i = 0; i < 2000; i++)
- {
- clientProducer.send(message);
- }
-
- messageCounter = messagingServerManagement.getMessageCounter(queue);
- System.out.println("messageCounter = " + messageCounter);
- for (int i = 0; i < 3000; i++)
- {
- clientProducer.send(message);
- }
-
- messageCounter = messagingServerManagement.getMessageCounter(queue);
- System.out.println("messageCounter = " + messageCounter);
- for (int i = 0; i < 4000; i++)
- {
- clientProducer.send(message);
- }
-
- messageCounter = messagingServerManagement.getMessageCounter(queue);
- System.out.println("messageCounter = " + messageCounter);
- messagingServerManagement.stopMessageCounter(queue);
- messagingServerManagement.startMessageCounter(queue, 5);
- ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
- Timer timer = new Timer();
- scheduler.schedule(timer, 10, TimeUnit.SECONDS);
- int counter = 0;
- while (timer.isRunning())
- {
- clientProducer.send(message);
- counter++;
- }
- scheduler.shutdown();
- System.out.println("counter = " + counter);
- messageCounter = messagingServerManagement.getMessageCounter(queue);
- System.out.println("messageCounter = " + messageCounter);
- messagingServerManagement.unregisterMessageCounter(queue);
- clientConnection.close();
+ JMSServerManagerImpl jmsServerManager = new JMSServerManagerImpl();
+ jmsServerManager.setMessagingServerManagement(messagingServerManagement);
+ jmsServerManager.start();
+ StandardMBean serverManagementMBean = new StandardMBean(messagingServerManagement, MessagingServerManagement.class);
+ ObjectName serverManagementON = ObjectName.getInstance("org.jboss.messaging:name=MessagingServerManagement");
+ ManagementFactory.getPlatformMBeanServer().registerMBean(serverManagementMBean, serverManagementON);
+ StandardMBean JMSServerManagementMBean = new StandardMBean(jmsServerManager, JMSServerManager.class);
+ ObjectName JMSServerManagerON = ObjectName.getInstance("org.jboss.messaging:name=JMSServerManager");
+ ManagementFactory.getPlatformMBeanServer().registerMBean(JMSServerManagementMBean, JMSServerManagerON);
+ System.out.println("Press enter to kill server");
+ System.out.println("Receiving jmx connections on port 5401, try running command 'jconsole service:jmx:rmi:///jndi/rmi://localhost:5401/jmxrmi'");
+
+ System.in.read();
messagingServerManagement.stop();
messagingServer.stop();
+ mainMBean.stop();
+ namingBean.stop();
}
-
- private static class Timer implements Runnable
- {
- boolean running = true;
-
- public boolean isRunning()
- {
- return running;
- }
-
- public void run()
- {
- running = false;
- }
- }
}
Modified: trunk/src/etc/server/default/deploy/jbm-beans.xml
===================================================================
--- trunk/src/etc/server/default/deploy/jbm-beans.xml 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/etc/server/default/deploy/jbm-beans.xml 2008-02-25 15:34:18 UTC (rev 3789)
@@ -11,6 +11,7 @@
</bean>
<bean name="MessagingServerManagement" class="org.jboss.messaging.core.impl.server.MessagingServerManagementImpl">
+ <annotation>@org.jboss.aop.microcontainer.aspects.jmx.JMX(name="jboss.messaging:service=MessagingServerManagement", exposedInterface=org.jboss.messaging.core.MessagingServerManagement.class)</annotation>
<property name="messagingServer">
<inject bean="MessagingServer"/>
</property>
@@ -68,9 +69,16 @@
</bean>
<bean name="JMSServerManager" class="org.jboss.jms.server.JMSServerManagerImpl">
+ <annotation>@org.jboss.aop.microcontainer.aspects.jmx.JMX(name="jboss.messaging:service=JMSServerManager", exposedInterface=org.jboss.jms.server.JMSServerManager.class)</annotation>
<property name="messagingServerManagement">
<inject bean="MessagingServerManagement"/>
</property>
</bean>
+
+ <bean name="JMSServerDeployer" class="org.jboss.jms.server.JMSServerDeployer">
+ <property name="jmsServerManager">
+ <inject bean="JMSServerManager"/>
+ </property>
+ </bean>
</deployment>
\ No newline at end of file
Modified: trunk/src/etc/server/default/deploy/jbm-jndi.xml
===================================================================
--- trunk/src/etc/server/default/deploy/jbm-jndi.xml 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/etc/server/default/deploy/jbm-jndi.xml 2008-02-25 15:34:18 UTC (rev 3789)
@@ -106,6 +106,4 @@
<entry name="/topic/testDistributedTopic"/>
</topic>
- <clientid name="testClientId" user="testUser" id="testId"/>
-
</deployment>
\ No newline at end of file
Modified: trunk/src/etc/server/default/deploy/queues.xml
===================================================================
--- trunk/src/etc/server/default/deploy/queues.xml 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/etc/server/default/deploy/queues.xml 2008-02-25 15:34:18 UTC (rev 3789)
@@ -65,12 +65,12 @@
<queue-settings match="queuejms.QueueWithOwnDLQAndExpiryQueue">
<dlq>PrivateDLQ</dlq>
- <expiry-queue>PrivateExpiryQueue</expiry-queue>
+ <expiry-queue>queuejms.PrivateExpiryQueue</expiry-queue>
</queue-settings>
<queue-settings match="topicjms.TopicWithOwnDLQAndExpiryQueue">
<dlq>PrivateDLQ</dlq>
- <expiry-queue>PrivateExpiryQueue</expiry-queue>
+ <expiry-queue>queuejms.PrivateExpiryQueue</expiry-queue>
</queue-settings>
<queue-settings match="queuejms.QueueWithOwnRedeliveryDelay">
@@ -93,7 +93,7 @@
<queue-settings match="*">
<clustered>false</clustered>
<dlq>DLQ</dlq>
- <expiry-queue>ExpiryQueue</expiry-queue>
+ <expiry-queue>queuejms.ExpiryQueue</expiry-queue>
<redelivery-delay>0</redelivery-delay>
<max-size>-1</max-size>
<distribution-policy-class>org.jboss.messaging.core.impl.RoundRobinDistributionPolicy</distribution-policy-class>
Modified: trunk/src/main/org/jboss/jms/destination/JBossQueue.java
===================================================================
--- trunk/src/main/org/jboss/jms/destination/JBossQueue.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/jms/destination/JBossQueue.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -37,7 +37,7 @@
private static final long serialVersionUID = 4121129234371655479L;
- private static final String JMS_QUEUE_ADDRESS_PREFIX = "queuejms.";
+ public static final String JMS_QUEUE_ADDRESS_PREFIX = "queuejms.";
// Static --------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/destination/JBossTopic.java
===================================================================
--- trunk/src/main/org/jboss/jms/destination/JBossTopic.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/jms/destination/JBossTopic.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -37,7 +37,7 @@
private static final long serialVersionUID = 3257845497845724981L;
- private static final String JMS_TOPIC_ADDRESS_PREFIX = "topicjms.";
+ public static final String JMS_TOPIC_ADDRESS_PREFIX = "topicjms.";
// Static --------------------------------------------------------
Copied: trunk/src/main/org/jboss/jms/server/ConnectionInfo.java (from rev 3734, trunk/src/main/org/jboss/jms/server/ClientInfo.java)
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionInfo.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/ConnectionInfo.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -0,0 +1,101 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+import java.io.Serializable;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+/**
+ * Serializable snapshot of a client connection for management views: its id, user,
+ * remote address, started/stopped state and creation time.
+ */
+public class ConnectionInfo implements Serializable
+{
+   // SimpleDateFormat is not thread-safe, so only the pattern is shared and a
+   // formatter is created per call instead of keeping one static instance.
+   private static final String DATE_PATTERN = "HH:mm:ss, EEE, MMM d, yyyy";
+
+   private static final long SECONDS_PER_MINUTE = 60L;
+   private static final long SECONDS_PER_HOUR = 60L * SECONDS_PER_MINUTE;
+   private static final long SECONDS_PER_DAY = 24L * SECONDS_PER_HOUR;
+
+   public enum status{ STARTED, STOPPED }
+
+   private String id;
+   private String user;
+   private String address;
+   private boolean started;
+   private long created;
+
+   /** Creates an empty instance; all fields keep their Java default values. */
+   public ConnectionInfo()
+   {
+
+   }
+
+   /**
+    * @param id      the connection id
+    * @param user    the user that owns the connection, may be null
+    * @param address the address of the connection
+    * @param started whether the connection is started
+    * @param created the creation time in milliseconds since the epoch
+    */
+   public ConnectionInfo(String id, String user, String address, boolean started, long created)
+   {
+      this.id = id;
+      this.user = user;
+      this.address = address;
+      this.started = started;
+      this.created = created;
+   }
+
+   public String getId()
+   {
+      return id;
+   }
+
+   public String getUser()
+   {
+      return user;
+   }
+
+   public String getAddress()
+   {
+      return address;
+   }
+
+   /** @return STARTED if the connection was flagged as started, STOPPED otherwise */
+   public status getStatus()
+   {
+      return started ? status.STARTED : status.STOPPED;
+   }
+
+   /** @return the creation time formatted as "HH:mm:ss, EEE, MMM d, yyyy" in the default locale and time zone */
+   public String getTimeCreated()
+   {
+      return new SimpleDateFormat(DATE_PATTERN).format(new Date(created));
+   }
+
+   /**
+    * @return the time elapsed since creation as "D days H hours M minutes S seconds."
+    */
+   public String getAliveTime()
+   {
+      // Clamp at zero so a creation time slightly in the future does not yield negative fields.
+      long elapsedMillis = Math.max(0L, System.currentTimeMillis() - created);
+      return formatDuration(elapsedMillis);
+   }
+
+   /**
+    * Renders a duration with plain arithmetic. A duration is not a point in time, so it must
+    * not go through Calendar: that applies the default time zone offset to the hour/day fields
+    * and wraps the day count after one year.
+    */
+   private static String formatDuration(long millis)
+   {
+      long totalSeconds = millis / 1000L;
+      long days = totalSeconds / SECONDS_PER_DAY;
+      long hours = (totalSeconds % SECONDS_PER_DAY) / SECONDS_PER_HOUR;
+      long minutes = (totalSeconds % SECONDS_PER_HOUR) / SECONDS_PER_MINUTE;
+      long seconds = totalSeconds % SECONDS_PER_MINUTE;
+      return new StringBuilder()
+         .append(days).append(" days ")
+         .append(hours).append(" hours ")
+         .append(minutes).append(" minutes ")
+         .append(seconds).append(" seconds.")
+         .toString();
+   }
+
+   public String toString()
+   {
+      return id + (user!=null?"(" + user + ")":"") + "@" + address + " started at " + getTimeCreated() + "," + "uptime " + getAliveTime() + "\n";
+   }
+}
Added: trunk/src/main/org/jboss/jms/server/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerDeployer.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/JMSServerDeployer.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -0,0 +1,231 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.deployers.DeploymentManager;
+import org.jboss.messaging.deployers.Deployer;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+/**
+ * Deployer for the JMS resources declared in jbm-jndi.xml. Each queue, topic and
+ * connection-factory element is handed to the JMSServerManager, once per nested
+ * entry element, using that entry's name attribute as the JNDI binding.
+ */
+public class JMSServerDeployer extends Deployer
+{
+   // Log under this deployer's own category (previously JMSServerManagerImpl's by mistake).
+   Logger log = Logger.getLogger(JMSServerDeployer.class);
+
+   JMSServerManager jmsServerManager;
+
+
+   private static final String CLIENTID_ELEMENT = "client-id";
+   private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
+   private static final String PREFETCH_SIZE_ELEMENT = "prefetch-size";
+   // The next three element names are recognised but currently ignored: nothing
+   // consumes their values yet.
+   private static final String SUPPORTS_FAILOVER = "supports-failover";
+   private static final String SUPPORTS_LOAD_BALANCING = "supports-load-balancing";
+   private static final String LOAD_BALANCING_FACTORY = "load-balancing-factory";
+   private static final String STRICT_TCK = "strict-tck";
+   private static final String ENTRY_NODE_NAME = "entry";
+   private static final String ENTRY_NAME_ATTRIBUTE = "name";
+   private static final String CONNECTION_FACTORY_NODE_NAME = "connection-factory";
+   private static final String QUEUE_NODE_NAME = "queue";
+   private static final String TOPIC_NODE_NAME = "topic";
+
+   // Values used when a connection-factory element does not specify the setting.
+   private static final int DEFAULT_PREFETCH_SIZE = 150;
+   private static final int DEFAULT_DUPS_OK_BATCH_SIZE = 1000;
+
+   public void setJmsServerManager(JMSServerManager jmsServerManager)
+   {
+      this.jmsServerManager = jmsServerManager;
+   }
+
+   /**
+    * Lifecycle method: registers this deployer with the DeploymentManager.
+    * A registration failure is logged (with its stack trace) and not rethrown.
+    */
+   public void start() throws Exception
+   {
+      try
+      {
+         DeploymentManager.getInstance().registerDeployable(this);
+      }
+      catch (Exception e)
+      {
+         // Pass the exception as the throwable so the stack trace is not lost.
+         log.error("Unable to get Deployment Manager: " + e, e);
+      }
+   }
+
+   /**
+    * Lifecycle method: stops the superclass, then unregisters this deployer
+    * from the DeploymentManager.
+    */
+   public void stop() throws Exception
+   {
+      super.stop();
+      DeploymentManager.getInstance().unregisterDeployable(this);
+   }
+
+   /**
+    * The names of the elements to deploy.
+    *
+    * @return the names of the elements to deploy
+    */
+   public String[] getElementTagName()
+   {
+      return new String[]{QUEUE_NODE_NAME, TOPIC_NODE_NAME, CONNECTION_FACTORY_NODE_NAME};
+   }
+
+   /**
+    * Deploys an element.
+    *
+    * @param node the element to deploy
+    * @throws Exception if the element is malformed or the JMSServerManager rejects it
+    */
+   public void deploy(Node node) throws Exception
+   {
+      createAndBindObject(node);
+   }
+
+   /**
+    * Creates and binds the object described by the element: a connection factory,
+    * a queue or a topic. Elements with any other name are ignored.
+    *
+    * @param node the config
+    * @throws Exception if the element is malformed or the JMSServerManager rejects it
+    */
+   private void createAndBindObject(Node node) throws Exception
+   {
+      String nodeName = node.getNodeName();
+      if (CONNECTION_FACTORY_NODE_NAME.equals(nodeName))
+      {
+         deployConnectionFactory(node);
+      }
+      else if (QUEUE_NODE_NAME.equals(nodeName))
+      {
+         String queueName = requiredAttribute(node, getKeyAttribute());
+         NodeList children = node.getChildNodes();
+         for (int i = 0; i < children.getLength(); i++)
+         {
+            Node child = children.item(i);
+            if (isEntry(child))
+            {
+               jmsServerManager.createQueue(queueName, requiredAttribute(child, ENTRY_NAME_ATTRIBUTE));
+            }
+         }
+      }
+      else if (TOPIC_NODE_NAME.equals(nodeName))
+      {
+         String topicName = requiredAttribute(node, getKeyAttribute());
+         NodeList children = node.getChildNodes();
+         for (int i = 0; i < children.getLength(); i++)
+         {
+            Node child = children.item(i);
+            if (isEntry(child))
+            {
+               jmsServerManager.createTopic(topicName, requiredAttribute(child, ENTRY_NAME_ATTRIBUTE));
+            }
+         }
+      }
+   }
+
+   /**
+    * Reads the settings of a connection-factory element and creates one connection
+    * factory binding per nested entry element.
+    */
+   private void deployConnectionFactory(Node node) throws Exception
+   {
+      // See http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4076040#4076040
+      boolean cfStrictTck = false;
+      int prefetchSize = DEFAULT_PREFETCH_SIZE;
+      String clientID = null;
+      int dupsOKBatchSize = DEFAULT_DUPS_OK_BATCH_SIZE;
+
+      NodeList children = node.getChildNodes();
+
+      // First pass: collect every setting, because an entry element may appear
+      // before the settings it has to be created with.
+      for (int j = 0; j < children.getLength(); j++)
+      {
+         Node setting = children.item(j);
+         String settingName = setting.getNodeName();
+         if (STRICT_TCK.equalsIgnoreCase(settingName))
+         {
+            cfStrictTck = Boolean.parseBoolean(setting.getTextContent().trim());
+         }
+         else if (PREFETCH_SIZE_ELEMENT.equalsIgnoreCase(settingName))
+         {
+            prefetchSize = Integer.parseInt(setting.getTextContent().trim());
+         }
+         else if (CLIENTID_ELEMENT.equalsIgnoreCase(settingName))
+         {
+            clientID = setting.getTextContent();
+         }
+         else if (DUPS_OK_BATCH_SIZE_ELEMENT.equalsIgnoreCase(settingName))
+         {
+            dupsOKBatchSize = Integer.parseInt(setting.getTextContent().trim());
+         }
+         // supports-failover, supports-load-balancing and load-balancing-factory
+         // are accepted in the configuration but not applied.
+      }
+
+      // Second pass: one connection factory binding per entry element.
+      for (int i = 0; i < children.getLength(); i++)
+      {
+         Node child = children.item(i);
+         if (isEntry(child))
+         {
+            String jndiName = requiredAttribute(child, ENTRY_NAME_ATTRIBUTE);
+            String name = requiredAttribute(node, getKeyAttribute());
+            jmsServerManager.createConnectionFactory(name, clientID, dupsOKBatchSize, cfStrictTck, prefetchSize, jndiName);
+         }
+      }
+   }
+
+   /** Tells whether the node is an entry element (name compared case-insensitively). */
+   private static boolean isEntry(Node node)
+   {
+      return ENTRY_NODE_NAME.equalsIgnoreCase(node.getNodeName());
+   }
+
+   /**
+    * Returns the value of an attribute that must be present, failing with a descriptive
+    * message instead of a bare NullPointerException when it is missing.
+    */
+   private static String requiredAttribute(Node node, String attribute)
+   {
+      Node item = node.getAttributes() == null ? null : node.getAttributes().getNamedItem(attribute);
+      if (item == null)
+      {
+         throw new IllegalArgumentException("Element '" + node.getNodeName() + "' is missing required attribute '" + attribute + "'");
+      }
+      return item.getNodeValue();
+   }
+
+   /**
+    * Undeploys an element.
+    *
+    * @param node the element to undeploy
+    * @throws Exception if the element is malformed or the JMSServerManager fails to destroy it
+    */
+   public void undeploy(Node node) throws Exception
+   {
+      String nodeName = node.getNodeName();
+      if (CONNECTION_FACTORY_NODE_NAME.equals(nodeName))
+      {
+         jmsServerManager.destroyConnectionFactory(requiredAttribute(node, getKeyAttribute()));
+      }
+      else if (QUEUE_NODE_NAME.equals(nodeName))
+      {
+         jmsServerManager.destroyQueue(requiredAttribute(node, getKeyAttribute()));
+      }
+      else if (TOPIC_NODE_NAME.equals(nodeName))
+      {
+         jmsServerManager.destroyTopic(requiredAttribute(node, getKeyAttribute()));
+      }
+   }
+
+   /**
+    * The name of the configuration file to look for when deploying.
+    *
+    * @return the name of the config file
+    */
+   public String getConfigFileName()
+   {
+      return "jbm-jndi.xml";
+   }
+
+}
Modified: trunk/src/main/org/jboss/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManager.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManager.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -1,19 +1,23 @@
package org.jboss.jms.server;
-import org.jboss.jms.destination.JBossQueue;
-import org.jboss.jms.destination.JBossTopic;
import org.jboss.messaging.core.impl.server.SubscriptionInfo;
import org.jboss.jms.server.MessageStatistics;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import javax.jms.Message;
import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.io.Serializable;
/**
- * A JMS Management interface.
+ * The JMS Management interface.
*
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
-public interface JMSServerManager
+public interface JMSServerManager extends Serializable
{
// management operations
public enum ListType
@@ -31,6 +35,12 @@
boolean destroyTopic(String name) throws Exception;
+ Set<String> listAllQueues();
+
+ Set<String> listAllTopics();
+
+ Set<String> listTemporaryDestinations();
+
boolean createConnectionFactory(String name, String clientID, int dupsOKBatchSize, boolean strictTck, int prefetchSize, String jndiBinding) throws Exception;
boolean createConnectionFactory(String name, String clientID, int dupsOKBatchSize, boolean strictTck, int prefetchSize, List<String> jndiBindings) throws Exception;
@@ -55,6 +65,12 @@
void moveMessage(String fromQueue, String toQueue, String messageID) throws Exception;
+ void expireMessage(String queue, String messageId) throws Exception;
+
+ void changeMessagePriority(String messageId, int priority);
+
+ void changeMessageHeader(String messageId, String header, Object value);
+
int getMessageCountForQueue(String queue) throws Exception;
List<SubscriptionInfo> listSubscriptions(String topicName) throws Exception;
@@ -65,10 +81,24 @@
int getSubscriptionsCountForTopic(String topicName, ListType listType) throws Exception;
+ void dropSubscription(String subscription) throws Exception;
+
int getConsumerCountForQueue(String queue) throws Exception;
- List<ClientInfo> getClients() throws Exception;
+ List<ConnectionInfo> getConnections() throws Exception;
+ List<ConnectionInfo> getConnectionsForUser(String user) throws Exception;
+
+ void dropConnection(String clientId) throws Exception;
+
+ void dropConnectionForUser(String user) throws Exception;
+
+ public List<SessionInfo> getSessions() throws Exception;
+
+ public List<SessionInfo> getSessionsForConnection(String id) throws Exception;
+
+ public List<SessionInfo> getSessionsForUser(String user) throws Exception;
+
void startGatheringStatistics();
void startGatheringStatisticsForQueue(String queue);
Modified: trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -21,46 +21,51 @@
*/
package org.jboss.jms.server;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.jms.Message;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
-import javax.naming.NamingException;
-
-import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.api.ClientConnectionFactory;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
-import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.jms.server.endpoint.ServerConnection;
+import org.jboss.jms.server.endpoint.ServerSession;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.MessagingServerManagement;
import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.core.impl.filter.FilterImpl;
-import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.impl.server.SubscriptionInfo;
+import org.jboss.jms.server.MessageStatistics;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
import org.jboss.messaging.deployers.Deployer;
import org.jboss.messaging.deployers.DeploymentManager;
import org.jboss.messaging.util.JNDIUtil;
import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.aop.microcontainer.aspects.jmx.JMX;
+import org.jboss.managed.api.annotation.ManagementObject;
+import org.jboss.managed.api.annotation.ManagementOperation;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+import javax.naming.NamingException;
+import javax.jms.Message;
+import javax.management.StandardMBean;
+import javax.management.ObjectName;
+import javax.management.MBeanServer;
+import java.util.*;
+import java.lang.management.ManagementFactory;
+
/**
* A Deployer used to create and add to JNDI queues, topics and connection factories. Typically this would only be used
* in an app server env.
*
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
- at JMX(name = "jboss.messaging:service=MessagingServer", exposedInterface = JMSServerManager.class)
-public class JMSServerManagerImpl extends Deployer implements JMSServerManager
+public class JMSServerManagerImpl implements JMSServerManager
{
Logger log = Logger.getLogger(JMSServerManagerImpl.class);
@@ -77,18 +82,6 @@
MessagingServerManagement messagingServerManagement;
- private static final String CLIENTID_ELEMENT = "client-id";
- private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
- private static final String PREFETECH_SIZE_ELEMENT = "prefetch-size";
- private static final String SUPPORTS_FAILOVER = "supports-failover";
- private static final String SUPPORTS_LOAD_BALANCING = "supports-load-balancing";
- private static final String LOAD_BALANCING_FACTORY = "load-balancing-factory";
- private static final String STRICT_TCK = "strict-tck";
- private static final String ENTRY_NODE_NAME = "entry";
- private static final String CONNECTION_FACTORY_NODE_NAME = "connection-factory";
- private static final String QUEUE_NODE_NAME = "queue";
- private static final String TOPIC_NODE_NAME = "topic";
-
public void setMessagingServerManagement(MessagingServerManagement messagingServerManagement)
{
this.messagingServerManagement = messagingServerManagement;
@@ -97,7 +90,7 @@
/**
* lifecycle method
*/
- public void start()
+ public void start() throws Exception
{
try
{
@@ -107,46 +100,9 @@
{
log.error("Unable to create Initial Context", e);
}
- try
- {
- DeploymentManager.getInstance().registerDeployable(this);
- }
- catch (Exception e)
- {
- log.error(new StringBuilder("Unable to get Deployment Manager: ").append(e));
- }
}
- /**
- * lifecycle method
- */
- public void stop() throws Exception
- {
- super.stop();
- DeploymentManager.getInstance().unregisterDeployable(this);
- }
- /**
- * the names of the elements to deploy
- *
- * @return the names of the elements todeploy
- */
- public String[] getElementTagName()
- {
- return new String[]{QUEUE_NODE_NAME, TOPIC_NODE_NAME, CONNECTION_FACTORY_NODE_NAME};
- }
-
- /**
- * deploy an element
- *
- * @param node the element to deploy
- * @throws Exception .
- */
- public void deploy(Node node) throws Exception
- {
- createAndBindObject(node);
- }
-
private boolean bindToJndi(String jndiName, Object objectToBind)
throws NamingException
{
@@ -169,7 +125,7 @@
log.warn("Binding for " + jndiName + " already exists");
return false;
}
- catch (NameNotFoundException e)
+ catch (Throwable e)
{
// OK
}
@@ -180,130 +136,14 @@
return true;
}
- /**
- * creates the object to bind, this will either be a JBossConnectionFActory, JBossQueue or JBossTopic
- *
- * @param node the config
- * @throws Exception .
- */
- private void createAndBindObject(Node node) throws Exception
- {
- if (node.getNodeName().equals(CONNECTION_FACTORY_NODE_NAME))
- {
- // See http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4076040#4076040
- NodeList attributes = node.getChildNodes();
- boolean cfStrictTck = false;
- int prefetchSize = 150;
- String clientID = null;
- int dupsOKBatchSize = 1000;
- for (int j = 0; j < attributes.getLength(); j++)
- {
- if (STRICT_TCK.equalsIgnoreCase(attributes.item(j).getNodeName()))
- {
- cfStrictTck = Boolean.parseBoolean(attributes.item(j).getTextContent().trim());
- }
- else if (PREFETECH_SIZE_ELEMENT.equalsIgnoreCase(attributes.item(j).getNodeName()))
- {
- prefetchSize = Integer.parseInt(attributes.item(j).getTextContent().trim());
- }
- else if (CLIENTID_ELEMENT.equalsIgnoreCase(attributes.item(j).getNodeName()))
- {
- clientID = attributes.item(j).getTextContent();
- }
- else if (DUPS_OK_BATCH_SIZE_ELEMENT.equalsIgnoreCase(attributes.item(j).getNodeName()))
- {
- dupsOKBatchSize = Integer.parseInt(attributes.item(j).getTextContent().trim());
- }
- if (SUPPORTS_FAILOVER.equalsIgnoreCase(attributes.item(j).getNodeName()))
- {
- //setSupportsFailover(Boolean.parseBoolean(attributes.item(j).getTextContent().trim()));
- }
- if (SUPPORTS_LOAD_BALANCING.equalsIgnoreCase(attributes.item(j).getNodeName()))
- {
- //setSupportsLoadBalancing(Boolean.parseBoolean(attributes.item(j).getTextContent().trim()));
- }
- if (LOAD_BALANCING_FACTORY.equalsIgnoreCase(attributes.item(j).getNodeName()))
- {
- //setLoadBalancingFactory(attributes.item(j).getTextContent().trim());
- }
- }
- NodeList children = node.getChildNodes();
- for (int i = 0; i < children.getLength(); i++)
- {
- Node child = children.item(i);
-
- if (ENTRY_NODE_NAME.equalsIgnoreCase(children.item(i).getNodeName()))
- {
- String jndiName = child.getAttributes().getNamedItem("name").getNodeValue();
- String name = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
- createConnectionFactory(name, clientID, dupsOKBatchSize, cfStrictTck, prefetchSize, jndiName);
- }
- }
- }
- else if (node.getNodeName().equals(QUEUE_NODE_NAME))
- {
- String queueName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
- NodeList children = node.getChildNodes();
- for (int i = 0; i < children.getLength(); i++)
- {
- Node child = children.item(i);
-
- if (ENTRY_NODE_NAME.equalsIgnoreCase(children.item(i).getNodeName()))
- {
- String jndiName = child.getAttributes().getNamedItem("name").getNodeValue();
- createQueue(queueName, jndiName);
- }
-
- }
- }
- else if (node.getNodeName().equals(TOPIC_NODE_NAME))
- {
- String topicName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
- NodeList children = node.getChildNodes();
- for (int i = 0; i < children.getLength(); i++)
- {
- Node child = children.item(i);
-
- if (ENTRY_NODE_NAME.equalsIgnoreCase(children.item(i).getNodeName()))
- {
- String jndiName = child.getAttributes().getNamedItem("name").getNodeValue();
- createTopic(topicName, jndiName);
- }
- }
- }
- }
-
- /**
- * undeploys an element
- *
- * @param node the element to undeploy
- * @throws Exception .
- */
- public void undeploy(Node node) throws Exception
- {
- System.out.println("JNDIObjectDeployer.undeploy");
- }
-
- /**
- * The name of the configuration file name to look for for deployment
- *
- * @return The name of the config file
- */
- public String getConfigFileName()
- {
- return "jbm-jndi.xml";
- }
-
// management operations
- // management operations
-
public boolean isStarted()
{
return messagingServerManagement.isStarted();
}
-
+
public boolean createQueue(String queueName, String jndiBinding) throws Exception
{
JBossQueue jBossQueue = new JBossQueue(queueName);
@@ -361,6 +201,39 @@
return true;
}
+   /**
+    * Lists the names of all JMS queues known to the server.
+    * <p>
+    * Every address reported by the server management interface that starts with
+    * {@link JBossQueue#JMS_QUEUE_ADDRESS_PREFIX} is treated as a queue address; the leading
+    * prefix is removed to obtain the queue name.
+    *
+    * @return the queue names with the address prefix stripped; empty if no address matches
+    */
+   public Set<String> listAllQueues()
+   {
+      final String prefix = JBossQueue.JMS_QUEUE_ADDRESS_PREFIX;
+      Set<String> availableQueues = new HashSet<String>();
+      for (String address : messagingServerManagement.listAvailableAddresses())
+      {
+         if (address.startsWith(prefix))
+         {
+            // Strip only the leading prefix: String.replace would also delete any later
+            // occurrence of the prefix text inside the queue name itself.
+            availableQueues.add(address.substring(prefix.length()));
+         }
+      }
+      return availableQueues;
+   }
+
+   /**
+    * Lists the names of all JMS topics known to the server.
+    * <p>
+    * Every address reported by the server management interface that starts with
+    * {@link JBossTopic#JMS_TOPIC_ADDRESS_PREFIX} is treated as a topic address; the leading
+    * prefix is removed to obtain the topic name.
+    *
+    * @return the topic names with the address prefix stripped; empty if no address matches
+    */
+   public Set<String> listAllTopics()
+   {
+      final String prefix = JBossTopic.JMS_TOPIC_ADDRESS_PREFIX;
+      Set<String> availableTopics = new HashSet<String>();
+      for (String address : messagingServerManagement.listAvailableAddresses())
+      {
+         if (address.startsWith(prefix))
+         {
+            // Strip only the leading prefix: String.replace would also delete any later
+            // occurrence of the prefix text inside the topic name itself.
+            availableTopics.add(address.substring(prefix.length()));
+         }
+      }
+      return availableTopics;
+   }
+
+ /**
+ * Not implemented yet: always returns <code>null</code> instead of a set of destination
+ * names, so callers must null-check the result.
+ *
+ * @return currently always <code>null</code>
+ */
+ public Set<String> listTemporaryDestinations()
+ {
+ return null; //todo
+ }
+
public boolean createConnectionFactory(String name, String clientID, int dupsOKBatchSize, boolean strictTck, int prefetchSize, String jndiBinding) throws Exception
{
JBossConnectionFactory cf = connectionFactories.get(name);
@@ -469,6 +342,21 @@
new FilterImpl("JMSMessageID='" + messageId + "'"));
}
+   /**
+    * Asks the server management interface to expire the messages on a queue whose
+    * JMSMessageID equals the given id.
+    *
+    * @param queue     the JMS queue name; converted to its address through {@link JBossQueue}
+    * @param messageId the value compared against JMSMessageID in the filter expression
+    * @throws Exception if building the filter or the management call fails
+    */
+   public void expireMessage(String queue, String messageId) throws Exception
+   {
+      String selector = "JMSMessageID='" + messageId + "'";
+      String address = new JBossQueue(queue).getAddress();
+      messagingServerManagement.expireMessages(address, new FilterImpl(selector));
+   }
+
+ /**
+ * Not implemented yet: the body is empty, so calling this method changes nothing.
+ *
+ * @param messageId id of the message (currently ignored)
+ * @param priority requested priority (currently ignored)
+ */
+ public void changeMessagePriority(String messageId, int priority)
+ {
+ //todo
+ }
+
+ /**
+ * Not implemented yet: the body is empty, so calling this method changes nothing.
+ *
+ * @param messageId id of the message (currently ignored)
+ * @param header name of the header (currently ignored)
+ * @param value requested header value (currently ignored)
+ */
+ public void changeMessageHeader(String messageId, String header, Object value)
+ {
+ //todo
+ }
+
public int getMessageCountForQueue(String queue) throws Exception
{
return getMessageCount(new JBossQueue(queue));
@@ -494,26 +382,110 @@
return getSubscriptionsCount(new JBossTopic(topicName), listType);
}
+ /**
+ * Drops a subscription by asking the server management interface to destroy the queue
+ * with the given name.
+ *
+ * @param subscription passed unchanged to destroyQueue as the queue name
+ * @throws Exception if the management call fails
+ */
+ public void dropSubscription(String subscription) throws Exception
+ {
+ messagingServerManagement.destroyQueue(subscription);
+
+ }
+
public int getConsumerCountForQueue(String queue) throws Exception
{
return getConsumerCount(new JBossQueue(queue));
}
- public List<ClientInfo> getClients() throws Exception
+ public List<ConnectionInfo> getConnections() throws Exception
{
- List<ClientInfo> clientInfos = new ArrayList<ClientInfo>();
+ return getConnectionsForUser(null);
+ }
+
+ public List<ConnectionInfo> getConnectionsForUser(String user) throws Exception
+ {
+ List<ConnectionInfo> connectionInfos = new ArrayList<ConnectionInfo>();
List<ServerConnection> endpoints = messagingServerManagement.getActiveConnections();
for (ServerConnection endpoint : endpoints)
{
- clientInfos.add(new ClientInfo(endpoint.getUsername(),
- endpoint.getClientAddress(),
- endpoint.isStarted(),
- endpoint.getCreatedTime()));
+ if (user == null || user.equals(endpoint.getUsername()))
+ {
+ connectionInfos.add(new ConnectionInfo(endpoint.getID(),
+ endpoint.getUsername(),
+ endpoint.getClientAddress(),
+ endpoint.isStarted(),
+ endpoint.getCreated()));
+ }
}
- return clientInfos;
+ return connectionInfos;
}
+   /**
+    * Closes the first active connection whose ID equals the given value.
+    * Nothing happens if no active connection matches.
+    *
+    * @param clientId the value compared against each connection's getID()
+    * @throws Exception if closing the matching connection fails
+    */
+   public void dropConnection(String clientId) throws Exception
+   {
+      Iterator<ServerConnection> candidates = messagingServerManagement.getActiveConnections().iterator();
+      boolean closed = false;
+      // Stop as soon as one connection has been closed; at most one is dropped per call.
+      while (!closed && candidates.hasNext())
+      {
+         ServerConnection candidate = candidates.next();
+         if (candidate.getID().equals(clientId))
+         {
+            candidate.close();
+            closed = true;
+         }
+      }
+   }
+   /**
+    * Closes every active connection that belongs to the given user.
+    *
+    * @param user the user name to match; <code>null</code> matches every connection, which is
+    *             the same filter getConnectionsForUser applies
+    * @throws Exception if closing a connection fails
+    */
+   public void dropConnectionForUser(String user) throws Exception
+   {
+      // Iterate over a snapshot so that closing a connection cannot disturb the iteration
+      // should the management layer hand out a live list.
+      List<ServerConnection> endpoints =
+         new ArrayList<ServerConnection>(messagingServerManagement.getActiveConnections());
+      for (ServerConnection endpoint : endpoints)
+      {
+         // Match directly on the endpoint; no need to build ConnectionInfo objects and
+         // search the endpoint list again by ID.
+         if (user == null || user.equals(endpoint.getUsername()))
+         {
+            endpoint.close();
+         }
+      }
+   }
+
+ /**
+ * Lists the sessions of all active connections by delegating to
+ * getSessionsForConnection with a <code>null</code> id, which that method treats as
+ * "match every connection".
+ *
+ * @return one SessionInfo per session
+ * @throws Exception declared for the delegate call
+ */
+ public List<SessionInfo> getSessions() throws Exception
+ {
+ return getSessionsForConnection(null);
+ }
+
+   /**
+    * Collects a SessionInfo for each session of the active connections selected by id.
+    *
+    * @param id the connection ID to match; <code>null</code> selects every active connection
+    * @return the session/connection ID pairs of the selected connections, possibly empty
+    * @throws Exception declared by the management interface contract
+    */
+   public List<SessionInfo> getSessionsForConnection(String id) throws Exception
+   {
+      List<SessionInfo> result = new ArrayList<SessionInfo>();
+      for (ServerConnection connection : messagingServerManagement.getActiveConnections())
+      {
+         boolean selected = id == null || id.equals(connection.getID());
+         if (!selected)
+         {
+            continue;
+         }
+         for (ServerSession session : connection.getSessions())
+         {
+            result.add(new SessionInfo(session.getID(), connection.getID()));
+         }
+      }
+      return result;
+   }
+
+   /**
+    * Collects a SessionInfo for each session of the active connections owned by a user.
+    *
+    * @param user the user name to match; <code>null</code> selects every active connection
+    * @return the session/connection ID pairs of the selected connections, possibly empty
+    * @throws Exception declared by the management interface contract
+    */
+   public List<SessionInfo> getSessionsForUser(String user) throws Exception
+   {
+      List<SessionInfo> sessionInfos = new ArrayList<SessionInfo>();
+      for (ServerConnection endpoint : messagingServerManagement.getActiveConnections())
+      {
+         if (user == null || user.equals(endpoint.getUsername()))
+         {
+            // Read the sessions straight from the matching connection rather than looking
+            // the connection up again by ID, which would re-scan every active connection
+            // for each match.
+            String connectionID = endpoint.getID();
+            for (ServerSession session : endpoint.getSessions())
+            {
+               sessionInfos.add(new SessionInfo(session.getID(), connectionID));
+            }
+         }
+      }
+      return sessionInfos;
+   }
+
public void startGatheringStatistics()
{
//To change body of implemented methods use File | Settings | File Templates.
Added: trunk/src/main/org/jboss/jms/server/SessionInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/SessionInfo.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/SessionInfo.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server;
+
+import java.io.Serializable;
+
+/**
+ * Immutable, serializable description of a session: the session's id together with the id
+ * of the connection it was listed under.
+ *
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class SessionInfo implements Serializable
+{
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+   private final String connectionID;
+
+   /**
+    * @param id           the session id
+    * @param connectionID the id of the connection the session was listed under
+    */
+   public SessionInfo(String id, String connectionID)
+   {
+      this.id = id;
+      this.connectionID = connectionID;
+   }
+
+   /**
+    * @return the session id given at construction
+    */
+   public String getId()
+   {
+      return id;
+   }
+
+   /**
+    * @return the connection id given at construction
+    */
+   public String getConnectionID()
+   {
+      return connectionID;
+   }
+
+   /**
+    * @return the text <code>id@connectionID</code> followed by a newline character
+    */
+   @Override
+   public String toString()
+   {
+      return id + "@" + connectionID + "\n";
+   }
+}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -26,11 +26,14 @@
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionResponseMessage;
+import java.util.Collection;
+
/**
*
* A ServerConnection
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
*
*/
public interface ServerConnection
@@ -62,5 +65,9 @@
long getCreatedTime();
- String getClientAddress();
+ String getClientAddress();
+
+ long getCreated();
+
+ Collection<ServerSession> getSessions();
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -21,11 +21,7 @@
*/
package org.jboss.jms.server.endpoint;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -49,6 +45,7 @@
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -96,8 +93,8 @@
private final Set<Queue> temporaryQueues = new ConcurrentHashSet<Queue>();
private volatile boolean started;
-
-
+
+
// Constructors ---------------------------------------------------------------------------------
public ServerConnectionEndpoint(final String username, final String password,
@@ -264,6 +261,16 @@
return clientAddress;
}
+ /**
+ * @return the value of the createdTime field
+ */
+ public long getCreated()
+ {
+ return createdTime;
+ }
+
+ /**
+ * Returns the values of the sessions map without copying them.
+ * NOTE(review): presumably this is a live view of the underlying map, so callers may see
+ * later changes - confirm against the declaration of sessions.
+ *
+ * @return the result of sessions.values()
+ */
+ public Collection<ServerSession> getSessions()
+ {
+ return sessions.values();
+ }
+
// Public ---------------------------------------------------------------------------------------
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServer.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServer.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -40,6 +40,7 @@
* This interface is never exposed outside the messaging server, e.g. by JMX or other means
*
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
public interface MessagingServer extends MessagingComponent
{
@@ -72,6 +73,8 @@
HierarchicalRepository<HashSet<Role>> getSecurityRepository();
+ HierarchicalRepository<QueueSettings> getQueueSettingsRepository();
+
void setPostOffice(PostOffice postOffice);
void createQueue(String address, String name) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -28,6 +28,7 @@
import org.jboss.jms.server.endpoint.ServerConnection;
import org.jboss.messaging.core.impl.filter.FilterImpl;
import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import java.util.Set;
/**
* This interface describes the management interface exposed by the server
@@ -92,4 +93,8 @@
List<ServerConnection> getActiveConnections();
void moveMessages(String toQueue, String fromQueue, FilterImpl filter) throws Exception;
+
+ void expireMessages(String queue,Filter filter) throws Exception;
+
+ Set<String> listAvailableAddresses();
}
Modified: trunk/src/main/org/jboss/messaging/core/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PostOffice.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/PostOffice.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
*
@@ -65,4 +66,6 @@
//For testing only
Map<String, List<Binding>> getMappings();
+
+ Set<String> listAvailableAddresses();
}
Modified: trunk/src/main/org/jboss/messaging/core/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Queue.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/Queue.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core;
import org.jboss.messaging.core.impl.filter.FilterImpl;
+import org.jboss.messaging.util.HierarchicalRepository;
import java.util.LinkedList;
import java.util.List;
@@ -32,6 +33,7 @@
* A Queue
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
*
*/
public interface Queue
@@ -100,21 +102,7 @@
int getMessagesAdded();
//--------
-
- Queue getDLQ();
-
- Queue getExpiryQueue();
-
- int getMaxDeliveryAttempts();
-
- void setMaxDeliveryAttempts(int max);
-
- long getRedeliveryDelay();
-
- void setRedeliveryDelay(long delay);
-
- int getMessageCounterHistoryDayLimit();
-
- void setMessageCounterHistoryDayLimit(int limit);
+ HierarchicalRepository<QueueSettings> getQueueSettings();
+
}
Modified: trunk/src/main/org/jboss/messaging/core/QueueSettings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/QueueSettings.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/QueueSettings.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -48,8 +48,8 @@
private Integer maxDeliveryAttempts = null;
private Integer messageCounterHistoryDayLimit = null;
private Long redeliveryDelay = null;
- private String DLQ = null;
- private String ExpiryQueue = null;
+ private Queue DLQ = null;
+ private Queue ExpiryQueue = null;
public Boolean isClustered()
@@ -113,22 +113,22 @@
}
- public String getDLQ()
+ public Queue getDLQ()
{
return DLQ;
}
- public void setDLQ(String DLQ)
+ public void setDLQ(Queue DLQ)
{
this.DLQ = DLQ;
}
- public String getExpiryQueue()
+ public Queue getExpiryQueue()
{
return ExpiryQueue;
}
- public void setExpiryQueue(String expiryQueue)
+ public void setExpiryQueue(Queue expiryQueue)
{
ExpiryQueue = expiryQueue;
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -135,16 +135,17 @@
}
queue.decrementDeliveringCount();
+
+ int maxDeliveries = queue.getQueueSettings().getMatch(queue.getName()).getMaxDeliveryAttempts();
- int maxDeliveries = queue.getMaxDeliveryAttempts();
-
if (maxDeliveries > 0 && deliveryCount >= maxDeliveries)
{
- if (queue.getDLQ() != null)
+ Queue DLQ = queue.getQueueSettings().getMatch(queue.getName()).getDLQ();
+ if (DLQ != null)
{
Message copyMessage = makeCopyForDLQOrExpiry(false, persistenceManager);
- moveInTransaction(queue.getDLQ(), copyMessage, persistenceManager);
+ moveInTransaction(DLQ, copyMessage, persistenceManager);
}
else
{
@@ -165,11 +166,12 @@
public void expire(PersistenceManager persistenceManager) throws Exception
{
- if (queue.getExpiryQueue() != null)
+ Queue expiryQueue = queue.getQueueSettings().getMatch(queue.getName()).getExpiryQueue();
+ if (expiryQueue != null)
{
Message copyMessage = makeCopyForDLQOrExpiry(false, persistenceManager);
- moveInTransaction(queue.getExpiryQueue(), copyMessage, persistenceManager);
+ moveInTransaction(expiryQueue, copyMessage, persistenceManager);
}
else
{
@@ -195,7 +197,7 @@
private void moveInTransaction(Queue destinationQueue, Message copyMessage,
PersistenceManager persistenceManager) throws Exception
{
- copyMessage.createReference(queue.getExpiryQueue());
+ copyMessage.createReference(destinationQueue);
TransactionImpl tx = new TransactionImpl();
Modified: trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/impl/QueueFactoryImpl.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -31,30 +31,31 @@
import org.jboss.messaging.util.HierarchicalRepository;
/**
- *
+ *
* A QueueFactoryImpl
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
*
*/
public class QueueFactoryImpl implements QueueFactory
{
private HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
+
private ScheduledExecutorService scheduledExecutor;
public QueueFactoryImpl(ScheduledExecutorService scheduledExecutor)
{
this();
-
+
this.scheduledExecutor = scheduledExecutor;
}
-
+
public QueueFactoryImpl()
{
queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
-
- queueSettingsRepository.setDefault(new QueueSettings());
+
+ queueSettingsRepository.setDefault(new QueueSettings());
}
public Queue createQueue(long id, String name, Filter filter,
@@ -62,20 +63,11 @@
{
QueueSettings queueSettings = queueSettingsRepository.getMatch(name);
- Queue queue = new QueueImpl(id, name, filter, queueSettings.isClustered(),
- durable, temporary, queueSettings.getMaxSize(),
- scheduledExecutor);
-
- queue.setMaxDeliveryAttempts(queueSettings.getMaxDeliveryAttempts());
-
- queue.setMessageCounterHistoryDayLimit(queueSettings.getMessageCounterHistoryDayLimit());
-
- queue.setRedeliveryDelay(queueSettings.getRedeliveryDelay());
-
+ Queue queue = new QueueImpl(id, name, filter, queueSettings.isClustered(), durable, temporary, queueSettings.getMaxSize(), queueSettingsRepository);
+
queue.setDistributionPolicy(queueSettings.getDistributionPolicy());
return queue;
-
}
public void setQueueSettingsRepository(HierarchicalRepository<QueueSettings> queueSettingsRepository)
Modified: trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -32,22 +32,19 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.jboss.messaging.core.Consumer;
-import org.jboss.messaging.core.DistributionPolicy;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.HandleStatus;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.PriorityLinkedList;
-import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.*;
import org.jboss.messaging.util.Logger;
+import org.jboss.messaging.util.HierarchicalRepository;
+import org.jboss.messaging.util.HierarchicalObjectRepository;
/**
- *
+ *
* Implementation of a Queue
- *
+ *
* TODO use Java 5 concurrent queue
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
*
*/
public class QueueImpl implements Queue
@@ -55,92 +52,96 @@
private static final Logger log = Logger.getLogger(QueueImpl.class);
private static final boolean trace = log.isTraceEnabled();
-
+
protected volatile long id = -1;
-
+
protected String name;
-
+
protected volatile int maxSize = -1;
-
+
protected boolean clustered;
-
+
protected boolean temporary;
-
+
protected boolean durable;
-
+
protected Filter filter;
-
+
protected PriorityLinkedList<MessageReference> messageReferences;
-
+
protected List<Consumer> consumers;
-
+
protected Set<ScheduledDeliveryRunnable> scheduledRunnables;
-
+
protected DistributionPolicy distributionPolicy;
-
+
protected boolean direct;
-
+
protected boolean promptDelivery;
-
+
private int pos;
-
+
private AtomicInteger messagesAdded = new AtomicInteger(0);
-
+
private AtomicInteger deliveringCount = new AtomicInteger(0);
-
+
private ScheduledExecutorService scheduledExecutor;
-
+
// ---------
-
- private Queue dlq;
-
- private Queue expiryQueue;
-
- private int maxDeliveryAttempts;
-
- private long redeliveryDelay;
-
- private int messageCounterHistoryDayLimit;
-
+
+ HierarchicalRepository<QueueSettings> queueSettings;
+
public QueueImpl(long id, String name, Filter filter, boolean clustered,
boolean durable, boolean temporary, int maxSize, ScheduledExecutorService scheduledExecutor)
{
this(id, name, filter, clustered, durable, temporary, maxSize);
-
+
this.scheduledExecutor = scheduledExecutor;
}
-
+
public QueueImpl(long id, String name, Filter filter, boolean clustered,
+ boolean durable, boolean temporary, int maxSize, HierarchicalRepository<QueueSettings> queueSettingsRepository)
+ {
+ this(id, name, filter, clustered, durable, temporary, maxSize);
+
+ queueSettings = queueSettingsRepository;
+ }
+
+ public QueueImpl(long id, String name, Filter filter, boolean clustered,
boolean durable, boolean temporary, int maxSize)
{
this.id = id;
-
+
this.name = name;
-
+
this.filter = filter;
-
+
this.clustered = clustered;
-
+
this.durable = durable;
-
+
this.temporary = temporary;
-
+
this.maxSize = maxSize;
-
+
//TODO - use a wait free concurrent queue
messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
-
+
consumers = new ArrayList<Consumer>();
-
+
scheduledRunnables = new HashSet<ScheduledDeliveryRunnable>();
-
+
distributionPolicy = new RoundRobinDistributionPolicy();
-
+
direct = true;
+
+ queueSettings = new HierarchicalObjectRepository<QueueSettings>();
+ queueSettings.setDefault(new QueueSettings());
}
-
+
+
// Queue implementation -------------------------------------------------------------------
-
+
public boolean isClustered()
{
return clustered;
@@ -155,12 +156,12 @@
{
return temporary;
}
-
+
public String getName()
{
return name;
}
-
+
public synchronized HandleStatus addLast(MessageReference ref)
{
return add(ref, false);
@@ -170,21 +171,21 @@
{
return add(ref, true);
}
-
+
public synchronized void addListFirst(LinkedList<MessageReference> list)
{
ListIterator<MessageReference> iter = list.listIterator(list.size());
-
+
while (iter.hasPrevious())
- {
+ {
MessageReference ref = iter.previous();
-
- messageReferences.addFirst(ref, ref.getMessage().getPriority());
+
+ messageReferences.addFirst(ref, ref.getMessage().getPriority());
}
-
+
deliver();
}
-
+
/*
* Attempt to deliver all the messages in the queue
* @see org.jboss.messaging.newcore.intf.Queue#deliver()
@@ -192,9 +193,9 @@
public synchronized void deliver()
{
MessageReference reference;
-
+
ListIterator<MessageReference> iterator = null;
-
+
while (true)
{
if (iterator == null)
@@ -212,21 +213,21 @@
reference = null;
}
}
-
+
if (reference == null)
{
if (iterator == null)
{
//We delivered all the messages - go into direct delivery
direct = true;
-
+
promptDelivery = false;
- }
+ }
return;
}
-
+
HandleStatus status = deliver(reference);
-
+
if (status == HandleStatus.HANDLED)
{
if (iterator == null)
@@ -248,31 +249,31 @@
//Consumers not all busy - but filter not accepting - iterate back through the queue
iterator = messageReferences.iterator();
}
- }
+ }
}
-
+
public synchronized void addConsumer(Consumer consumer)
{
consumers.add(consumer);
}
-
+
public synchronized boolean removeConsumer(Consumer consumer)
{
boolean removed = consumers.remove(consumer);
-
+
if (pos == consumers.size())
{
pos = 0;
}
-
+
if (consumers.isEmpty())
{
promptDelivery = false;
}
-
+
return removed;
}
-
+
public synchronized int getConsumerCount()
{
return consumers.size();
@@ -287,7 +288,7 @@
else
{
ArrayList<MessageReference> list = new ArrayList<MessageReference>();
-
+
for (MessageReference ref: messageReferences.getAll())
{
if (filter.match(ref.getMessage()))
@@ -295,7 +296,7 @@
list.add(ref);
}
}
-
+
return list;
}
}
@@ -303,16 +304,16 @@
public synchronized void removeAllReferences()
{
messageReferences.clear();
-
+
if (!scheduledRunnables.isEmpty())
{
Set<ScheduledDeliveryRunnable> clone = new HashSet<ScheduledDeliveryRunnable>(scheduledRunnables);
-
+
for (ScheduledDeliveryRunnable runnable: clone)
{
runnable.cancel();
}
-
+
scheduledRunnables.clear();
}
}
@@ -320,7 +321,7 @@
public synchronized void removeReference(MessageReference messageReference)
{
messageReferences.remove(messageReference , messageReference.getMessage().getPriority());
-
+
if (!scheduledRunnables.isEmpty())
{
Set<ScheduledDeliveryRunnable> clone = new HashSet<ScheduledDeliveryRunnable>(scheduledRunnables);
@@ -334,16 +335,13 @@
}
}
- //FIXME - probably better with an iterator
public synchronized List<MessageReference> removeReferences(Filter filter)
{
List<MessageReference> allRefs = list(filter);
-
for (MessageReference messageReference : allRefs)
{
removeReference(messageReference);
}
-
return allRefs;
}
@@ -351,17 +349,17 @@
{
return id;
}
-
+
public void setPersistenceID(long id)
{
this.id = id;
}
-
+
public synchronized Filter getFilter()
{
return filter;
}
-
+
public synchronized void setFilter(Filter filter)
{
this.filter = filter;
@@ -369,14 +367,15 @@
public synchronized int getMessageCount()
{
+ // log.info("mr: " + messageReferences.size() + " sc: " + getScheduledCount() + " dc: " + getDeliveringCount());
return messageReferences.size() + getScheduledCount() + getDeliveringCount();
}
-
+
public synchronized int getScheduledCount()
{
return scheduledRunnables.size();
}
-
+
public int getDeliveringCount()
{
return deliveringCount.get();
@@ -395,14 +394,14 @@
public synchronized void setMaxSize(int maxSize)
{
int num = messageReferences.size() + scheduledRunnables.size();
-
+
if (maxSize < num)
{
throw new IllegalArgumentException("Cannot set maxSize to " + maxSize + " since there are " + num + " refs");
}
this.maxSize = maxSize;
}
-
+
public synchronized DistributionPolicy getDistributionPolicy()
{
return distributionPolicy;
@@ -412,29 +411,29 @@
{
this.distributionPolicy = distributionPolicy;
}
-
+
public int getMessagesAdded()
{
return messagesAdded.get();
}
-
+
public boolean equals(Object other)
{
if (this == other)
{
return true;
}
-
+
QueueImpl qother = (QueueImpl)other;
-
+
return name.equals(qother.name);
}
-
+
public int hashCode()
{
return name.hashCode();
}
-
+
// Private ------------------------------------------------------------------------------
private HandleStatus add(MessageReference ref, boolean first)
@@ -443,22 +442,22 @@
{
return HandleStatus.BUSY;
}
-
+
if (!first)
{
messagesAdded.incrementAndGet();
}
-
+
if (!checkAndSchedule(ref))
- {
+ {
boolean add = false;
-
+
if (direct)
{
//Deliver directly
-
+
HandleStatus status = deliver(ref);
-
+
if (status == HandleStatus.HANDLED)
{
//Ok
@@ -471,7 +470,7 @@
{
add = true;
}
-
+
if (add)
{
direct = false;
@@ -481,7 +480,7 @@
{
add = true;
}
-
+
if (add)
{
if (first)
@@ -492,9 +491,9 @@
{
messageReferences.addLast(ref, ref.getMessage().getPriority());
}
-
+
if (!direct && promptDelivery)
- {
+ {
//We have consumers with filters which don't match, so we need to prompt delivery every time
//a new message arrives - this is why you really shouldn't use filters with queues - in most cases
//it's an anti-pattern since it would cause a queue scan on each message
@@ -502,28 +501,28 @@
}
}
}
-
+
return HandleStatus.HANDLED;
}
-
+
private boolean checkAndSchedule(MessageReference ref)
{
long now = System.currentTimeMillis();
-
+
if (scheduledExecutor != null && ref.getScheduledDeliveryTime() > now)
- {
+ {
if (trace) { log.trace("Scheduling delivery for " + ref + " to occur at " + ref.getScheduledDeliveryTime()); }
-
+
long delay = ref.getScheduledDeliveryTime() - now;
-
+
ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-
+
scheduledRunnables.add(runnable);
-
+
Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
runnable.setFuture(future);
-
+
return true;
}
else
@@ -531,13 +530,13 @@
return false;
}
}
-
+
private boolean checkFull()
{
if (maxSize != -1 && (messageReferences.size() + scheduledRunnables.size()) >= maxSize)
{
if (trace) { log.trace(this + " queue is full, rejecting message"); }
-
+
return false;
}
else
@@ -545,26 +544,26 @@
return true;
}
}
-
+
private HandleStatus deliver(MessageReference reference)
{
if (consumers.isEmpty())
{
return HandleStatus.BUSY;
}
-
+
int startPos = pos;
-
+
boolean filterRejected = false;
-
+
while (true)
- {
+ {
Consumer consumer = consumers.get(pos);
-
- pos = distributionPolicy.select(consumers, pos);
-
+
+ pos = distributionPolicy.select(consumers, pos);
+
HandleStatus status;
-
+
try
{
status = consumer.handle(reference);
@@ -573,15 +572,15 @@
{
//If the consumer throws an exception we remove the consumer
removeConsumer(consumer);
-
+
return HandleStatus.BUSY;
}
-
+
if (status == null)
{
throw new IllegalStateException("ClientConsumer.handle() should never return null");
}
-
+
if (status == HandleStatus.HANDLED)
{
deliveringCount.incrementAndGet();
@@ -591,10 +590,10 @@
else if (status == HandleStatus.NO_MATCH)
{
promptDelivery = true;
-
+
filterRejected = true;
- }
-
+ }
+
if (pos == startPos)
{
//Tried all of them
@@ -608,24 +607,24 @@
return HandleStatus.BUSY;
}
}
- }
+ }
}
-
+
// Inner classes --------------------------------------------------------------------------
-
+
private class ScheduledDeliveryRunnable implements Runnable
{
private MessageReference ref;
-
+
private volatile Future<?> future;
-
+
private boolean cancelled;
public ScheduledDeliveryRunnable(MessageReference ref)
{
this.ref = ref;
}
-
+
public synchronized void setFuture(Future<?> future)
{
if (cancelled)
@@ -637,39 +636,39 @@
this.future = future;
}
}
-
+
public synchronized void cancel()
{
if (future != null)
{
- future.cancel(false);
+ future.cancel(false);
}
-
+
cancelled = true;
}
public void run()
{
if (trace) { log.trace("Scheduled delivery timeout " + ref); }
-
+
synchronized (scheduledRunnables)
{
boolean removed = scheduledRunnables.remove(this);
-
+
if (!removed)
{
log.warn("Failed to remove timeout " + this);
}
}
-
+
ref.setScheduledDeliveryTime(0);
-
+
HandleStatus status = deliver(ref);
-
+
if (HandleStatus.HANDLED != status)
{
//Add back to the front of the queue
-
+
addFirst(ref);
}
else
@@ -680,63 +679,9 @@
}
// -------------------------------
-
-
- //TODO - these can probably all be managed by the queue settings manager
-
- public Queue getDLQ()
- {
- return dlq;
- }
- public void setDLQ(Queue dlq)
+ public HierarchicalRepository<QueueSettings> getQueueSettings()
{
- this.dlq = dlq;
+ return queueSettings;
}
-
- public Queue getExpiryQueue()
- {
- return expiryQueue;
- }
-
- public void setExpiryQueue(Queue expiryQueue)
- {
- this.expiryQueue = expiryQueue;
- }
-
- public int getMaxDeliveryAttempts()
- {
- return maxDeliveryAttempts;
- }
-
- public void setMaxDeliveryAttempts(int maxDeliveryAttempts)
- {
- this.maxDeliveryAttempts = maxDeliveryAttempts;
- }
-
- public long getRedeliveryDelay()
- {
- return redeliveryDelay;
- }
-
- public void setRedeliveryDelay(long redeliveryDelay)
- {
- this.redeliveryDelay = redeliveryDelay;
- }
-
- public int getMessageCounterHistoryDayLimit()
- {
- return messageCounterHistoryDayLimit;
- }
-
- public void setMessageCounterHistoryDayLimit(int messageCounterHistoryDayLimit)
- {
- this.messageCounterHistoryDayLimit = messageCounterHistoryDayLimit;
- }
-
- // -------------------------------------------------------
-
-
-
-
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/PostOfficeImpl.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -110,7 +110,13 @@
{
return allowableAddresses.contains(address);
}
-
+
+
+ public Set<String> listAvailableAddresses()
+ {
+ return allowableAddresses;
+ }
+
public Binding addBinding(String address, String queueName, Filter filter,
boolean durable, boolean temporary) throws Exception
{
@@ -239,9 +245,9 @@
{
return mappings;
}
-
-
-
+
+
+
// Private -----------------------------------------------------------------
private Binding createBinding(String address, String name, Filter filter,
Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -28,7 +28,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.jboss.jms.server.ConnectionManager;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
import org.jboss.jms.server.endpoint.MessagingServerPacketHandler;
@@ -70,6 +69,8 @@
import org.jboss.messaging.util.Version;
import org.jboss.security.AuthenticationManager;
+import javax.jms.Destination;
+
/**
* A Messaging Server
*
@@ -82,7 +83,6 @@
* <p/>
* $Id: ServerPeer.java 3543 2008-01-07 22:31:58Z clebert.suconic at jboss.com $
*/
-@JMX(name = "jboss.messaging:service=MessagingServer", exposedInterface = MessagingServer.class)
public class MessagingServerImpl implements MessagingServer
{
// Constants ------------------------------------------------------------------------------------
@@ -119,7 +119,7 @@
private Configuration configuration = new Configuration();
private HierarchicalRepository<HashSet<Role>> securityRepository = new HierarchicalObjectRepository<HashSet<Role>>();
private HierarchicalRepository<QueueSettings> queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
- private QueueFactory queueFactory;
+ private QueueFactory queueFactory = new QueueFactoryImpl();
private ResourceManager resourceManager = new ResourceManagerImpl(0);
private ScheduledExecutorService scheduledExecutor;
@@ -194,6 +194,7 @@
});
postOffice = new PostOfficeImpl(configuration.getMessagingServerID(),
persistenceManager, queueFactory, configuration.isStrictTck());
+ queueSettingsDeployer.setPostOffice(postOffice);
if (createTransport)
{
@@ -201,14 +202,12 @@
}
// Start the wired components
securityDeployer.start();
- queueSettingsDeployer.start();
connectionManager.start();
remotingService.addFailureListener(connectionManager);
memoryManager.start();
postOffice.start();
-
- MessagingServerPacketHandler serverPacketHandler = new MessagingServerPacketHandler(this);
-
+ queueSettingsDeployer.start();
+ MessagingServerPacketHandler serverPacketHandler = new MessagingServerPacketHandler(this);
getRemotingService().getDispatcher().register(serverPacketHandler);
ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -501,29 +500,29 @@
{
return "MessagingServer[" + configuration.getMessagingServerID() + "]";
}
-
+
public CreateConnectionResponse createConnection(final String username, final String password,
final String remotingClientSessionID, final String clientVMID,
final int prefetchSize, final String clientAddress)
throws Exception
{
log.trace("creating a new connection for user " + username);
-
+
// Authenticate. Successful authentication will place a new SubjectContext on thread local,
// which will be used in the authorization process. However, we need to make sure we clean
// up thread local immediately after we used the information, otherwise some other people's
// security may be screwed up, on account of thread local security stack being corrupted.
-
+
securityStore.authenticate(username, password);
-
+
final ServerConnection connection =
new ServerConnectionEndpoint(username, password,
remotingClientSessionID, clientVMID, clientAddress,
prefetchSize, remotingService.getDispatcher(), resourceManager, persistenceManager,
postOffice, securityStore, connectionManager);
-
+
remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
-
+
return new CreateConnectionResponse(connection.getID());
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -30,6 +30,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.lang.management.ManagementFactory;
import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.jboss.jms.client.api.ClientConnectionFactory;
@@ -47,6 +48,10 @@
import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
import org.jboss.messaging.util.MessagingException;
+import javax.management.StandardMBean;
+import javax.management.ObjectName;
+import javax.management.MBeanServer;
+
/**
* This interface describes the properties and operations that comprise the management interface of the
* Messaging Server.
@@ -57,7 +62,7 @@
* @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
* @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
*/
-@JMX(name = "jboss.messaging:service=MessagingServerManagement", exposedInterface = MessagingServerManagement.class)
+//@JMX(name = "jboss.messaging:service=MessagingServerManagement", exposedInterface = MessagingServerManagement.class)
public class MessagingServerManagementImpl implements MessagingServerManagement, MessagingComponent
{
private MessagingServer messagingServer;
@@ -195,7 +200,7 @@
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
Queue queue = binding.getQueue();
- currentCounters.put(queueName, new MessageCounter(queue.getName(),queue, queue.isDurable(), queue.getMessageCounterHistoryDayLimit()));
+ currentCounters.put(queueName, new MessageCounter(queue.getName(),queue, queue.isDurable(), queue.getQueueSettings().getMatch(queue.getName()).getMessageCounterHistoryDayLimit()));
}
public void unregisterMessageCounter(final String queueName) throws Exception
@@ -223,7 +228,7 @@
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
Queue queue = binding.getQueue();
- messageCounter = new MessageCounter(queue.getName(), queue, queue.isDurable(), queue.getMessageCounterHistoryDayLimit());
+ messageCounter = new MessageCounter(queue.getName(), queue, queue.isDurable(), queue.getQueueSettings().getMatch(queue.getName()).getMessageCounterHistoryDayLimit());
}
currentCounters.put(queueName, messageCounter);
messageCounter.resetCounter();
@@ -350,6 +355,21 @@
}
}
+
+ public void expireMessages(String queue, Filter filter) throws Exception
+ {
+ List<MessageReference> allRefs = getQueue(queue).removeReferences(filter);
+ for (MessageReference messageReference : allRefs)
+ {
+ messageReference.expire(messagingServer.getPersistenceManager());
+ }
+ }
+
+ public Set<String> listAvailableAddresses()
+ {
+ return messagingServer.getPostOffice().listAvailableAddresses();
+ }
+
//
//// public int getDeliveringCountForQueue(String queue) throws Exception
//// {
Modified: trunk/src/main/org/jboss/messaging/deployers/queue/QueueSettingsDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/deployers/queue/QueueSettingsDeployer.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/deployers/queue/QueueSettingsDeployer.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -22,6 +22,7 @@
package org.jboss.messaging.deployers.queue;
import org.jboss.messaging.core.QueueSettings;
+import org.jboss.messaging.core.PostOffice;
import org.jboss.messaging.deployers.Deployer;
import org.jboss.messaging.util.HierarchicalRepository;
import org.w3c.dom.Node;
@@ -37,6 +38,8 @@
* The repository to add to
*/
HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+ PostOffice postOffice;
private static final String CLUSTERED_NODE_NAME = "clustered";
private static final String DLQ_NODE_NAME = "dlq";
private static final String EXPIREY_QUEUE_NODE_NAME = "expiry-queue";
@@ -50,6 +53,11 @@
this.queueSettingsRepository = queueSettingsRepository;
}
+ public void setPostOffice(PostOffice postOffice)
+ {
+ this.postOffice = postOffice;
+ }
+
/**
* the names of the elements to deploy
* @return the names of the elements to deploy
@@ -78,11 +86,22 @@
}
if(DLQ_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
{
- queueSettings.setDLQ(child.getTextContent());
+ String queueName = child.getTextContent();
+ if(postOffice.getBinding(queueName) == null)
+ {
+
+ postOffice.addBinding(queueName, queueName, null, true, false);
+ }
+ queueSettings.setDLQ(postOffice.getBinding(queueName).getQueue());
}
if(EXPIREY_QUEUE_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
{
- queueSettings.setExpiryQueue(child.getTextContent());
+ String queueName = child.getTextContent();
+ if(postOffice.getBinding(queueName) == null)
+ {
+ postOffice.addBinding(queueName, queueName, null, true, false);
+ }
+ queueSettings.setExpiryQueue(postOffice.getBinding(queueName).getQueue());
}
if(REDELIVERY_DELAY_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
{
Modified: trunk/src/main/org/jboss/messaging/util/HierarchicalObjectRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/HierarchicalObjectRepository.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/src/main/org/jboss/messaging/util/HierarchicalObjectRepository.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.Mergeable;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.lang.reflect.ParameterizedType;
/**
@@ -49,6 +50,10 @@
MatchComparator<String> matchComparator = new MatchComparator<String>();
/**
+ * a cache
+ */
+ Map<String, T> cache = new ConcurrentHashMap<String,T>();
+ /**
* Add a new match to the repository
*
* @param match The regex to use to match against
@@ -56,6 +61,7 @@
*/
public void addMatch(String match, T value)
{
+ cache.clear();
Match.verify(match);
Match<T> match1 = new Match<T>(match);
match1.setValue(value);
@@ -71,11 +77,18 @@
*/
public T getMatch(String match)
{
+ if(cache.get(match) != null)
+ {
+ return cache.get(match);
+ }
T actualMatch;
HashMap<String, Match<T>> possibleMatches = getPossibleMatches(match);
List<Match<T>> orderedMatches = sort(possibleMatches);
actualMatch = merge(orderedMatches);
- return actualMatch != null ? actualMatch : defaultmatch;
+ T value = actualMatch != null ? actualMatch : defaultmatch;
+ if(value != null)
+ cache.put(match, value);
+ return value;
}
/**
@@ -135,6 +148,7 @@
*/
public void setDefault(T defaultValue)
{
+ cache.clear();
defaultmatch = defaultValue;
}
Modified: trunk/tests/src/org/jboss/messaging/core/test/unit/QueueSettingsTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/test/unit/QueueSettingsTest.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/tests/src/org/jboss/messaging/core/test/unit/QueueSettingsTest.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -23,7 +23,9 @@
import junit.framework.TestCase;
import org.jboss.messaging.core.QueueSettings;
+import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.impl.RoundRobinDistributionPolicy;
+import org.jboss.messaging.core.impl.QueueImpl;
/**
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -50,8 +52,10 @@
QueueSettings queueSettings = new QueueSettings();
QueueSettings queueSettingsToMerge = new QueueSettings();
queueSettingsToMerge.setClustered(true);
- queueSettingsToMerge.setDLQ("testDlq");
- queueSettingsToMerge.setExpiryQueue("testExpiryQueue");
+ Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false,0);
+ Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false,0);
+ queueSettingsToMerge.setDLQ(DLQ);
+ queueSettingsToMerge.setExpiryQueue(exp);
queueSettingsToMerge.setMaxDeliveryAttempts(1000);
queueSettingsToMerge.setMaxSize(1001);
queueSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
@@ -60,8 +64,8 @@
assertEquals(queueSettings.getDistributionPolicy().getClass(), QueueSettings.DEFAULT_DISTRIBUTION_POLICY.getClass());
assertEquals(queueSettings.getDistributionPolicyClass(), null);
assertEquals(queueSettings.isClustered(), Boolean.valueOf(true));
- assertEquals(queueSettings.getDLQ(), "testDlq");
- assertEquals(queueSettings.getExpiryQueue(), "testExpiryQueue");
+ assertEquals(queueSettings.getDLQ(), DLQ);
+ assertEquals(queueSettings.getExpiryQueue(), exp);
assertEquals(queueSettings.getMaxDeliveryAttempts(), Integer.valueOf(1000));
assertEquals(queueSettings.getMaxSize(), Integer.valueOf(1001));
assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), Integer.valueOf(1002));
@@ -73,8 +77,10 @@
QueueSettings queueSettings = new QueueSettings();
QueueSettings queueSettingsToMerge = new QueueSettings();
queueSettingsToMerge.setClustered(true);
- queueSettingsToMerge.setDLQ("testDlq");
- queueSettingsToMerge.setExpiryQueue("testExpiryQueue");
+ Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false,0);
+ Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false,0);
+ queueSettingsToMerge.setDLQ(DLQ);
+ queueSettingsToMerge.setExpiryQueue(exp);
queueSettingsToMerge.setMaxDeliveryAttempts(1000);
queueSettingsToMerge.setMaxSize(1001);
queueSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
@@ -83,7 +89,8 @@
QueueSettings queueSettingsToMerge2 = new QueueSettings();
queueSettingsToMerge2.setClustered(true);
- queueSettingsToMerge2.setExpiryQueue("testExpiryQueue2");
+ Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false,0);
+ queueSettingsToMerge2.setExpiryQueue(exp2);
queueSettingsToMerge2.setMaxSize(2001);
queueSettingsToMerge2.setRedeliveryDelay((long)2003);
queueSettings.merge(queueSettingsToMerge2);
@@ -91,8 +98,8 @@
assertEquals(queueSettings.getDistributionPolicy().getClass(), QueueSettings.DEFAULT_DISTRIBUTION_POLICY.getClass());
assertEquals(queueSettings.getDistributionPolicyClass(), null);
assertEquals(queueSettings.isClustered(), Boolean.valueOf(true));
- assertEquals(queueSettings.getDLQ(), "testDlq");
- assertEquals(queueSettings.getExpiryQueue(), "testExpiryQueue2");
+ assertEquals(queueSettings.getDLQ(), DLQ);
+ assertEquals(queueSettings.getExpiryQueue(), exp2);
assertEquals(queueSettings.getMaxDeliveryAttempts(), Integer.valueOf(1000));
assertEquals(queueSettings.getMaxSize(), Integer.valueOf(2001));
assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), Integer.valueOf(1002));
@@ -104,8 +111,10 @@
QueueSettings queueSettings = new QueueSettings();
QueueSettings queueSettingsToMerge = new QueueSettings();
queueSettingsToMerge.setClustered(true);
- queueSettingsToMerge.setDLQ("testDlq");
- queueSettingsToMerge.setExpiryQueue("testExpiryQueue");
+ Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false,0);
+ Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false,0);
+ queueSettingsToMerge.setDLQ(DLQ);
+ queueSettingsToMerge.setExpiryQueue(exp);
queueSettingsToMerge.setMaxDeliveryAttempts(1000);
queueSettingsToMerge.setMaxSize(1001);
queueSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
@@ -114,8 +123,10 @@
QueueSettings queueSettingsToMerge2 = new QueueSettings();
queueSettingsToMerge2.setClustered(false);
- queueSettingsToMerge2.setDLQ("testDlq2");
- queueSettingsToMerge2.setExpiryQueue("testExpiryQueue2");
+ Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false,0);
+ Queue DLQ2 = new QueueImpl(0,"testDlq2", null, false, false, false,0);
+ queueSettingsToMerge2.setExpiryQueue(exp2);
+ queueSettingsToMerge2.setDLQ(DLQ2);
queueSettingsToMerge2.setMaxDeliveryAttempts(2000);
queueSettingsToMerge2.setMaxSize(2001);
queueSettingsToMerge2.setMessageCounterHistoryDayLimit(2002);
@@ -125,8 +136,8 @@
assertEquals(queueSettings.getDistributionPolicy().getClass(), QueueSettings.DEFAULT_DISTRIBUTION_POLICY.getClass());
assertEquals(queueSettings.getDistributionPolicyClass(), null);
assertEquals(queueSettings.isClustered(), Boolean.valueOf(true));
- assertEquals(queueSettings.getDLQ(), "testDlq2");
- assertEquals(queueSettings.getExpiryQueue(), "testExpiryQueue2");
+ assertEquals(queueSettings.getDLQ(), DLQ2);
+ assertEquals(queueSettings.getExpiryQueue(), exp2);
assertEquals(queueSettings.getMaxDeliveryAttempts(), Integer.valueOf(2000));
assertEquals(queueSettings.getMaxSize(), Integer.valueOf(2001));
assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), Integer.valueOf(2002));
Modified: trunk/tests/src/org/jboss/messaging/deployers/queue/tests/unit/QueueSettingsDeployerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/deployers/queue/tests/unit/QueueSettingsDeployerTest.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/tests/src/org/jboss/messaging/deployers/queue/tests/unit/QueueSettingsDeployerTest.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -57,8 +57,8 @@
queueSettingsDeployer.setQueueSettingsRepository(repository);
QueueSettings queueSettings = new QueueSettings();
queueSettings.setClustered(false);
- queueSettings.setDLQ("DLQtest");
- queueSettings.setExpiryQueue("ExpiryQueueTest");
+ //queueSettings.setDLQ("DLQtest");
+ //queueSettings.setExpiryQueue("ExpiryQueueTest");
queueSettings.setRedeliveryDelay((long)100);
queueSettings.setMaxSize(-100);
queueSettings.setDistributionPolicyClass("org.jboss.messaging.core.impl.RoundRobinDistributionPolicy");
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -24,7 +24,7 @@
import org.apache.tools.ant.taskdefs.Sleep;
import org.jboss.test.messaging.JBMServerTestCase;
import org.jboss.jms.server.JMSServerManager;
-import org.jboss.jms.server.ClientInfo;
+import org.jboss.jms.server.ConnectionInfo;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.messaging.core.impl.server.SubscriptionInfo;
@@ -33,6 +33,7 @@
import javax.naming.NameNotFoundException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
/**
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -169,6 +170,24 @@
}
}
+ public void testListAllQueues()
+ {
+ Set<String> queueNames = jmsServerManager.listAllQueues();
+ for (String queueName : queueNames)
+ {
+ System.out.println("queueName = " + queueName);
+ }
+ }
+
+ public void testListAllTopics()
+ {
+ Set<String> topics = jmsServerManager.listAllTopics();
+ for (String queueName : topics)
+ {
+ System.out.println("queueName = " + queueName);
+ }
+ }
+
public void testCreateAndDestroyConectionFactory() throws Exception
{
jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 100, "newtestcf");
@@ -216,46 +235,197 @@
}
}
- public void testClientInfo() throws Exception
+ public void testGetConnections() throws Exception // checks that getConnections() reflects connections being created, started and closed
{
Connection conn = getConnectionFactory().createConnection("guest", "guest");
- List<ClientInfo> clientInfos = jmsServerManager.getClients();
- assertNotNull(clientInfos);
- assertEquals(1, clientInfos.size());
- ClientInfo clientInfo = clientInfos.get(0);
- assertEquals("guest", clientInfo.getUser());
- assertEquals(ClientInfo.status.STOPPED, clientInfo.getStatus());
+ List<ConnectionInfo> connectionInfos = jmsServerManager.getConnections();
+ assertNotNull(connectionInfos);
+ assertEquals(1, connectionInfos.size()); // only conn has been created so far
+ ConnectionInfo connectionInfo = connectionInfos.get(0);
+ assertEquals("guest", connectionInfo.getUser());
+ assertEquals(ConnectionInfo.status.STOPPED, connectionInfo.getStatus()); // conn.start() has not been called yet
conn.start();
// starting a connection is a remoting async operation
// wait a little before querying clients infos from the server
sleepIfRemoting(250);
- clientInfos = jmsServerManager.getClients();
- assertNotNull(clientInfos);
- assertEquals(1, clientInfos.size());
- clientInfo = clientInfos.get(0);
- assertEquals(ClientInfo.status.STARTED, clientInfo.getStatus());
- clientInfo.getAddress();
- clientInfo.getTimeCreated();
- clientInfo.getAliveTime();
+ connectionInfos = jmsServerManager.getConnections();
+ assertNotNull(connectionInfos);
+ assertEquals(1, connectionInfos.size());
+ connectionInfo = connectionInfos.get(0);
+ assertEquals(ConnectionInfo.status.STARTED, connectionInfo.getStatus());
+ connectionInfo.getAddress(); // return value ignored; this and the next two calls only need to complete without throwing
+ connectionInfo.getTimeCreated();
+ connectionInfo.getAliveTime();
conn.close();
- clientInfos = jmsServerManager.getClients();
- assertNotNull(clientInfos);
- assertEquals(0, clientInfos.size());
+ connectionInfos = jmsServerManager.getConnections();
+ assertNotNull(connectionInfos);
+ assertEquals(0, connectionInfos.size()); // the only connection was closed above
Connection conn2 = getConnectionFactory().createConnection("guest", "guest");
Connection conn3 = getConnectionFactory().createConnection("guest", "guest");
- clientInfos = jmsServerManager.getClients();
- assertNotNull(clientInfos);
- assertEquals(2, clientInfos.size());
+ connectionInfos = jmsServerManager.getConnections();
+ assertNotNull(connectionInfos);
+ assertEquals(2, connectionInfos.size()); // conn2 and conn3 are open
conn2.close();
- clientInfos = jmsServerManager.getClients();
- assertNotNull(clientInfos);
- assertEquals(1, clientInfos.size());
+ connectionInfos = jmsServerManager.getConnections();
+ assertNotNull(connectionInfos);
+ assertEquals(1, connectionInfos.size()); // only conn3 remains
conn3.close();
- clientInfos = jmsServerManager.getClients();
- assertNotNull(clientInfos);
- assertEquals(0, clientInfos.size());
+ connectionInfos = jmsServerManager.getConnections();
+ assertNotNull(connectionInfos);
+ assertEquals(0, connectionInfos.size());
}
+ /**
+  * Opens three connections as "guest" and two with the factory's default credentials,
+  * then checks that getConnectionsForUser("guest") reports exactly the three guest
+  * connections.
+  *
+  * @throws Exception if a connection cannot be created or closed
+  */
+ public void testGetConnectionsForUser() throws Exception
+ {
+    // Declared before the try so that the finally block closes whichever connections
+    // were actually opened, even when a later createConnection() call throws.
+    Connection conn = null;
+    Connection conn2 = null;
+    Connection conn3 = null;
+    Connection conn4 = null;
+    Connection conn5 = null;
+
+    try
+    {
+       conn = getConnectionFactory().createConnection("guest", "guest");
+       conn2 = getConnectionFactory().createConnection();
+       conn3 = getConnectionFactory().createConnection();
+       conn4 = getConnectionFactory().createConnection("guest", "guest");
+       conn5 = getConnectionFactory().createConnection("guest", "guest");
+
+       List<ConnectionInfo> connectionInfos = jmsServerManager.getConnectionsForUser("guest");
+       assertNotNull(connectionInfos);
+       // JUnit's assertEquals takes (expected, actual); this order keeps failure messages accurate.
+       assertEquals(3, connectionInfos.size());
+       for (ConnectionInfo connectionInfo : connectionInfos)
+       {
+          assertEquals("guest", connectionInfo.getUser());
+       }
+    }
+    finally
+    {
+       if (conn != null)
+       {
+          conn.close();
+       }
+       if (conn2 != null)
+       {
+          conn2.close();
+       }
+       if (conn3 != null)
+       {
+          conn3.close();
+       }
+       if (conn4 != null)
+       {
+          conn4.close();
+       }
+       if (conn5 != null)
+       {
+          conn5.close();
+       }
+    }
+ }
+
+ /**
+  * Drops the single connection owned by user "john" by its connection id, then checks
+  * that the four other connections are still listed, that none of them belongs to
+  * "john", and that the dropped client connection can no longer create a session.
+  *
+  * @throws Exception if a connection cannot be created or closed
+  */
+ public void testDropConnectionForId() throws Exception
+ {
+    Connection conn = getConnectionFactory().createConnection("guest", "guest");
+    Connection conn2 = getConnectionFactory().createConnection();
+    Connection conn3 = getConnectionFactory().createConnection();
+    Connection conn4 = getConnectionFactory().createConnection("john", "needle");
+    Connection conn5 = getConnectionFactory().createConnection("guest", "guest");
+    try
+    {
+       List<ConnectionInfo> connectionInfos = jmsServerManager.getConnectionsForUser("john");
+       // JUnit's assertEquals takes (expected, actual).
+       assertEquals(1, connectionInfos.size());
+       jmsServerManager.dropConnection(connectionInfos.get(0).getId());
+       connectionInfos = jmsServerManager.getConnections();
+       assertNotNull(connectionInfos);
+       assertEquals(4, connectionInfos.size());
+       for (ConnectionInfo connectionInfo : connectionInfos)
+       {
+          // Compare by value: assertNotSame() only checks object identity and would
+          // pass for a distinct String instance that still reads "john".
+          assertFalse("john".equals(connectionInfo.getUser()));
+       }
+       try
+       {
+          conn4.createSession(false, Session.AUTO_ACKNOWLEDGE);
+          fail("Should throw exception");
+       }
+       catch (JMSException e)
+       {
+          //pass
+       }
+    }
+    finally
+    {
+       // conn4 is deliberately left out: it is the connection this test drops.
+       if (conn != null)
+       {
+          conn.close();
+       }
+       if (conn2 != null)
+       {
+          conn2.close();
+       }
+       if (conn3 != null)
+       {
+          conn3.close();
+       }
+       if (conn5 != null)
+       {
+          conn5.close();
+       }
+    }
+ }
+
+ /**
+  * Drops every connection owned by user "guest", then checks that the three other
+  * connections are still listed, that none of them belongs to "guest", and that both
+  * dropped client connections can no longer create a session.
+  *
+  * @throws Exception if a connection cannot be created or closed
+  */
+ public void testDropConnectionForUser() throws Exception
+ {
+    Connection conn = getConnectionFactory().createConnection("guest", "guest");
+    Connection conn2 = getConnectionFactory().createConnection();
+    Connection conn3 = getConnectionFactory().createConnection();
+    Connection conn4 = getConnectionFactory().createConnection("john", "needle");
+    Connection conn5 = getConnectionFactory().createConnection("guest", "guest");
+    try
+    {
+       jmsServerManager.dropConnectionForUser("guest");
+       List<ConnectionInfo> connectionInfos = jmsServerManager.getConnections();
+       assertNotNull(connectionInfos);
+       // JUnit's assertEquals takes (expected, actual).
+       assertEquals(3, connectionInfos.size());
+       for (ConnectionInfo connectionInfo : connectionInfos)
+       {
+          // Compare by value: assertNotSame() only checks object identity and would
+          // pass for a distinct String instance that still reads "guest".
+          assertFalse("guest".equals(connectionInfo.getUser()));
+       }
+       try
+       {
+          conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+          fail("Should throw exception");
+       }
+       catch (JMSException e)
+       {
+          //pass
+       }
+       try
+       {
+          conn5.createSession(false, Session.AUTO_ACKNOWLEDGE);
+          fail("Should throw exception");
+       }
+       catch (JMSException e)
+       {
+          //pass
+       }
+    }
+    finally
+    {
+       // conn and conn5 are deliberately left out: they are the connections this test drops.
+       if (conn2 != null)
+       {
+          conn2.close();
+       }
+       if (conn3 != null)
+       {
+          conn3.close();
+       }
+       if (conn4 != null)
+       {
+          conn4.close();
+       }
+    }
+ }
public void test() throws Exception
{
Connection conn = getConnectionFactory().createConnection("guest", "guest");
@@ -558,4 +728,46 @@
}
}
+
+ /**
+  * Sends ten messages to QueueWithOwnDLQAndExpiryQueue, expires the sixth one through
+  * the JMS server manager, and then checks that the remaining nine are delivered from
+  * the original queue while the expired one is delivered from PrivateExpiryQueue.
+  *
+  * @throws Exception if any JMS or JNDI operation fails
+  */
+ public void testExpireMessage() throws Exception
+ {
+    // Upper bound for each receive, so a missing message fails the test instead of
+    // blocking it forever.
+    final long receiveTimeout = 5000;
+
+    Connection conn = getConnectionFactory().createConnection("guest", "guest");
+    try
+    {
+       Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       Queue q = (Queue) getInitialContext().lookup("/queue/QueueWithOwnDLQAndExpiryQueue");
+       MessageProducer producer = sess.createProducer(q);
+       Message expiredMessage = null;
+       for (int i = 0; i < 10; i++)
+       {
+          TextMessage message = sess.createTextMessage();
+          producer.send(message);
+          if (i == 5)
+          {
+             expiredMessage = message;
+          }
+       }
+       String expiredId = expiredMessage.getJMSMessageID();
+       jmsServerManager.expireMessage("QueueWithOwnDLQAndExpiryQueue", expiredId);
+
+       MessageConsumer consumer = sess.createConsumer(q);
+       conn.start();
+       for (int i = 0; i < 9; i++)
+       {
+          Message message = consumer.receive(receiveTimeout);
+          assertNotNull(message);
+          // Compare ids by value: assertNotSame() only checks object identity, which
+          // says nothing about whether two id strings are equal.
+          assertFalse(expiredId.equals(message.getJMSMessageID()));
+       }
+       consumer.close();
+
+       Queue expQueue = (Queue) getInitialContext().lookup("/queue/PrivateExpiryQueue");
+       consumer = sess.createConsumer(expQueue);
+       Message message = consumer.receive(receiveTimeout);
+       assertNotNull(message);
+       assertEquals(expiredId, message.getJMSMessageID());
+    }
+    finally
+    {
+       conn.close();
+    }
+ }
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-02-25 15:29:11 UTC (rev 3788)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-02-25 15:34:18 UTC (rev 3789)
@@ -48,6 +48,7 @@
import org.jboss.messaging.core.Binding;
import org.jboss.messaging.core.MessagingServer;
import org.jboss.messaging.core.MessagingServerManagement;
+import org.jboss.messaging.core.QueueSettings;
import org.jboss.messaging.microcontainer.JBMBootstrapServer;
import org.jboss.messaging.util.JNDIUtil;
import org.jboss.test.messaging.tools.ConfigurationHelper;
@@ -834,11 +835,10 @@
public void setRedeliveryDelayOnDestination(String dest, boolean queue, long delay) throws Exception
{
- String condition = (queue ? "queue." : "topic.") + dest;
-
- List<Binding> bindings = this.getMessagingServer().getPostOffice().getBindingsForAddress(condition);
-
- bindings.get(0).getQueue().setRedeliveryDelay(delay);
+ String condition = (queue ? "queuejms." : "topicjms.") + dest; // match key: "queuejms." or "topicjms." prefix followed by the destination name
+ QueueSettings queueSettings = new QueueSettings(); // fresh settings object; only the redelivery delay is set on it here
+ queueSettings.setRedeliveryDelay(delay);
+ getMessagingServer().getQueueSettingsRepository().addMatch(condition, queueSettings); // registers the settings under the match key instead of setting the delay on the first bound queue
}
More information about the jboss-cvs-commits
mailing list