Author: jmesnil
Date: 2009-11-02 11:03:05 -0500 (Mon, 02 Nov 2009)
New Revision: 8184
Added:
trunk/src/main/org/hornetq/jms/bridge/JMSBridgeControl.java
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeControlImpl.java
Modified:
trunk/docs/user-manual/en/jms-bridge.xml
trunk/examples/javaee/jms-bridge/server/jms-bridge-jboss-beans.xml
trunk/examples/jms/jms-bridge/server1/hornetq-beans.xml
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-193: register JMS Bridge in JMX to manage it
* added JMSBridgeControlMBean to manage JMS Bridge using JMX
* added to JMSBridgeImpl ctor optional parameters to inject MBeanServer and MBean's
ObjectName
* inject JVM platform (resp. AS) MBeanServer in jms/jms-bridge (resp. javaee/jms-bridge)
example
* doc
-- patch contributed by Jose de Castro
Modified: trunk/docs/user-manual/en/jms-bridge.xml
===================================================================
--- trunk/docs/user-manual/en/jms-bridge.xml 2009-11-02 15:32:07 UTC (rev 8183)
+++ trunk/docs/user-manual/en/jms-bridge.xml 2009-11-02 16:03:05 UTC (rev 8184)
@@ -98,7 +98,12 @@
<parameter><null /></parameter>
<!-- Add MessageID In Header -->
<parameter>true</parameter>
- </constructor>
+ <!-- register the JMS Bridge in the AS MBeanServer -->
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>org.hornetq:service=JMSBridge</parameter>
+ </constructor>
<property name="transactionManager">
<inject bean="RealTransactionManager"/>
</property>
@@ -172,6 +177,10 @@
</constructor>
</bean>
+ <bean name="MBeanServer"
class="javax.management.MBeanServer">
+ <constructor
factoryClass="org.jboss.mx.util.MBeanServerLocator"
+ factoryMethod="locateJBoss"/>
+ </bean>
</deployment></programlisting>
<section>
<title>JMS Bridge Parameters</title>
@@ -318,6 +327,16 @@
it back it will be able to correlate it. </para>
</note>
</listitem>
+ <listitem>
+ <para>MBean Server</para>
+ <para>To manage the JMS Bridge using JMX, set the MBeanServer where the JMS Bridge MBean
+ must be registered (e.g. the JVM Platform MBeanServer or the JBoss AS MBeanServer).</para>
+ </listitem>
+ <listitem>
+ <para>ObjectName</para>
+ <para>If you set the MBeanServer, you must also set the ObjectName used to register
+ the JMS Bridge MBean (it must be unique within that MBeanServer).</para>
+ </listitem>
</itemizedlist>
</section>
<section>
Modified: trunk/examples/javaee/jms-bridge/server/jms-bridge-jboss-beans.xml
===================================================================
--- trunk/examples/javaee/jms-bridge/server/jms-bridge-jboss-beans.xml 2009-11-02 15:32:07
UTC (rev 8183)
+++ trunk/examples/javaee/jms-bridge/server/jms-bridge-jboss-beans.xml 2009-11-02 16:03:05
UTC (rev 8184)
@@ -48,6 +48,11 @@
<parameter><null /></parameter>
<!-- concatenate JMS messageID to the target's message header
-->
<parameter>true</parameter>
+ <!-- register the JMS Bridge in the AS MBeanServer -->
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>org.hornetq:service=JMSBridge</parameter>
</constructor>
<property name="transactionManager">
<inject bean="RealTransactionManager"/>
@@ -118,4 +123,9 @@
</constructor>
</bean>
+ <bean name="MBeanServer"
class="javax.management.MBeanServer">
+ <constructor factoryClass="org.jboss.mx.util.MBeanServerLocator"
+ factoryMethod="locateJBoss"/>
+ </bean>
+
</deployment>
\ No newline at end of file
Modified: trunk/examples/jms/jms-bridge/server1/hornetq-beans.xml
===================================================================
--- trunk/examples/jms/jms-bridge/server1/hornetq-beans.xml 2009-11-02 15:32:07 UTC (rev
8183)
+++ trunk/examples/jms/jms-bridge/server1/hornetq-beans.xml 2009-11-02 16:03:05 UTC (rev
8184)
@@ -107,6 +107,11 @@
<parameter><null /></parameter>
<!-- concatenate JMS messageID to the target's message header
-->
<parameter>true</parameter>
+ <!-- register the JMS Bridge in the JMX MBeanServer -->
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>org.hornetq:service=JMSBridge</parameter>
</constructor>
<property name="transactionManager">
<inject bean="TransactionManager"/>
Added: trunk/src/main/org/hornetq/jms/bridge/JMSBridgeControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/JMSBridgeControl.java
(rev 0)
+++ trunk/src/main/org/hornetq/jms/bridge/JMSBridgeControl.java 2009-11-02 16:03:05 UTC
(rev 8184)
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.bridge;
+
+import org.hornetq.core.management.HornetQComponentControl;
+
+/**
+ * Management interface of a JMS bridge, intended to be exposed through JMX.
+ * <p>
+ * It adds pause/resume operations, read/write access to the bridge's
+ * configuration and two read-only status flags to the lifecycle members
+ * inherited from HornetQComponentControl.
+ * <p>
+ * JMSBridgeControlImpl implements it by forwarding each member to the
+ * same-named method of a JMSBridge.
+ *
+ * @author <a href="jose(a)voxeo.com">Jose de Castro</a>
+ *
+ */
+public interface JMSBridgeControl extends HornetQComponentControl
+{
+ // Operations ----------------------------------------------------
+
+ void pause() throws Exception;
+
+ void resume() throws Exception;
+
+ // Credentials ---------------------------------------------------
+ // NOTE(review): the password getters make the passwords readable by any
+ // client of the MBean - confirm this exposure is intended.
+
+ String getSourceUsername();
+
+ void setSourceUsername(String name);
+
+ String getSourcePassword();
+
+ void setSourcePassword(String pwd);
+
+ String getTargetUsername();
+
+ void setTargetUsername(String name);
+
+ String getTargetPassword();
+
+ void setTargetPassword(String pwd);
+
+ // Forwarding configuration --------------------------------------
+
+ String getSelector();
+
+ void setSelector(String selector);
+
+ // NOTE(review): the unit of the interval is not stated in this
+ // interface - presumably milliseconds; confirm against JMSBridge.
+ long getFailureRetryInterval();
+
+ void setFailureRetryInterval(long interval);
+
+ int getMaxRetries();
+
+ void setMaxRetries(int retries);
+
+ // The mode travels as a String: JMSBridgeControlImpl converts it with
+ // QualityOfServiceMode.name() / QualityOfServiceMode.valueOf(String),
+ // so valid values are constant names such as "AT_MOST_ONCE".
+ String getQualityOfServiceMode();
+
+ void setQualityOfServiceMode(String mode);
+
+ int getMaxBatchSize();
+
+ void setMaxBatchSize(int size);
+
+ long getMaxBatchTime();
+
+ void setMaxBatchTime(long time);
+
+ String getSubscriptionName();
+
+ void setSubscriptionName(String subname);
+
+ String getClientID();
+
+ void setClientID(String clientID);
+
+ // Transaction manager lookup ------------------------------------
+
+ String getTransactionManagerLocatorClass();
+
+ void setTransactionManagerLocatorClass(String transactionManagerLocatorClass);
+
+ String getTransactionManagerLocatorMethod();
+
+ void setTransactionManagerLocatorMethod(String transactionManagerLocatorMethod);
+
+ boolean isAddMessageIDInHeader();
+
+ void setAddMessageIDInHeader(boolean value);
+
+ // Read-only status ----------------------------------------------
+
+ boolean isPaused();
+
+ boolean isFailed();
+
+}
Added: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeControlImpl.java
(rev 0)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeControlImpl.java 2009-11-02
16:03:05 UTC (rev 8184)
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.bridge.impl;
+
+import javax.management.StandardMBean;
+
+import org.hornetq.jms.bridge.JMSBridge;
+import org.hornetq.jms.bridge.JMSBridgeControl;
+import org.hornetq.jms.bridge.QualityOfServiceMode;
+
+/**
+ * JMX control for a {@link JMSBridge}.
+ * <p>
+ * Every operation and attribute of {@link JMSBridgeControl} is forwarded to
+ * the wrapped bridge. The only conversion performed here concerns the quality
+ * of service mode, which is exchanged as the name of a
+ * {@link QualityOfServiceMode} constant rather than as the constant itself.
+ *
+ * @author <a href="jose(a)voxeo.com">Jose de Castro</a>
+ *
+ */
+public class JMSBridgeControlImpl extends StandardMBean implements JMSBridgeControl
+{
+
+   // The bridge every call is forwarded to; assigned once by the constructor.
+   private final JMSBridge bridge;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a control exposing {@link JMSBridgeControl} as its management interface.
+    *
+    * @param bridge the bridge to manage
+    * @throws Exception if the StandardMBean constructor rejects the management interface
+    */
+   public JMSBridgeControlImpl(JMSBridge bridge) throws Exception
+   {
+      super(JMSBridgeControl.class);
+      this.bridge = bridge;
+   }
+
+   // Public --------------------------------------------------------
+
+   public void pause() throws Exception
+   {
+      bridge.pause();
+   }
+
+   public void resume() throws Exception
+   {
+      bridge.resume();
+   }
+
+   public boolean isStarted()
+   {
+      return bridge.isStarted();
+   }
+
+   public void start() throws Exception
+   {
+      bridge.start();
+   }
+
+   public void stop() throws Exception
+   {
+      bridge.stop();
+   }
+
+   public String getClientID()
+   {
+      return bridge.getClientID();
+   }
+
+   public long getFailureRetryInterval()
+   {
+      return bridge.getFailureRetryInterval();
+   }
+
+   public int getMaxBatchSize()
+   {
+      return bridge.getMaxBatchSize();
+   }
+
+   public long getMaxBatchTime()
+   {
+      return bridge.getMaxBatchTime();
+   }
+
+   public int getMaxRetries()
+   {
+      return bridge.getMaxRetries();
+   }
+
+   /**
+    * @return the name of the bridge's current {@link QualityOfServiceMode},
+    *         or null when the bridge reports no mode
+    */
+   public String getQualityOfServiceMode()
+   {
+      QualityOfServiceMode mode = bridge.getQualityOfServiceMode();
+      return mode != null ? mode.name() : null;
+   }
+
+   public String getSelector()
+   {
+      return bridge.getSelector();
+   }
+
+   public String getSourcePassword()
+   {
+      return bridge.getSourcePassword();
+   }
+
+   public String getSourceUsername()
+   {
+      return bridge.getSourceUsername();
+   }
+
+   public String getSubscriptionName()
+   {
+      return bridge.getSubscriptionName();
+   }
+
+   public String getTargetPassword()
+   {
+      return bridge.getTargetPassword();
+   }
+
+   public String getTargetUsername()
+   {
+      return bridge.getTargetUsername();
+   }
+
+   public String getTransactionManagerLocatorClass()
+   {
+      return bridge.getTransactionManagerLocatorClass();
+   }
+
+   public String getTransactionManagerLocatorMethod()
+   {
+      return bridge.getTransactionManagerLocatorMethod();
+   }
+
+   public boolean isAddMessageIDInHeader()
+   {
+      return bridge.isAddMessageIDInHeader();
+   }
+
+   public boolean isFailed()
+   {
+      return bridge.isFailed();
+   }
+
+   public boolean isPaused()
+   {
+      return bridge.isPaused();
+   }
+
+   public void setAddMessageIDInHeader(boolean value)
+   {
+      bridge.setAddMessageIDInHeader(value);
+   }
+
+   public void setClientID(String clientID)
+   {
+      bridge.setClientID(clientID);
+   }
+
+   public void setFailureRetryInterval(long interval)
+   {
+      bridge.setFailureRetryInterval(interval);
+   }
+
+   public void setMaxBatchSize(int size)
+   {
+      bridge.setMaxBatchSize(size);
+   }
+
+   public void setMaxBatchTime(long time)
+   {
+      bridge.setMaxBatchTime(time);
+   }
+
+   public void setMaxRetries(int retries)
+   {
+      bridge.setMaxRetries(retries);
+   }
+
+   /**
+    * Sets the bridge's quality of service mode from the name of a
+    * {@link QualityOfServiceMode} constant.
+    * <p>
+    * A null name is ignored: the bridge keeps its current mode.
+    * NOTE(review): QualityOfServiceMode is presumably an enum, in which case
+    * an unknown name makes valueOf throw IllegalArgumentException - confirm.
+    *
+    * @param mode name of the constant to apply, or null for no change
+    */
+   public void setQualityOfServiceMode(String mode)
+   {
+      if (mode == null)
+      {
+         // Nothing to convert; leave the bridge's mode untouched.
+         return;
+      }
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.valueOf(mode));
+   }
+
+   public void setSelector(String selector)
+   {
+      bridge.setSelector(selector);
+   }
+
+   public void setSourcePassword(String pwd)
+   {
+      bridge.setSourcePassword(pwd);
+   }
+
+   public void setSourceUsername(String name)
+   {
+      bridge.setSourceUsername(name);
+   }
+
+   public void setSubscriptionName(String subname)
+   {
+      bridge.setSubscriptionName(subname);
+   }
+
+   public void setTargetPassword(String pwd)
+   {
+      bridge.setTargetPassword(pwd);
+   }
+
+   public void setTargetUsername(String name)
+   {
+      bridge.setTargetUsername(name);
+   }
+
+   public void setTransactionManagerLocatorClass(String transactionManagerLocatorClass)
+   {
+      bridge.setTransactionManagerLocatorClass(transactionManagerLocatorClass);
+   }
+
+   public void setTransactionManagerLocatorMethod(String transactionManagerLocatorMethod)
+   {
+      bridge.setTransactionManagerLocatorMethod(transactionManagerLocatorMethod);
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-11-02 15:32:07 UTC
(rev 8183)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-11-02 16:03:05 UTC
(rev 8184)
@@ -27,13 +27,15 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
@@ -44,6 +46,7 @@
import org.hornetq.jms.bridge.ConnectionFactoryFactory;
import org.hornetq.jms.bridge.DestinationFactory;
import org.hornetq.jms.bridge.JMSBridge;
+import org.hornetq.jms.bridge.JMSBridgeControl;
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.HornetQSession;
@@ -150,6 +153,10 @@
private String transactionManagerLocatorClass =
"org.hornetq.integration.jboss.tm.JBoss5TransactionManagerLocator";
private String transactionManagerLocatorMethod = "getTm";
+
+ private MBeanServer mbeanServer;
+
+ private ObjectName objectName;
private static final int FORWARD_MODE_XA = 0;
@@ -164,8 +171,24 @@
{
this.messages = new LinkedList<Message>();
}
-
+
public JMSBridgeImpl(ConnectionFactoryFactory sourceCff, ConnectionFactoryFactory
targetCff,
+ DestinationFactory sourceDestinationFactory, DestinationFactory
targetDestinationFactory,
+ String sourceUsername, String sourcePassword,
+ String targetUsername, String targetPassword,
+ String selector, long failureRetryInterval,
+ int maxRetries,
+ QualityOfServiceMode qosMode,
+ int maxBatchSize, long maxBatchTime,
+ String subName, String clientID,
+ boolean addMessageIDInHeader) {
+
+ // Variant without JMX support: delegate to the full constructor with a
+ // null MBeanServer and a null object name, so no MBean is registered.
+ this(sourceCff, targetCff, sourceDestinationFactory, targetDestinationFactory,
sourceUsername,
+ sourcePassword, targetUsername, targetPassword, selector,
failureRetryInterval, maxRetries,
+ qosMode, maxBatchSize, maxBatchTime, subName, clientID, addMessageIDInHeader,
null, null);
+ }
+
+ public JMSBridgeImpl(ConnectionFactoryFactory sourceCff, ConnectionFactoryFactory
targetCff,
DestinationFactory sourceDestinationFactory, DestinationFactory
targetDestinationFactory,
String sourceUsername, String sourcePassword,
String targetUsername, String targetPassword,
@@ -174,7 +197,9 @@
QualityOfServiceMode qosMode,
int maxBatchSize, long maxBatchTime,
String subName, String clientID,
- boolean addMessageIDInHeader)
+ boolean addMessageIDInHeader,
+ MBeanServer mbeanServer,
+ String objectName)
{
this();
@@ -211,8 +236,32 @@
this.clientID = clientID;
this.addMessageIDInHeader = addMessageIDInHeader;
-
- checkParams();
+
+ checkParams();
+
+ if(mbeanServer != null)
+ {
+ if(objectName != null)
+ {
+ this.mbeanServer = mbeanServer;
+
+ try
+ {
+ JMSBridgeControlImpl controlBean = new JMSBridgeControlImpl(this);
+ this.objectName = ObjectName.getInstance(objectName);
+ StandardMBean mbean = new StandardMBean(controlBean,
JMSBridgeControl.class);
+ mbeanServer.registerMBean(mbean, this.objectName);
+ log.debug("Registered JMSBridge instance as: " +
this.objectName.getCanonicalName());
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException("Failed to register JMSBridge
MBean", e);
+ }
+ }
+ else {
+ throw new IllegalArgumentException("objectName is required when
specifying an MBeanServer");
+ }
+ }
if (trace)
{
@@ -382,6 +431,21 @@
{
return started;
}
+
+ /**
+  * Unregisters the bridge's MBean from the MBeanServer it was registered in.
+  * <p>
+  * Does nothing when no MBeanServer or object name was recorded. A failed
+  * unregistration is logged as a warning, together with its cause, and is
+  * not propagated to the caller.
+  */
+ public void destroy()
+ {
+    if (mbeanServer != null && objectName != null)
+    {
+       try
+       {
+          mbeanServer.unregisterMBean(objectName);
+       }
+       catch (Exception e)
+       {
+          // Best effort: keep going, but log the cause so the failure can be diagnosed.
+          log.warn("Failed to unregister JMS Bridge " + objectName, e);
+       }
+    }
+ }
// JMSBridge implementation
------------------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2009-11-02
15:32:07 UTC (rev 8183)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2009-11-02
16:03:05 UTC (rev 8184)
@@ -12,6 +12,7 @@
*/
package org.hornetq.tests.integration.jms.bridge;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -23,6 +24,8 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -1600,6 +1603,26 @@
}
}
+ /**
+  * Checks that a bridge constructed with an MBeanServer and an object name
+  * is registered under that name, and that destroy() unregisters it again.
+  */
+ public void testMBeanServer() throws Exception
+ {
+    MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+    ObjectName objectName = new ObjectName("example.jmsbridge:service=JMSBridge");
+
+    JMSBridgeImpl bridge = new JMSBridgeImpl(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
+                                             null, null, null, null,
+                                             null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
+                                             1, -1,
+                                             null, null, false,
+                                             mbeanServer,
+                                             objectName.getCanonicalName());
+
+    try
+    {
+       assertTrue(mbeanServer.isRegistered(objectName));
+    }
+    finally
+    {
+       // The platform MBeanServer is shared by the whole JVM: always unregister,
+       // so a failed assertion cannot leave the name taken for later tests.
+       bridge.destroy();
+    }
+
+    assertFalse(mbeanServer.isRegistered(objectName));
+ }
+
public TransactionManager getNewTm()
{
return newTransactionManager();