[jboss-cvs] JBoss Messaging SVN: r5435 - in trunk: src/main/org/jboss/messaging/core/management/impl and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 26 04:43:32 EST 2008


Author: jmesnil
Date: 2008-11-26 04:43:32 -0500 (Wed, 26 Nov 2008)
New Revision: 5435

Modified:
   trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
Log:
JBMESSAGING-1117: Request for feature to close connections on server side by ip address and by connection id

- ensure that all the session's connection are closed when the connection is closed using the management operation closeConnectionsForAddress()
- added management operation to list connection and session IDs


Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.management;
 
 import static javax.management.MBeanOperationInfo.ACTION;
+import static javax.management.MBeanOperationInfo.INFO;
 
 import java.util.List;
 import java.util.Map;
@@ -150,9 +151,19 @@
    @Operation(desc = "Rollback a prepared transaction")
    boolean rollbackPreparedTransaction(@Parameter(desc = "the Base64 representation of a transaction", name = "transactionAsBase64") String transactionAsBase64) throws Exception;
 
+   @Operation(desc = "List the client addresses", impact = INFO)
    String[] listRemoteAddresses();
 
-   String[] listRemoteAddresses(String ipAddress);
+   @Operation(desc = "List the client addresses which match the given IP Address", impact = INFO)
+   String[] listRemoteAddresses(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
 
-   boolean closeConnectionsForAddress(String ipAddress);
+   @Operation(desc = "Closes all the connections for the given IP Address", impact = INFO)
+   boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
+
+   @Operation(desc = "List all the connection IDs", impact = INFO)
+   String[] listConnectionIDs();
+
+   @Operation(desc = "List the sessions for the given connectionID", impact = INFO)
+   String[] listSessions(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID);
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -45,6 +45,7 @@
 
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
@@ -58,6 +59,7 @@
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.server.impl.ServerSessionImpl;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -602,13 +604,37 @@
          String remoteAddress = connection.getRemoteAddress();
          if (remoteAddress.contains(ipAddress))
          {
-            connection.destroy();
+            connection.fail(new MessagingException(MessagingException.INTERNAL_ERROR, "connections for " + ipAddress + " closed by management"));
             closed = true;
          }
       }
 
       return closed;
    }
+   
+   public String[] listConnectionIDs()
+   {
+      Set<RemotingConnection> connections = remotingService.getConnections();
+      String[] connectionIDs = new String[connections.size()];
+      int i = 0;
+      for (RemotingConnection connection : connections)
+      {
+         connectionIDs[i++] = connection.getID().toString();
+      }
+      return connectionIDs;
+   }
+   
+   public String[] listSessions(final String connectionID)
+   {
+      List<ServerSession> sessions = server.getSessions(connectionID);
+      String[] sessionIDs = new String[sessions.size()];
+      int i = 0;
+      for (ServerSession serverSession : sessions)
+      {
+         sessionIDs[i++] = serverSession.getName();
+      }
+      return sessionIDs;
+   }
 
    // NotificationEmitter implementation ----------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -12,6 +12,7 @@
 
 package org.jboss.messaging.core.server;
 
+import java.util.List;
 import java.util.Set;
 
 import org.jboss.messaging.core.config.Configuration;
@@ -107,4 +108,6 @@
    RemotingConnection getReplicatingConnection();
 
    ResourceManager getResourceManager();
+
+   List<ServerSession> getSessions(String connectionID);
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -59,6 +59,8 @@
  */
 public interface ServerSession
 {
+   String getName();
+
    long getID();
 
    String getUsername();
@@ -67,6 +69,8 @@
    
    int getMinLargeMessageSize();
 
+   Object getConnectionID();
+
    void removeConsumer(ServerConsumer consumer) throws Exception;
 
    void close() throws Exception;
@@ -140,4 +144,5 @@
    int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
    
    Channel getChannel();
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -12,11 +12,12 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Collection;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -27,20 +28,15 @@
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
 import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
-import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
@@ -59,7 +55,6 @@
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.cluster.ClusterManager;
 import org.jboss.messaging.core.server.cluster.impl.ClusterManagerImpl;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -594,6 +589,21 @@
    {
       sessions.remove(name);
    }
+   
+   public List<ServerSession> getSessions(final String connectionID)
+   {
+      Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
+      List<ServerSession> matchingSessions = new ArrayList<ServerSession>();
+      for (Entry<String, ServerSession> sessionEntry : sessionEntries)
+      {
+         ServerSession serverSession = sessionEntry.getValue();
+         if (serverSession.getConnectionID().toString().equals(connectionID))
+         {
+            matchingSessions.add(serverSession);
+         }
+      }
+      return matchingSessions;
+   }
 
    public RemotingConnection getReplicatingConnection()
    {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -293,6 +293,16 @@
       return id;
    }
 
+   public String getName()
+   {
+      return name;
+   }
+
+   public Object getConnectionID()
+   {
+      return remotingConnection.getID();
+   }
+
    public void removeConsumer(final ServerConsumer consumer) throws Exception
    {
       if (consumers.remove(consumer.getID()) == null)

Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -163,4 +163,9 @@
    String[] listRemoteAddresses(String ipAddress);
 
    boolean closeConnectionsForAddress(String ipAddress);
+
+   String[] listConnectionIDs();
+
+   String[] listSessions(String connectionID);
+
 }

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -340,6 +340,16 @@
    {
       return messagingServer.closeConnectionsForAddress(ipAddress);
    }
+   
+   public String[] listConnectionIDs()
+   {
+      return messagingServer.listConnectionIDs();
+   }
+   
+   public String[] listSessions(final String connectionID)
+   {
+      return messagingServer.listSessions(connectionID);
+   }
 
    // Public --------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -113,7 +113,15 @@
    String[] listRemoteAddresses();
    
    @Operation(desc = "List the client addresses which match the given IP Address", impact = INFO)
-   String[] listRemoteAddresses(String ipAddress);
+   String[] listRemoteAddresses(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
 
-   boolean closeConnectionsForAddress(String ipAddress);
+   @Operation(desc = "Closes all the connections for the given IP Address", impact = INFO)
+   boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress);
+   
+   @Operation(desc = "List all the connection IDs", impact = INFO)
+   String[] listConnectionIDs();
+
+   @Operation(desc = "List the sessions for the given connectionID", impact = INFO)
+   String[] listSessions(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID);
+
 }

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -234,6 +234,16 @@
       return server.closeConnectionsForAddress(ipAddress);
    }
 
+   public String[] listConnectionIDs()
+   {
+      return server.listConnectionIDs();
+   }
+
+   public String[] listSessions(final String connectionID)
+   {
+      return server.listSessions(connectionID);
+   }
+
    // StandardMBean overrides
    // ---------------------------------------------------
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java	2008-11-25 15:43:08 UTC (rev 5434)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java	2008-11-26 09:43:32 UTC (rev 5435)
@@ -142,12 +142,123 @@
       doCloseConnectionsForUnknownAddress(MinaAcceptorFactory.class.getName(), MinaConnectorFactory.class.getName());
    }
    
+   public void testListSessionsForInVM() throws Exception
+   {
+      doListSessions(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
+   }
+
+   public void testListSessionsForNetty() throws Exception
+   {
+      doListSessions(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
+   }
+
+   public void testListSessionsForMina() throws Exception
+   {
+      doListSessions(MinaAcceptorFactory.class.getName(), MinaConnectorFactory.class.getName());
+   }
+
+   public void testListConnectionIDsForInVM() throws Exception
+   {
+      doListConnectionIDs(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
+   }
+
+   public void testListConnectionIDsForNetty() throws Exception
+   {
+      doListConnectionIDs(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
+   }
+
+   public void testListConnectionIDsForMina() throws Exception
+   {
+      doListConnectionIDs(MinaAcceptorFactory.class.getName(), MinaConnectorFactory.class.getName());
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
+   private void doListConnectionIDs(String acceptorFactory, String connectorFactory) throws Exception
+   {
+      MessagingService service = null;
+      try
+      {
+         service = startMessagingService(acceptorFactory);
+
+         JMSServerControlMBean control = createJMSServerControl();
+         
+         assertEquals(0, control.listConnectionIDs().length);
+
+         Connection connection = JMSUtil.createConnection(connectorFactory);
+         connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         String[] connectionIDs = control.listConnectionIDs();         
+         assertEquals(1, connectionIDs.length);
+
+         Connection connection2 = JMSUtil.createConnection(connectorFactory);
+         connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         assertEquals(2, control.listConnectionIDs().length);
+
+         connection.close();
+         Thread.sleep(500);
+         
+         assertEquals(1, control.listConnectionIDs().length);
+
+         connection2.close();
+         Thread.sleep(500);
+         
+         assertEquals(0, control.listConnectionIDs().length);
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();
+         }
+      }
+   }
+   
+   private void doListSessions(String acceptorFactory, String connectorFactory) throws Exception
+   {
+      MessagingService service = null;
+      try
+      {
+         service = startMessagingService(acceptorFactory);
+
+         JMSServerControlMBean control = createJMSServerControl();
+         
+         assertEquals(0, control.listConnectionIDs().length);
+
+         Connection connection = JMSUtil.createConnection(connectorFactory);
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         String[] connectionIDs = control.listConnectionIDs();         
+         assertEquals(1, connectionIDs.length);
+         String connectionID = connectionIDs[0];
+
+         String[] sessions = control.listSessions(connectionID);
+         assertEquals(1, sessions.length);
+
+         session.close();
+
+         sessions = control.listSessions(connectionID);
+         assertEquals(0, sessions.length);
+
+         connection.close();
+         
+         Thread.sleep(500);
+         
+         assertEquals(0, control.listConnectionIDs().length);
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();
+         }
+      }
+   }
+   
    private void doListClientConnections(String acceptorFactory, String connectorFactory) throws Exception
    {
       MessagingService service = null;
@@ -196,12 +307,15 @@
 
          JMSServerControlMBean control = createJMSServerControl();
 
+         assertEquals(0, service.getServer().getConnectionCount());
          assertEquals(0, control.listRemoteAddresses().length);
 
          Connection connection = JMSUtil.createConnection(connectorFactory);
          // the connection won't connect to the server until a session is created
          connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
+         assertEquals(1, service.getServer().getConnectionCount());
+
          String[] remoteAddresses = control.listRemoteAddresses();
          assertEquals(1, remoteAddresses.length);
          String remoteAddress = remoteAddresses[0];
@@ -217,9 +331,10 @@
 
          assertTrue(control.closeConnectionsForAddress(remoteAddress));
 
-         boolean gotException = exceptionLatch.await(500, TimeUnit.MILLISECONDS);
+         boolean gotException = exceptionLatch.await(1, TimeUnit.SECONDS);
          assertTrue("did not received the expected JMSException", gotException);
          assertEquals(0, control.listRemoteAddresses().length);
+         assertEquals(0, service.getServer().getConnectionCount());
       }
       finally
       {
@@ -242,12 +357,14 @@
 
          JMSServerControlMBean control = createJMSServerControl();
 
+         assertEquals(0, service.getServer().getConnectionCount());
          assertEquals(0, control.listRemoteAddresses().length);
 
          Connection connection = JMSUtil.createConnection(connectorFactory);
          // the connection won't connect to the server until a session is created
          connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
+         assertEquals(1, service.getServer().getConnectionCount());
          String[] remoteAddresses = control.listRemoteAddresses();
          assertEquals(1, remoteAddresses.length);
 
@@ -262,7 +379,12 @@
 
          assertFalse(control.closeConnectionsForAddress(unknownAddress));
 
+         boolean gotException = exceptionLatch.await(500, TimeUnit.MILLISECONDS);
+         assertFalse(gotException);
+
          assertEquals(1, control.listRemoteAddresses().length);
+         assertEquals(1, service.getServer().getConnectionCount());
+
       }
       finally
       {




More information about the jboss-cvs-commits mailing list