[jboss-cvs] JBoss Messaging SVN: r5423 - in trunk: src/main/org/jboss/messaging/core/management/impl and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 24 10:38:07 EST 2008
Author: jmesnil
Date: 2008-11-24 10:38:07 -0500 (Mon, 24 Nov 2008)
New Revision: 5423
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/integration/transports/mina/MinaConnection.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java
Log:
JBMESSAGING-1117: Request for feature to close connections on server side by ip address and by connection id
- added getRemoteAddress() to core.remoting.spi.Connection interface
- added management operation to JMSServerControlMBean & MessagingServerControlMBean to list/close connections identified by a given address
Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
@@ -53,6 +54,7 @@
Configuration configuration,
HierarchicalRepository<QueueSettings> queueSettingsRepository,
ResourceManager resourceManager,
+ RemotingService remotingService,
MessagingServer messagingServer) throws Exception;
void unregisterServer() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -150,4 +150,9 @@
@Operation(desc = "Rollback a prepared transaction")
boolean rollbackPreparedTransaction(@Parameter(desc = "the Base64 representation of a transaction", name = "transactionAsBase64") String transactionAsBase64) throws Exception;
+ String[] listRemoteAddresses();
+
+ String[] listRemoteAddresses(String ipAddress);
+
+ boolean closeConnectionsForAddress(String ipAddress);
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -35,7 +35,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.NotificationBroadcasterSupport;
@@ -54,6 +53,7 @@
import org.jboss.messaging.core.messagecounter.impl.MessageCounterManagerImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.MessageReference;
@@ -156,6 +156,7 @@
final Configuration configuration,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final ResourceManager resourceManager,
+ final RemotingService remotingService,
final MessagingServer messagingServer) throws Exception
{
this.postOffice = postOffice;
@@ -167,6 +168,7 @@
configuration,
queueSettingsRepository,
resourceManager,
+ remotingService,
messagingServer,
messageCounterManager,
broadcaster);
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -53,6 +53,8 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
@@ -89,6 +91,8 @@
private final ResourceManager resourceManager;
+ private final RemotingService remotingService;
+
private final MessagingServer server;
private final MessageCounterManager messageCounterManager;
@@ -124,6 +128,7 @@
final Configuration configuration,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final ResourceManager resourceManager,
+ final RemotingService remotingService,
final MessagingServer messagingServer,
final MessageCounterManager messageCounterManager,
final NotificationBroadcasterSupport broadcaster) throws Exception
@@ -134,6 +139,7 @@
this.configuration = configuration;
this.queueSettingsRepository = queueSettingsRepository;
this.resourceManager = resourceManager;
+ this.remotingService = remotingService;
server = messagingServer;
this.messageCounterManager = messageCounterManager;
this.broadcaster = broadcaster;
@@ -556,7 +562,51 @@
}
return false;
}
+
+ public String[] listRemoteAddresses()
+ {
+ Set<RemotingConnection> connections = remotingService.getConnections();
+ String[] remoteAddresses = new String[connections.size()];
+ int i = 0;
+ for (RemotingConnection connection : connections)
+ {
+ remoteAddresses[i++] = connection.getRemoteAddress();
+ }
+ return remoteAddresses;
+ }
+ public String[] listRemoteAddresses(final String ipAddress)
+ {
+ Set<RemotingConnection> connections = remotingService.getConnections();
+ List<String> remoteConnections = new ArrayList<String>();
+ for (RemotingConnection connection : connections)
+ {
+ String remoteAddress = connection.getRemoteAddress();
+ if (remoteAddress.contains(ipAddress))
+ {
+ remoteConnections.add(connection.getRemoteAddress());
+ }
+ }
+ return (String[])remoteConnections.toArray(new String[remoteConnections.size()]);
+ }
+
+ public boolean closeConnectionsForAddress(final String ipAddress)
+ {
+ boolean closed = false;
+ Set<RemotingConnection> connections = remotingService.getConnections();
+ for (RemotingConnection connection : connections)
+ {
+ String remoteAddress = connection.getRemoteAddress();
+ if (remoteAddress.contains(ipAddress))
+ {
+ connection.destroy();
+ closed = true;
+ }
+ }
+
+ return closed;
+ }
+
// NotificationEmitter implementation ----------------------------
public void removeNotificationListener(final NotificationListener listener,
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -26,6 +26,8 @@
{
Object getID();
+ String getRemoteAddress();
+
Channel getChannel(long channelID, int windowSize, boolean block);
long generateChannelID();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -303,6 +303,11 @@
return transportConnection.getID();
}
+ public String getRemoteAddress()
+ {
+ return transportConnection.getRemoteAddress();
+ }
+
public synchronized Channel getChannel(final long channelID, final int windowSize, final boolean block)
{
ChannelImpl channel = channels.get(channelID);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -131,4 +131,9 @@
}
});
}
+
+ public String getRemoteAddress()
+ {
+ return "invm";
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -40,4 +40,6 @@
void write(MessagingBuffer buffer);
void close();
+
+ String getRemoteAddress();
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -239,6 +239,7 @@
configuration,
queueSettingsRepository,
resourceManager,
+ remotingService,
this);
postOffice.start();
Modified: trunk/src/main/org/jboss/messaging/integration/transports/mina/MinaConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/mina/MinaConnection.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/integration/transports/mina/MinaConnection.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -121,6 +121,11 @@
session.write(buffer.getUnderlyingBuffer());
}
+ public String getRemoteAddress()
+ {
+ return session.getRemoteAddress().toString();
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -112,6 +112,11 @@
channel.write(buffer.getUnderlyingBuffer());
}
+ public String getRemoteAddress()
+ {
+ return channel.getRemoteAddress().toString();
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -158,4 +158,9 @@
*/
boolean destroyConnectionFactory(String name) throws Exception;
+ String[] listRemoteAddresses();
+
+ String[] listRemoteAddresses(String ipAddress);
+
+ boolean closeConnectionsForAddress(String ipAddress);
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -87,7 +87,8 @@
public static JMSServerManagerImpl newJMSServerManagerImpl(MessagingServer server) throws Exception
{
MessagingServerControlMBean control = new MessagingServerControl(server.getPostOffice(), server.getStorageManager(), server.getConfiguration(),
- server.getQueueSettingsRepository(), server.getResourceManager(), server, new MessageCounterManagerImpl(1000), new NotificationBroadcasterSupport());
+ server.getQueueSettingsRepository(), server.getResourceManager(),
+ server.getRemotingService(), server, new MessageCounterManagerImpl(1000), new NotificationBroadcasterSupport());
JMSManagementService jmsManagementService = new JMSManagementServiceImpl(server.getManagementService());
return new JMSServerManagerImpl(control, server.getPostOffice(), server.getStorageManager(),
server.getQueueSettingsRepository(), jmsManagementService);
@@ -325,6 +326,21 @@
return true;
}
+ public String[] listRemoteAddresses()
+ {
+ return messagingServer.listRemoteAddresses();
+ }
+
+ public String[] listRemoteAddresses(final String ipAddress)
+ {
+ return messagingServer.listRemoteAddresses(ipAddress);
+ }
+
+ public boolean closeConnectionsForAddress(final String ipAddress)
+ {
+ return messagingServer.closeConnectionsForAddress(ipAddress);
+ }
+
// Public --------------------------------------------------------
public void setInitialContext(final InitialContext initialContext)
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -23,6 +23,7 @@
package org.jboss.messaging.jms.server.management;
import static javax.management.MBeanOperationInfo.ACTION;
+import static javax.management.MBeanOperationInfo.INFO;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.management.Operation;
@@ -107,4 +108,12 @@
@Operation(desc = "Create a JMS ConnectionFactory", impact = ACTION)
void destroyConnectionFactory(@Parameter(name = "name", desc = "Name of the ConnectionFactory to create")
String name) throws Exception;
+
+ @Operation(desc = "List the client addresses", impact = INFO)
+ String[] listRemoteAddresses();
+
+ @Operation(desc = "List the client addresses which match the given IP Address", impact = INFO)
+ String[] listRemoteAddresses(String ipAddress);
+
+ boolean closeConnectionsForAddress(String ipAddress);
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -218,7 +218,22 @@
this.getClass().getName(),
"Notifications emitted by a JMS Server") };
}
+
+ public String[] listRemoteAddresses()
+ {
+ return server.listRemoteAddresses();
+ }
+ public String[] listRemoteAddresses(final String ipAddress)
+ {
+ return server.listRemoteAddresses(ipAddress);
+ }
+
+ public boolean closeConnectionsForAddress(final String ipAddress)
+ {
+ return server.closeConnectionsForAddress(ipAddress);
+ }
+
// StandardMBean overrides
// ---------------------------------------------------
Added: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -0,0 +1,277 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.jms.management;
+
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.management.MBeanServerInvocationHandler;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.integration.transports.mina.MinaAcceptorFactory;
+import org.jboss.messaging.integration.transports.mina.MinaConnectorFactory;
+import org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory;
+import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
+import org.jboss.messaging.jms.server.impl.JMSServerManagerImpl;
+import org.jboss.messaging.jms.server.management.JMSServerControlMBean;
+import org.jboss.messaging.jms.server.management.impl.JMSManagementServiceImpl;
+
+/**
+ * A QueueControlTest
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * Created 14 nov. 2008 13:35:10
+ *
+ *
+ */
+public class JMSServerControlTest extends TestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ private static JMSServerControlMBean createJMSServerControl() throws Exception
+ {
+ JMSServerControlMBean control = (JMSServerControlMBean)MBeanServerInvocationHandler.newProxyInstance(ManagementFactory.getPlatformMBeanServer(),
+ JMSManagementServiceImpl.getJMSServerObjectName(),
+ JMSServerControlMBean.class,
+ false);
+ return control;
+ }
+
+ private MessagingService startMessagingService(String acceptorFactory) throws Exception
+ {
+ Configuration conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
+ conf.setJMXManagementEnabled(true);
+ conf.getAcceptorConfigurations().add(new TransportConfiguration(acceptorFactory));
+ MessagingService service = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+ service.start();
+
+ JMSServerManagerImpl serverManager = JMSServerManagerImpl.newJMSServerManagerImpl(service.getServer());
+ serverManager.start();
+ serverManager.setInitialContext(new NullInitialContext());
+
+ return service;
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testListClientConnectionsForInVM() throws Exception
+ {
+ doListClientConnections(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
+ }
+
+ public void testListClientConnectionsForNetty() throws Exception
+ {
+ doListClientConnections(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
+ }
+
+ public void testListClientConnectionsForMina() throws Exception
+ {
+ doListClientConnections(MinaAcceptorFactory.class.getName(), MinaConnectorFactory.class.getName());
+ }
+
+ public void testCloseConnectionsForAddressForInVM() throws Exception
+ {
+ doCloseConnectionsForAddress(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
+ }
+
+ public void testCloseConnectionsForAddressForNetty() throws Exception
+ {
+ doCloseConnectionsForAddress(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
+ }
+
+ public void testCloseConnectionsForAddressForMina() throws Exception
+ {
+ doCloseConnectionsForAddress(MinaAcceptorFactory.class.getName(), MinaConnectorFactory.class.getName());
+ }
+
+ public void testCloseConnectionsForUnknownAddressForInVM() throws Exception
+ {
+ doCloseConnectionsForUnknownAddress(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
+ }
+
+ public void testCloseConnectionsForUnknownAddressForNetty() throws Exception
+ {
+ doCloseConnectionsForUnknownAddress(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
+ }
+
+ public void testCloseConnectionsForUnknownAddressForMina() throws Exception
+ {
+ doCloseConnectionsForUnknownAddress(MinaAcceptorFactory.class.getName(), MinaConnectorFactory.class.getName());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void doListClientConnections(String acceptorFactory, String connectorFactory) throws Exception
+ {
+ MessagingService service = null;
+ try
+ {
+ service = startMessagingService(acceptorFactory);
+
+ JMSServerControlMBean control = createJMSServerControl();
+
+ assertEquals(0, control.listRemoteAddresses().length);
+
+ Connection connection = JMSUtil.createConnection(connectorFactory);
+ // the connection won't connect to the server until a session is created
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String[] remoteAddresses = control.listRemoteAddresses();
+ assertEquals(1, remoteAddresses.length);
+
+ for (String remoteAddress : remoteAddresses)
+ {
+ System.out.println(remoteAddress);
+ }
+ connection.close();
+
+ // FIXME: with Netty, the server is not notified immediately that the connection is closed
+ Thread.sleep(500);
+
+ assertEquals(0, control.listRemoteAddresses().length);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+
+ }
+
+ private void doCloseConnectionsForAddress(String acceptorFactory, String connectorFactory) throws Exception
+ {
+ MessagingService service = null;
+ try
+ {
+ service = startMessagingService(acceptorFactory);
+
+ JMSServerControlMBean control = createJMSServerControl();
+
+ assertEquals(0, control.listRemoteAddresses().length);
+
+ Connection connection = JMSUtil.createConnection(connectorFactory);
+ // the connection won't connect to the server until a session is created
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String[] remoteAddresses = control.listRemoteAddresses();
+ assertEquals(1, remoteAddresses.length);
+ String remoteAddress = remoteAddresses[0];
+
+ final CountDownLatch exceptionLatch = new CountDownLatch(1);
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionLatch.countDown();
+ }
+ });
+
+ assertTrue(control.closeConnectionsForAddress(remoteAddress));
+
+ boolean gotException = exceptionLatch.await(500, TimeUnit.MILLISECONDS);
+ assertTrue("did not received the expected JMSException", gotException);
+ assertEquals(0, control.listRemoteAddresses().length);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ private void doCloseConnectionsForUnknownAddress(String acceptorFactory, String connectorFactory) throws Exception
+ {
+ String unknownAddress = randomString();
+
+ MessagingService service = null;
+
+ try
+ {
+ service = startMessagingService(acceptorFactory);
+
+ JMSServerControlMBean control = createJMSServerControl();
+
+ assertEquals(0, control.listRemoteAddresses().length);
+
+ Connection connection = JMSUtil.createConnection(connectorFactory);
+ // the connection won't connect to the server until a session is created
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String[] remoteAddresses = control.listRemoteAddresses();
+ assertEquals(1, remoteAddresses.length);
+
+ final CountDownLatch exceptionLatch = new CountDownLatch(1);
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionLatch.countDown();
+ }
+ });
+
+ assertFalse(control.closeConnectionsForAddress(unknownAddress));
+
+ assertEquals(1, control.listRemoteAddresses().length);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+ // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -55,9 +55,9 @@
// Static --------------------------------------------------------
- static MessageConsumer createConsumer(Destination destination, boolean startConnection) throws JMSException
+ static Connection createConnection(String connectorFactory) throws JMSException
{
- JBossConnectionFactory cf = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ JBossConnectionFactory cf = new JBossConnectionFactory(new TransportConfiguration(connectorFactory),
null,
ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
@@ -76,6 +76,31 @@
ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
+ return cf.createConnection();
+ }
+
+
+ static MessageConsumer createConsumer(Destination destination, boolean startConnection, String connectorFactory) throws JMSException
+ {
+ JBossConnectionFactory cf = new JBossConnectionFactory(new TransportConfiguration(connectorFactory),
+ null,
+ ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ null,
+ ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+ ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+ ClientSessionFactoryImpl.DEFAULT_BIG_MESSAGE_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ true,
+ ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
+
Connection conn = cf.createConnection();
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -87,7 +112,13 @@
return s.createConsumer(destination);
}
+
+ static MessageConsumer createConsumer(Destination destination, boolean startConnection) throws JMSException
+ {
+ return createConsumer(destination, startConnection, "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory");
+ }
+
static TopicSubscriber createDurableSubscriber(Topic topic, String clientID, String subscriptionName) throws JMSException
{
JBossConnectionFactory cf = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
@@ -149,6 +180,7 @@
}
}
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -49,6 +49,7 @@
import org.jboss.messaging.core.management.impl.QueueControl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
@@ -91,6 +92,7 @@
HierarchicalRepository<Set<Role>> securityRepository = createMock(HierarchicalRepository.class);
HierarchicalRepository<QueueSettings> queueSettingsRepository = createMock(HierarchicalRepository.class);
ResourceManager resourceManager = createMock(ResourceManager.class);
+ RemotingService remotingService = createMock(RemotingService.class);
MessagingServer messagingServer = createMock(MessagingServer.class);
MBeanServer mbeanServer = createMock(MBeanServer.class);
expect(mbeanServer.isRegistered(objectName)).andReturn(false);
@@ -98,12 +100,12 @@
mbeanServer.registerMBean(isA(MessagingServerControl.class),
eq(objectName))).andReturn(objectInstance);
- replay(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
+ replay(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, resourceManager, remotingService, messagingServer);
ManagementService service = new ManagementServiceImpl(mbeanServer, true);
- service.registerServer(postOffice, storageManager, configuration, queueSettingsRepository, resourceManager, messagingServer);
+ service.registerServer(postOffice, storageManager, configuration, queueSettingsRepository, resourceManager, remotingService, messagingServer);
- verify(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
+ verify(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, resourceManager, remotingService, messagingServer);
}
public void testRegisterAlreadyRegisteredMessagingServer() throws Exception
@@ -121,6 +123,7 @@
HierarchicalRepository<Set<Role>> securityRepository = createMock(HierarchicalRepository.class);
HierarchicalRepository<QueueSettings> queueSettingsRepository = createMock(HierarchicalRepository.class);
ResourceManager resourceManager = createMock(ResourceManager.class);
+ RemotingService remotingService = createMock(RemotingService.class);
MessagingServer messagingServer = createMock(MessagingServer.class);
MBeanServer mbeanServer = createMock(MBeanServer.class);
expect(mbeanServer.isRegistered(objectName)).andReturn(true);
@@ -129,12 +132,12 @@
mbeanServer.registerMBean(isA(MessagingServerControlMBean.class),
eq(objectName))).andReturn(objectInstance);
- replay(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
+ replay(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, resourceManager, remotingService, messagingServer);
ManagementService service = new ManagementServiceImpl(mbeanServer, true);
- service.registerServer(postOffice, storageManager, configuration, queueSettingsRepository, resourceManager, messagingServer);
+ service.registerServer(postOffice, storageManager, configuration, queueSettingsRepository, resourceManager, remotingService, messagingServer);
- verify(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
+ verify(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, resourceManager, remotingService, messagingServer);
}
public void testUnregisterMessagingServer() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java 2008-11-24 10:29:19 UTC (rev 5422)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java 2008-11-24 15:38:07 UTC (rev 5423)
@@ -54,6 +54,7 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.JournalType;
import org.jboss.messaging.core.server.MessagingServer;
@@ -83,6 +84,7 @@
private ResourceManager resourceManager;
private MessagingServer server;
private MessageCounterManager messageCounterManager;
+ private RemotingService remotingService;
// Constants -----------------------------------------------------
@@ -617,6 +619,7 @@
expect(configuration.isMessageCounterEnabled()).andReturn(false);
securityRepository = createMock(HierarchicalRepository.class);
queueSettingsRepository = createMock(HierarchicalRepository.class);
+ remotingService = createMock(RemotingService.class);
resourceManager = createMock(ResourceManager.class);
server = createMock(MessagingServer.class);
messageCounterManager = createMock(MessageCounterManager.class);
@@ -631,6 +634,7 @@
securityRepository = null;
queueSettingsRepository = null;
resourceManager = null;
+ remotingService = null;
server = null;
messageCounterManager = null;
@@ -643,20 +647,20 @@
{
MessagingServerControl control = new MessagingServerControl(postOffice,
storageManager, configuration,
- queueSettingsRepository, resourceManager, server, messageCounterManager, new NotificationBroadcasterSupport());
+ queueSettingsRepository, resourceManager, remotingService, server, messageCounterManager, new NotificationBroadcasterSupport());
return control;
}
private void replayMockedAttributes()
{
replay(postOffice, storageManager, configuration, securityRepository,
- queueSettingsRepository, resourceManager, server, messageCounterManager);
+ queueSettingsRepository, resourceManager, remotingService, server, messageCounterManager);
}
private void verifyMockedAttributes()
{
verify(postOffice, storageManager, configuration, securityRepository,
- queueSettingsRepository, resourceManager, server, messageCounterManager);
+ queueSettingsRepository, resourceManager, remotingService, server, messageCounterManager);
}
// Inner classes -------------------------------------------------
More information about the jboss-cvs-commits
mailing list