Author: jmesnil
Date: 2010-06-29 04:37:55 -0400 (Tue, 29 Jun 2010)
New Revision: 9367
Modified:
trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-89: Create and manage diverts and bridges through
the management API
* add createDivert/destroyDivert methods to HornetQServerControl API
* add test to HornetQServerControlTest
Modified: trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-06-28
14:53:36 UTC (rev 9366)
+++ trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-06-29
08:37:55 UTC (rev 9367)
@@ -13,13 +13,10 @@
package org.hornetq.api.core.management;
-import java.util.Set;
-
import javax.management.MBeanOperationInfo;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
-import org.hornetq.core.security.Role;
/**
* A HornetQServerControl is used to manage HornetQ servers.
@@ -524,4 +521,15 @@
@Operation(desc = "returns the address settings as a JSON string for an address
match", impact = MBeanOperationInfo.INFO)
String getAddressSettingsAsJSON(@Parameter(desc="an address match",
name="addressMatch") String addressMatch) throws Exception;
+ @Operation(desc= "Create a Divert", impact = MBeanOperationInfo.ACTION)
+ void createDivert(@Parameter(name="name", desc="Name of the
divert") String name,
+ @Parameter(name="routingName", desc="Routing name of
the divert") String routingName,
+ @Parameter(name="address", desc="Address to divert
from") String address,
+ @Parameter(name="forwardingAddress", desc="Adress to
divert to") String forwardingAddress,
+ @Parameter(name="exclusive", desc="Is the divert
exclusive?") boolean exclusive,
+ @Parameter(name="filterString", desc="Filter of the
divert") String filterString,
+ @Parameter(name="transformerClassName", desc="Class
name of the divert's transformer") String transformerClassName) throws
Exception;
+
+ @Operation(desc= "Destroy a Divert", impact = MBeanOperationInfo.ACTION)
+ void destroyDivert(@Parameter(name="name", desc="Name of the
divert") String name) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-06-28
14:53:36 UTC (rev 9366)
+++
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-06-29
08:37:55 UTC (rev 9367)
@@ -34,6 +34,7 @@
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.QueueControl;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.messagecounter.MessageCounterManager;
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
@@ -1379,7 +1380,30 @@
blockOnIO();
}
}
-
+
+ public void createDivert(String name,
+ String routingName,
+ String address,
+ String forwardingAddress,
+ boolean exclusive,
+ String filterString,
+ String transformerClassName) throws Exception
+ {
+ DivertConfiguration config = new DivertConfiguration(name,
+ routingName,
+ address,
+ forwardingAddress,
+ exclusive,
+ filterString,
+ transformerClassName);
+ server.deployDivert(config);
+ }
+
+ public void destroyDivert(String name) throws Exception
+ {
+ server.destroyDivert(SimpleString.toSimpleString(name));
+ }
+
// NotificationEmitter implementation ----------------------------
public void removeNotificationListener(final NotificationListener listener,
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-06-28 14:53:36 UTC (rev
9366)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-06-29 08:37:55 UTC (rev
9367)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -143,4 +144,8 @@
ReplicationManager getReplicationManager();
boolean checkActivate() throws Exception;
+
+ void deployDivert(DivertConfiguration config) throws Exception;
+
+ void destroyDivert(SimpleString name) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-06-28 14:53:36
UTC (rev 9366)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-06-29 08:37:55
UTC (rev 9367)
@@ -1336,59 +1336,79 @@
{
for (DivertConfiguration config : configuration.getDivertConfigurations())
{
- if (config.getName() == null)
- {
- HornetQServerImpl.log.warn("Must specify a name for each divert. This
one will not be deployed.");
+ deployDivert(config);
+ }
+ }
- return;
- }
+ public void deployDivert(DivertConfiguration config) throws Exception
+ {
+ if (config.getName() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify a name for each divert. This one
will not be deployed.");
- if (config.getAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an address for each divert.
This one will not be deployed.");
+ return;
+ }
- return;
- }
+ if (config.getAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an address for each divert. This
one will not be deployed.");
- if (config.getForwardingAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an forwarding address for each
divert. This one will not be deployed.");
+ return;
+ }
- return;
- }
+ if (config.getForwardingAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an forwarding address for each
divert. This one will not be deployed.");
- SimpleString sName = new SimpleString(config.getName());
+ return;
+ }
- if (postOffice.getBinding(sName) != null)
- {
- HornetQServerImpl.log.warn("Binding already exists with name " +
sName + ", divert will not be deployed");
+ SimpleString sName = new SimpleString(config.getName());
- continue;
- }
+ if (postOffice.getBinding(sName) != null)
+ {
+ HornetQServerImpl.log.warn("Binding already exists with name " + sName
+ ", divert will not be deployed");
- SimpleString sAddress = new SimpleString(config.getAddress());
+ return;
+ }
- Transformer transformer =
instantiateTransformer(config.getTransformerClassName());
+ SimpleString sAddress = new SimpleString(config.getAddress());
- Filter filter = FilterImpl.createFilter(config.getFilterString());
+ Transformer transformer =
instantiateTransformer(config.getTransformerClassName());
- Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
- sName,
- new SimpleString(config.getRoutingName()),
- config.isExclusive(),
- filter,
- transformer,
- postOffice,
- storageManager);
- // pagingManager,
- // storageManager);
+ Filter filter = FilterImpl.createFilter(config.getFilterString());
- Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress,
divert);
+ Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
+ sName,
+ new SimpleString(config.getRoutingName()),
+ config.isExclusive(),
+ filter,
+ transformer,
+ postOffice,
+ storageManager);
+ // pagingManager,
+ // storageManager);
- postOffice.addBinding(binding);
+ Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress,
divert);
- managementService.registerDivert(divert, config);
+ postOffice.addBinding(binding);
+
+ managementService.registerDivert(divert, config);
+ }
+
+ public void destroyDivert(SimpleString name) throws Exception
+ {
+ Binding binding = postOffice.getBinding(name);
+ if (binding == null)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for
divert " + name);
}
+ if (!(binding instanceof DivertBinding))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding "
+ name + " is not a divert");
+ }
+
+ postOffice.removeBinding(name);
}
private synchronized void deployGroupingHandlerConfiguration(final
GroupingHandlerConfiguration config) throws Exception
Modified:
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2010-06-28
14:53:36 UTC (rev 9366)
+++
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2010-06-29
08:37:55 UTC (rev 9367)
@@ -20,12 +20,19 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.management.AddressSettingsInfo;
+import org.hornetq.api.core.management.DivertControl;
import org.hornetq.api.core.management.HornetQServerControl;
import org.hornetq.api.core.management.ObjectNameBuilder;
import org.hornetq.api.core.management.QueueControl;
import org.hornetq.api.core.management.RoleInfo;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -499,6 +506,73 @@
assertEquals(addressFullMessagePolicy, info.getAddressFullMessagePolicy());
}
+ public void testCreateAndDestroyDivert() throws Exception
+ {
+ String address = RandomUtil.randomString();
+ SimpleString name = RandomUtil.randomSimpleString();
+ String routingName = RandomUtil.randomString();
+ String forwardingAddress = RandomUtil.randomString();
+
+ HornetQServerControl serverControl = createManagementControl();
+
+ checkNoResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name));
+
+ serverControl.createDivert(name.toString(), routingName, address,
forwardingAddress, true, null, null);
+
+ checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name));
+ DivertControl divertControl =
ManagementControlHelper.createDivertControl(name.toString(), mbeanServer);
+ assertEquals(name.toString(), divertControl.getUniqueName());
+ assertEquals(address, divertControl.getAddress());
+ assertEquals(forwardingAddress, divertControl.getForwardingAddress());
+ assertEquals(routingName, divertControl.getRoutingName());
+ assertTrue(divertControl.isExclusive());
+ assertNull(divertControl.getFilter());
+ assertNull(divertControl.getTransformerClassName());
+
+ // check that a message sent to the address is diverted exclusively
+ ClientSessionFactory csf = new ClientSessionFactoryImpl(new
TransportConfiguration(InVMConnectorFactory.class.getName()));
+ ClientSession session = csf.createSession();
+
+ String divertQueue = RandomUtil.randomString();
+ String queue = RandomUtil.randomString();
+ session.createQueue(forwardingAddress, divertQueue);
+ session.createQueue(address, queue);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientMessage message = session.createMessage(false);
+ String text = RandomUtil.randomString();
+ message.putStringProperty("prop", text);
+ producer.send(message);
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientConsumer divertedConsumer = session.createConsumer(divertQueue);
+
+ session.start();
+
+ assertNull(consumer.receiveImmediate());
+ message = divertedConsumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(text, message.getStringProperty("prop"));
+
+ serverControl.destroyDivert(name.toString());
+
+ // check that a message is no longer diverted
+ message = session.createMessage(false);
+ String text2 = RandomUtil.randomString();
+ message.putStringProperty("prop", text2);
+ producer.send(message);
+
+ assertNull(divertedConsumer.receiveImmediate());
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(text2, message.getStringProperty("prop"));
+
+ session.close();
+
+
+ checkNoResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2010-06-28
14:53:36 UTC (rev 9366)
+++
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2010-06-29
08:37:55 UTC (rev 9367)
@@ -526,6 +526,22 @@
{
return (String)proxy.invokeOperation("getAddressSettingsAsJSON",
addressMatch);
}
+
+ public void createDivert(String name,
+ String routingName,
+ String address,
+ String forwardingAddress,
+ boolean exclusive,
+ String filterString,
+ String transformerClassName) throws Exception
+ {
+ proxy.invokeOperation("createDivert", name, routingName, address,
forwardingAddress, exclusive, filterString, transformerClassName);
+ }
+
+ public void destroyDivert(String name) throws Exception
+ {
+ proxy.invokeOperation("destroyDivert", name);
+ }
};
}
// Package protected ---------------------------------------------