[jboss-cvs] JBoss Messaging SVN: r5435 - in trunk: src/main/org/jboss/messaging/core/management/impl and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 26 04:43:32 EST 2008
Author: jmesnil
Date: 2008-11-26 04:43:32 -0500 (Wed, 26 Nov 2008)
New Revision: 5435
Modified:
trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
Log:
JBMESSAGING-1117: Request for feature to close connections on server side by ip address and by connection id
- ensure that all of a connection's sessions are closed when the connection is closed using the management operation closeConnectionsForAddress()
- added management operations to list connection IDs (listConnectionIDs) and the sessions of a given connection (listSessions)
Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.management;
import static javax.management.MBeanOperationInfo.ACTION;
+import static javax.management.MBeanOperationInfo.INFO;
import java.util.List;
import java.util.Map;
@@ -150,9 +151,19 @@
@Operation(desc = "Rollback a prepared transaction")
boolean rollbackPreparedTransaction(@Parameter(desc = "the Base64 representation of a transaction", name = "transactionAsBase64") String transactionAsBase64) throws Exception;
+ @Operation(desc = "List the client addresses", impact = INFO)
String[] listRemoteAddresses();
- String[] listRemoteAddresses(String ipAddress);
+ @Operation(desc = "List the client addresses which match the given IP Address", impact = INFO)
+ String[] listRemoteAddresses(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
- boolean closeConnectionsForAddress(String ipAddress);
+ @Operation(desc = "Closes all the connections for the given IP Address", impact = INFO)
+ boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
+
+ @Operation(desc = "List all the connection IDs", impact = INFO)
+ String[] listConnectionIDs();
+
+ @Operation(desc = "List the sessions for the given connectionID", impact = INFO)
+ String[] listSessions(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID);
+
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -45,6 +45,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.management.MessagingServerControlMBean;
@@ -58,6 +59,7 @@
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.server.impl.ServerSessionImpl;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -602,13 +604,37 @@
String remoteAddress = connection.getRemoteAddress();
if (remoteAddress.contains(ipAddress))
{
- connection.destroy();
+ connection.fail(new MessagingException(MessagingException.INTERNAL_ERROR, "connections for " + ipAddress + " closed by management"));
closed = true;
}
}
return closed;
}
+
+ public String[] listConnectionIDs()
+ {
+ Set<RemotingConnection> connections = remotingService.getConnections();
+ String[] connectionIDs = new String[connections.size()];
+ int i = 0;
+ for (RemotingConnection connection : connections)
+ {
+ connectionIDs[i++] = connection.getID().toString();
+ }
+ return connectionIDs;
+ }
+
+ public String[] listSessions(final String connectionID)
+ {
+ List<ServerSession> sessions = server.getSessions(connectionID);
+ String[] sessionIDs = new String[sessions.size()];
+ int i = 0;
+ for (ServerSession serverSession : sessions)
+ {
+ sessionIDs[i++] = serverSession.getName();
+ }
+ return sessionIDs;
+ }
// NotificationEmitter implementation ----------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -12,6 +12,7 @@
package org.jboss.messaging.core.server;
+import java.util.List;
import java.util.Set;
import org.jboss.messaging.core.config.Configuration;
@@ -107,4 +108,6 @@
RemotingConnection getReplicatingConnection();
ResourceManager getResourceManager();
+
+ List<ServerSession> getSessions(String connectionID);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -59,6 +59,8 @@
*/
public interface ServerSession
{
+ String getName();
+
long getID();
String getUsername();
@@ -67,6 +69,8 @@
int getMinLargeMessageSize();
+ Object getConnectionID();
+
void removeConsumer(ServerConsumer consumer) throws Exception;
void close() throws Exception;
@@ -140,4 +144,5 @@
int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
Channel getChannel();
+
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -12,11 +12,12 @@
package org.jboss.messaging.core.server.impl;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Collection;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -27,20 +28,15 @@
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.MessagingServerControlMBean;
import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
-import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
@@ -59,7 +55,6 @@
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.cluster.ClusterManager;
import org.jboss.messaging.core.server.cluster.impl.ClusterManagerImpl;
import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -594,6 +589,21 @@
{
sessions.remove(name);
}
+
+ public List<ServerSession> getSessions(final String connectionID)
+ {
+ Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
+ List<ServerSession> matchingSessions = new ArrayList<ServerSession>();
+ for (Entry<String, ServerSession> sessionEntry : sessionEntries)
+ {
+ ServerSession serverSession = sessionEntry.getValue();
+ if (serverSession.getConnectionID().toString().equals(connectionID))
+ {
+ matchingSessions.add(serverSession);
+ }
+ }
+ return matchingSessions;
+ }
public RemotingConnection getReplicatingConnection()
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -293,6 +293,16 @@
return id;
}
+ public String getName()
+ {
+ return name;
+ }
+
+ public Object getConnectionID()
+ {
+ return remotingConnection.getID();
+ }
+
public void removeConsumer(final ServerConsumer consumer) throws Exception
{
if (consumers.remove(consumer.getID()) == null)
Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -163,4 +163,9 @@
String[] listRemoteAddresses(String ipAddress);
boolean closeConnectionsForAddress(String ipAddress);
+
+ String[] listConnectionIDs();
+
+ String[] listSessions(String connectionID);
+
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -340,6 +340,16 @@
{
return messagingServer.closeConnectionsForAddress(ipAddress);
}
+
+ public String[] listConnectionIDs()
+ {
+ return messagingServer.listConnectionIDs();
+ }
+
+ public String[] listSessions(final String connectionID)
+ {
+ return messagingServer.listSessions(connectionID);
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -113,7 +113,15 @@
String[] listRemoteAddresses();
@Operation(desc = "List the client addresses which match the given IP Address", impact = INFO)
- String[] listRemoteAddresses(String ipAddress);
+ String[] listRemoteAddresses(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
- boolean closeConnectionsForAddress(String ipAddress);
+ @Operation(desc = "Closes all the connections for the given IP Address", impact = INFO)
+ boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
+
+ @Operation(desc = "List all the connection IDs", impact = INFO)
+ String[] listConnectionIDs();
+
+ @Operation(desc = "List the sessions for the given connectionID", impact = INFO)
+ String[] listSessions(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID);
+
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -234,6 +234,16 @@
return server.closeConnectionsForAddress(ipAddress);
}
+ public String[] listConnectionIDs()
+ {
+ return server.listConnectionIDs();
+ }
+
+ public String[] listSessions(final String connectionID)
+ {
+ return server.listSessions(connectionID);
+ }
+
// StandardMBean overrides
// ---------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java 2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java 2008-11-26 09:43:32 UTC (rev 5435)
@@ -142,12 +142,123 @@
doCloseConnectionsForUnknownAddress(MinaAcceptorFactory.class.getName(), MinaConnectorFactory.class.getName());
}
+ public void testListSessionsForInVM() throws Exception
+ {
+ doListSessions(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
+ }
+
+ public void testListSessionsForNetty() throws Exception
+ {
+ doListSessions(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
+ }
+
+ public void testListSessionsForMina() throws Exception
+ {
+ doListSessions(MinaAcceptorFactory.class.getName(), MinaConnectorFactory.class.getName());
+ }
+
+ public void testListConnectionIDsForInVM() throws Exception
+ {
+ doListConnectionIDs(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
+ }
+
+ public void testListConnectionIDsForNetty() throws Exception
+ {
+ doListConnectionIDs(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
+ }
+
+ public void testListConnectionIDsForMina() throws Exception
+ {
+ doListConnectionIDs(MinaAcceptorFactory.class.getName(), MinaConnectorFactory.class.getName());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ private void doListConnectionIDs(String acceptorFactory, String connectorFactory) throws Exception
+ {
+ MessagingService service = null;
+ try
+ {
+ service = startMessagingService(acceptorFactory);
+
+ JMSServerControlMBean control = createJMSServerControl();
+
+ assertEquals(0, control.listConnectionIDs().length);
+
+ Connection connection = JMSUtil.createConnection(connectorFactory);
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String[] connectionIDs = control.listConnectionIDs();
+ assertEquals(1, connectionIDs.length);
+
+ Connection connection2 = JMSUtil.createConnection(connectorFactory);
+ connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertEquals(2, control.listConnectionIDs().length);
+
+ connection.close();
+ Thread.sleep(500);
+
+ assertEquals(1, control.listConnectionIDs().length);
+
+ connection2.close();
+ Thread.sleep(500);
+
+ assertEquals(0, control.listConnectionIDs().length);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ private void doListSessions(String acceptorFactory, String connectorFactory) throws Exception
+ {
+ MessagingService service = null;
+ try
+ {
+ service = startMessagingService(acceptorFactory);
+
+ JMSServerControlMBean control = createJMSServerControl();
+
+ assertEquals(0, control.listConnectionIDs().length);
+
+ Connection connection = JMSUtil.createConnection(connectorFactory);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String[] connectionIDs = control.listConnectionIDs();
+ assertEquals(1, connectionIDs.length);
+ String connectionID = connectionIDs[0];
+
+ String[] sessions = control.listSessions(connectionID);
+ assertEquals(1, sessions.length);
+
+ session.close();
+
+ sessions = control.listSessions(connectionID);
+ assertEquals(0, sessions.length);
+
+ connection.close();
+
+ Thread.sleep(500);
+
+ assertEquals(0, control.listConnectionIDs().length);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
private void doListClientConnections(String acceptorFactory, String connectorFactory) throws Exception
{
MessagingService service = null;
@@ -196,12 +307,15 @@
JMSServerControlMBean control = createJMSServerControl();
+ assertEquals(0, service.getServer().getConnectionCount());
assertEquals(0, control.listRemoteAddresses().length);
Connection connection = JMSUtil.createConnection(connectorFactory);
// the connection won't connect to the server until a session is created
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertEquals(1, service.getServer().getConnectionCount());
+
String[] remoteAddresses = control.listRemoteAddresses();
assertEquals(1, remoteAddresses.length);
String remoteAddress = remoteAddresses[0];
@@ -217,9 +331,10 @@
assertTrue(control.closeConnectionsForAddress(remoteAddress));
- boolean gotException = exceptionLatch.await(500, TimeUnit.MILLISECONDS);
+ boolean gotException = exceptionLatch.await(1, TimeUnit.SECONDS);
assertTrue("did not received the expected JMSException", gotException);
assertEquals(0, control.listRemoteAddresses().length);
+ assertEquals(0, service.getServer().getConnectionCount());
}
finally
{
@@ -242,12 +357,14 @@
JMSServerControlMBean control = createJMSServerControl();
+ assertEquals(0, service.getServer().getConnectionCount());
assertEquals(0, control.listRemoteAddresses().length);
Connection connection = JMSUtil.createConnection(connectorFactory);
// the connection won't connect to the server until a session is created
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertEquals(1, service.getServer().getConnectionCount());
String[] remoteAddresses = control.listRemoteAddresses();
assertEquals(1, remoteAddresses.length);
@@ -262,7 +379,12 @@
assertFalse(control.closeConnectionsForAddress(unknownAddress));
+ boolean gotException = exceptionLatch.await(500, TimeUnit.MILLISECONDS);
+ assertFalse(gotException);
+
assertEquals(1, control.listRemoteAddresses().length);
+ assertEquals(1, service.getServer().getConnectionCount());
+
}
finally
{
More information about the jboss-cvs-commits
mailing list