[hornetq-commits] JBoss hornetq SVN: r9852 - in trunk: src/main/org/hornetq/api/jms/management and 15 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 8 09:22:15 EST 2010


Author: gaohoward
Date: 2010-11-08 09:22:13 -0500 (Mon, 08 Nov 2010)
New Revision: 9852

Added:
   trunk/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java
   trunk/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java
   trunk/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
Modified:
   trunk/src/main/org/hornetq/api/core/client/ClientSession.java
   trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java
   trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
   trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java
   trunk/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/server/HornetQServer.java
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/ServerConsumer.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
   trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
   trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
   trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
HORNETQ-416


Modified: trunk/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSession.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSession.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -562,4 +562,9 @@
     */
    void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
 
+   /**
+    * Attach any metadata to the session.
+    * @throws HornetQException 
+    */
+   void addMetaData(String key, String data) throws HornetQException;
 }

Modified: trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -47,6 +47,16 @@
     */
    int getMessageCount() throws Exception;
 
+   /**
+    * Returns the number of messages that this queue is currently delivering to its consumers.
+    */
+   int getDeliveringCount();
+
+   /**
+    * Returns the number of messages added to this queue since it was created.
+    */
+   long getMessagesAdded();
+
    // Operations ----------------------------------------------------
 
    /**

Copied: trunk/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java (from rev 9826, branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java)
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * A JMSConnectionInfo
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class JMSConnectionInfo
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String connectionID;
+   
+   private final String clientAddress;
+   
+   private final long creationTime;
+   
+   private final String clientID;
+   
+   private final String username;
+   
+
+   // Static --------------------------------------------------------
+   
+   public static JMSConnectionInfo[] from(final String jsonString) throws Exception
+   {
+      JSONArray array = new JSONArray(jsonString);
+      JMSConnectionInfo[] infos = new JMSConnectionInfo[array.length()];
+      for (int i = 0; i < array.length(); i++)
+      {
+         JSONObject obj = array.getJSONObject(i);
+         String cid = obj.isNull("clientID") ? null : obj.getString("clientID");
+         String uname = obj.isNull("principal") ? null : obj.getString("principal");
+         
+         JMSConnectionInfo info = new JMSConnectionInfo(obj.getString("connectionID"),
+                                                        obj.getString("clientAddress"),
+                                                        obj.getLong("creationTime"),
+                                                        cid,
+                                                        uname);
+         infos[i] = info;
+      }
+      return infos;
+   }
+
+   // Constructors --------------------------------------------------
+
+   private JMSConnectionInfo(final String connectionID,
+                             final String clientAddress,
+                             final long creationTime,
+                             final String clientID,
+                             final String username)
+   {
+      this.connectionID = connectionID;
+      this.clientAddress = clientAddress;
+      this.creationTime = creationTime;
+      this.clientID = clientID;
+      this.username = username;
+   }
+   
+   // Public --------------------------------------------------------
+
+   public String getConnectionID()
+   {
+      return connectionID;
+   }
+   
+   public String getClientAddress()
+   {
+      return clientAddress;
+   }
+   
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+   
+   public String getClientID()
+   {
+      return clientID;
+   }
+   
+   public String getUsername()
+   {
+      return username;
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Copied: trunk/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java (from rev 9826, branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java)
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * Helper class to create Java Objects from the
+ * JSON serialization returned by {@link JMSServerControl#listConsumersAsJSON(String)} and related methods.
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class JMSConsumerInfo
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String consumerID;
+
+   private final String connectionID;
+
+   private final String destinationName;
+
+   private final String destinationType;
+
+   private final boolean browseOnly;
+
+   private final long creationTime;
+   
+   private final boolean durable;
+   
+   private final String filter;
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Returns an array of SubscriptionInfo corresponding to the JSON serialization returned
+    * by {@link TopicControl#listAllSubscriptionsAsJSON()} and related methods.
+    */
+   public static JMSConsumerInfo[] from(final String jsonString) throws Exception
+   {
+      JSONArray array = new JSONArray(jsonString);
+      JMSConsumerInfo[] infos = new JMSConsumerInfo[array.length()];
+      for (int i = 0; i < array.length(); i++)
+      {
+         JSONObject sub = array.getJSONObject(i);
+         JMSConsumerInfo info = new JMSConsumerInfo(sub.getString("consumerID"),
+                                                    sub.getString("connectionID"),
+                                                    sub.getString("destinationName"),
+                                                    sub.getString("destinationType"),
+                                                    sub.getBoolean("browseOnly"),
+                                                    sub.getLong("creationTime"),
+                                                    sub.getBoolean("durable"),
+                                                    sub.optString("filter", null));
+         infos[i] = info;
+      }
+
+      return infos;
+   }
+
+   // Constructors --------------------------------------------------
+
+   private JMSConsumerInfo(final String consumerID,
+                           final String connectionID,
+                            final String destinationName,
+                            final String destinationType,
+                            final boolean browseOnly,
+                            final long creationTime,
+                            final boolean durable,
+                            final String filter)
+   {
+      this.consumerID = consumerID;
+      this.connectionID = connectionID;
+      this.destinationName = destinationName;
+      this.destinationType = destinationType;
+      this.browseOnly = browseOnly;
+      this.creationTime = creationTime;
+      this.durable = durable;
+      this.filter = filter;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getConsumerID()
+   {
+      return consumerID;
+   }
+   
+   public String getConnectionID()
+   {
+      return connectionID;
+   }
+   
+   public String getDestinationName()
+   {
+      return destinationName;
+   }
+   
+   public String getDestinationType()
+   {
+      return destinationType;
+   }
+   
+   public boolean isBrowseOnly()
+   {
+      return browseOnly;
+   }
+   
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+   
+   /**
+    * @return the durable
+    */
+   public boolean isDurable()
+   {
+      return durable;
+   }
+   
+   public String getFilter()
+   {
+      return filter;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -51,11 +51,6 @@
    void setDeadLetterAddress(@Parameter(name = "deadLetterAddress", desc = "Dead-letter address of the queue") String deadLetterAddress) throws Exception;
 
    /**
-    * Returns the number of messages added to this queue since it was created.
-    */
-   long getMessagesAdded();
-
-   /**
     * Returns the number of scheduled messages in this queue.
     */
    long getScheduledCount();
@@ -66,11 +61,6 @@
    int getConsumerCount();
 
    /**
-    * Returns the number of messages that this queue is currently delivering to its consumers.
-    */
-   int getDeliveringCount();
-
-   /**
     * returns the selector for the queue
     */
    String getSelector();

Modified: trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -229,12 +229,55 @@
    String[] listConnectionIDs() throws Exception;
 
    /**
+    * Lists all the connections connected to this server.
+    * The returned String is a JSON string containing an array of JMSConnectionInfo objects.
+    * 
+    * @see JMSConnectionInfo#from(String)
+    */
+   @Operation(desc = "List all JMS connections")
+   String listConnectionsAsJSON() throws Exception;
+   
+   /**
     * Lists all the sessions IDs for the specified connection ID.
     */
    @Operation(desc = "List the sessions for the given connectionID", impact = MBeanOperationInfo.INFO)
    String[] listSessions(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
 
    /**
+    * Lists all the consumers which belongs to the JMS Connection specified by the connectionID.
+    * The returned String is a JSON string containing an array of JMSConsumerInfo objects.
+    * 
+    * @see JMSConsumerInfo#from(String)
+    */
+   @Operation(desc = "List all JMS consumers associated to a JMS Connection")
+   String listConsumersAsJSON(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
+   
+   /**
+    * Lists all addresses to which the designated server session has sent messages.
+    */
+   @Operation(desc = "Lists all addresses to which the designated session has sent messages", impact = MBeanOperationInfo.INFO)
+   String[] listTargetDestinations(@Parameter(desc = "a session ID", name = "sessionID") String sessionID) throws Exception;
+   
+   /**
+    * Returns the last sent message's ID from the given session to an address.
+    */
+   @Operation(desc = "Returns the last sent message's ID from the given session to an address", impact = MBeanOperationInfo.INFO)
+   String getLastSentMessageID(@Parameter(desc = "session name", name = "sessionID") String sessionID,
+                               @Parameter(desc = "address", name = "address") String address) throws Exception;
+   
+   /**
+    * Gets the session's creation time.
+    */
+   @Operation(desc = "Gets the sessions creation time", impact = MBeanOperationInfo.INFO)
+   String getSessionCreationTime(@Parameter(desc = "session name", name = "sessionID") String sessionID) throws Exception;
+   
+   /**
+    * Lists all the sessions IDs for the specified connection ID.
+    */
+   @Operation(desc = "List the sessions for the given connectionID", impact = MBeanOperationInfo.INFO)
+   String listSessionsAsJSON(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
+
+   /**
     * List all the prepared transaction, sorted by date,
     * oldest first, with details, in text format
     */
@@ -247,6 +290,5 @@
     */
    @Operation(desc = "List all the prepared transaction, sorted by date, oldest first, with details, in HTML format", impact = MBeanOperationInfo.INFO)
    String listPreparedTransactionDetailsAsHTML() throws Exception;
-   
 
 }

Copied: trunk/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java (from rev 9826, branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java)
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONException;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * A JMSSessionInfo
+ *
+ * @author howard
+ *
+ *
+ */
+public class JMSSessionInfo
+{
+   private final String sessionID;
+   
+   private final long creationTime;
+   
+   public JMSSessionInfo(String sessionID, long creationTime)
+   {
+      this.sessionID = sessionID;
+      this.creationTime = creationTime;
+   }
+
+   public static JMSSessionInfo[] from(final String jsonString) throws JSONException
+   {
+      JSONArray array = new JSONArray(jsonString);
+      JMSSessionInfo[] infos = new JMSSessionInfo[array.length()];
+      for (int i = 0; i < array.length(); i++)
+      {
+         JSONObject obj = array.getJSONObject(i);
+
+         JMSSessionInfo info = new JMSSessionInfo(obj.getString("sessionID"),
+                                                        obj.getLong("creationTime"));
+         infos[i] = info;
+      }
+      return infos;
+   }
+   
+   public String getSessionID()
+   {
+      return sessionID;
+   }
+
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+}

Modified: trunk/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -40,6 +40,8 @@
 
    private final int messageCount;
 
+   private final int deliveringCount;
+   
    // Static --------------------------------------------------------
 
    /**
@@ -58,7 +60,8 @@
                                                       sub.optString("name", null),
                                                       sub.getBoolean("durable"),
                                                       sub.optString("selector", null),
-                                                      sub.getInt("messageCount"));
+                                                      sub.getInt("messageCount"),
+                                                      sub.getInt("deliveringCount"));
          infos[i] = info;
       }
 
@@ -72,7 +75,8 @@
                             final String name,
                             final boolean durable,
                             final String selector,
-                            final int messageCount)
+                            final int messageCount,
+                            final int deliveringCount)
    {
       this.queueName = queueName;
       this.clientID = clientID;
@@ -80,6 +84,7 @@
       this.durable = durable;
       this.selector = selector;
       this.messageCount = messageCount;
+      this.deliveringCount = deliveringCount;
    }
 
    // Public --------------------------------------------------------
@@ -131,6 +136,14 @@
    {
       return messageCount;
    }
+   
+   /**
+    * Returns the number of messages currently delivered to this subscription.
+    */
+   public int getDeliveringCount()
+   {
+      return deliveringCount;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -45,6 +45,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
@@ -1810,4 +1811,9 @@
       }
 
    }
+
+   public void addMetaData(String key, String data) throws HornetQException
+   {
+      channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
+   }
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -555,4 +555,9 @@
    {
       session.setPacketSize(packetSize);
    }
+
+   public void addMetaData(String key, String data) throws HornetQException
+   {
+      session.addMetaData(key, data);
+   }
 }

Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -1886,4 +1886,13 @@
       }
    }
 
+   public String[] listTargetAddresses(String sessionID)
+   {
+      ServerSession session = server.getSessionByID(sessionID);
+      if (session != null) {
+         return session.getTargetAddresses();
+      }
+      return new String[0];
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -62,6 +62,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
@@ -460,6 +461,13 @@
                   session.requestProducerCredits(message.getAddress(), message.getCredits());
                   break;
                }
+               case PacketImpl.SESS_ADD_METADATA:
+               {
+                  response = new NullResponseMessage();
+                  SessionAddMetaDataMessage message = (SessionAddMetaDataMessage)packet;
+                  session.addMetaData(message.getKey(), message.getData());
+                  break;
+               }
             }
          }
          catch (HornetQXAException e)

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -78,6 +78,7 @@
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.logging.Logger;
@@ -107,6 +108,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
@@ -486,6 +488,11 @@
             packet = new SessionForceConsumerDelivery();
             break;
          }
+         case SESS_ADD_METADATA:
+         {
+            packet = new SessionAddMetaDataMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -182,6 +182,8 @@
 
    public static final byte REPLICATION_SYNC = 103;
 
+   public static final byte SESS_ADD_METADATA = 104;
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -89,6 +89,10 @@
    
    private volatile boolean executing;
 
+   private final long creationTime;
+   
+   private String clientID;
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -129,6 +133,8 @@
       this.client = client;
 
       this.executor = executor;
+      
+      this.creationTime = System.currentTimeMillis();
    }
 
    // RemotingConnection implementation
@@ -160,6 +166,11 @@
    {
       return transportConnection.getRemoteAddress();
    }
+   
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
 
    public synchronized Channel getChannel(final long channelID, final int confWindowSize)
    {
@@ -547,4 +558,14 @@
          channel.close();
       }
    }
+
+   public void setClientID(String cID)
+   {
+      clientID = cID;
+   }
+   
+   public String getClientID()
+   {
+      return clientID;
+   }
 }

Copied: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java (from rev 9826, branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java)
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * A SessionAddMetaDataMessage
+ *
+ * @author <a href="mailto:hgao at redhat.com>Howard Gao</a>
+ *
+ *
+ */
+public class SessionAddMetaDataMessage extends PacketImpl
+{
+   private String key;
+   private String data;
+
+   public SessionAddMetaDataMessage()
+   {
+      super(PacketImpl.SESS_ADD_METADATA);
+   }
+   
+   public SessionAddMetaDataMessage(String k, String d)
+   {
+      this();
+      key = k;
+      data = d;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeString(key);
+      buffer.writeString(data);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      key = buffer.readString();
+      data = buffer.readString();
+   }
+
+   @Override
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
+
+   public String getKey()
+   {
+      return key;
+   }
+
+   public String getData()
+   {
+      return data;
+   }
+
+}

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -52,6 +52,8 @@
    private boolean valid;
 
    private boolean destroyed = false;
+   
+   private final long creationTime;
 
    private StompDecoder decoder = new StompDecoder();
 
@@ -73,6 +75,8 @@
       this.transportConnection = transportConnection;
 
       this.manager = manager;
+      
+      this.creationTime = System.currentTimeMillis();
    }
 
    public void addFailureListener(final FailureListener listener)
@@ -239,6 +243,11 @@
    {
       return transportConnection.getRemoteAddress();
    }
+   
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
 
    public Connection getTransportConnection()
    {

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -570,4 +570,5 @@
          }
       }
    }
+
 }
\ No newline at end of file

Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -159,4 +159,6 @@
    void deployBridge(BridgeConfiguration config) throws Exception;
 
    void destroyBridge(String name) throws Exception;
+
+   ServerSession getSessionByID(String sessionID);
 }

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -157,4 +157,6 @@
    void close() throws Exception;
    
    boolean isDirectDeliver();
+
+   SimpleString getAddress();
 }

Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -47,6 +47,10 @@
    void forceDelivery(long sequence);   
    
    void setTransferring(boolean transferring);
+
+   boolean isBrowseOnly();
+
+   long getCreationTime();
 }
 
 

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -14,6 +14,7 @@
 package org.hornetq.core.server;
 
 import java.util.List;
+import java.util.Set;
 
 import javax.transaction.xa.Xid;
 
@@ -110,4 +111,16 @@
    void close(boolean failed) throws Exception;
 
    void setTransferring(boolean transferring);
+
+   Set<ServerConsumer> getServerConsumers();
+
+   void addMetaData(String key, String data);
+
+   String getMetaData(String key);
+
+   String[] getTargetAddresses();
+
+   String getLastSentMessageID(String address);
+
+   long getCreationTime();
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -1513,6 +1513,11 @@
                                     });
       
    }
+
+   public ServerSession getSessionByID(String sessionName)
+   {
+      return sessions.get(sessionName);
+   }
    
 
    // Inner classes

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -238,6 +238,11 @@
    {
       return name;
    }
+   
+   public SimpleString getAddress()
+   {
+      return address;
+   }
 
    public long getID()
    {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -126,6 +126,8 @@
     */
    private AtomicBoolean writeReady = new AtomicBoolean(true);
 
+   private final long creationTime;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerConsumerImpl(final long id,
@@ -170,6 +172,8 @@
       
       this.callback.addReadyListener(this);
 
+      this.creationTime = System.currentTimeMillis();
+      
       if (browseOnly)
       {
          browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -188,6 +192,16 @@
       return id;
    }
    
+   public boolean isBrowseOnly()
+   {
+      return browseOnly;
+   }
+
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+
    public HandleStatus handle(final MessageReference ref) throws Exception
    {
       if (availableCredits != null && availableCredits.get() <= 0)

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -16,8 +16,10 @@
 import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -64,6 +66,7 @@
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.protocol.SessionCallback;
 import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUID;
 
 /*
  * Session implementation 
@@ -135,7 +138,13 @@
    private volatile SimpleString defaultAddress;
 
    private volatile int timeoutSeconds;
+   
+   private Map<String, String> metaData;
 
+   private Map<SimpleString, UUID> targetAddressInfos = new HashMap<SimpleString, UUID>();
+   
+   private long creationTime = System.currentTimeMillis();
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerSessionImpl(final String name,
@@ -232,6 +241,11 @@
    {
       return remotingConnection.getID();
    }
+   
+   public Set<ServerConsumer> getServerConsumers() {
+      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+      return Collections.unmodifiableSet(consumersClone);
+   }
 
    public void removeConsumer(final long consumerID) throws Exception
    {
@@ -1173,8 +1187,58 @@
       }
 
       postOffice.route(msg, routingContext, direct);
+      
+      targetAddressInfos.put(msg.getAddress(), msg.getUserID());
 
       routingContext.clear();
    }
 
+   public void addMetaData(String key, String data)
+   {
+      if (metaData == null)
+      {
+         metaData = new HashMap<String, String>();
+      }
+      metaData.put(key, data);
+   }
+
+   public String getMetaData(String key)
+   {
+      String data = null;
+      if (metaData != null)
+      {
+         data = metaData.get(key);
+      }
+      return data;
+   }
+   
+   public String[] getTargetAddresses()
+   {
+      Map<SimpleString, UUID> copy = new HashMap<SimpleString, UUID>(targetAddressInfos);
+      Iterator<SimpleString> iter = copy.keySet().iterator();
+      int num = copy.keySet().size();
+      String[] addresses = new String[num];
+      int i = 0;
+      while (iter.hasNext())
+      {
+         addresses[i] = iter.next().toString();
+         i++;
+      }
+      return addresses;
+   }
+
+   public String getLastSentMessageID(String address)
+   {
+      UUID id = targetAddressInfos.get(SimpleString.toSimpleString(address));
+      if (id != null)
+      {
+         return id.toString();
+      }
+      return null;
+   }
+
+   public long getCreationTime()
+   {
+      return this.creationTime;
+   }
 }

Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -181,6 +181,16 @@
       }
 
       this.clientID = clientID;
+      try
+      {
+         this.addSessionMetaData(initialSession);
+      }
+      catch (HornetQException e)
+      {
+         JMSException ex = new JMSException("Internal error setting metadata jms-client-id");
+         ex.setLinkedException(e);
+         throw ex;
+      }
 
       justCreated = false;
    }
@@ -537,6 +547,8 @@
          {
             session.start();
          }
+         
+         this.addSessionMetaData(session);
 
          return jbs;
       }
@@ -562,6 +574,8 @@
       {
          initialSession = sessionFactory.createSession(username, password, false, false, false, false, 0);
 
+         addSessionMetaData(initialSession);
+
          initialSession.addFailureListener(listener);
       }
       catch (HornetQException me)
@@ -570,6 +584,15 @@
       }
    }
 
+   private void addSessionMetaData(ClientSession session) throws HornetQException
+   {
+      session.addMetaData("jms-session", "");
+      if (clientID != null)
+      {
+         session.addMetaData("jms-client-id", clientID);
+      }
+   }
+
    // Inner classes --------------------------------------------------------------------------------
 
    private static class JMSFailureListener implements SessionFailureListener

Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -14,8 +14,10 @@
 package org.hornetq.jms.management.impl;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.ListenerNotFoundException;
@@ -32,11 +34,20 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.jms.management.ConnectionFactoryControl;
+import org.hornetq.api.jms.management.DestinationControl;
 import org.hornetq.api.jms.management.JMSQueueControl;
 import org.hornetq.api.jms.management.JMSServerControl;
 import org.hornetq.api.jms.management.TopicControl;
+import org.hornetq.core.filter.Filter;
 import org.hornetq.core.management.impl.MBeanInfoHelper;
+import org.hornetq.core.server.ServerConsumer;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQQueue;
 import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
 import org.hornetq.jms.server.impl.JMSFactoryType;
 
 /**
@@ -86,6 +97,38 @@
       }
       return trimmed;
    }
+   
+   private static String[] determineJMSDestination(String coreAddress)
+   {
+      String[] result = new String[2]; // destination name & type
+      if (coreAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+      {
+         result[0] = coreAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length());
+         result[1] = "queue";
+      }
+      else if (coreAddress.startsWith(HornetQQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+      {
+         result[0] = coreAddress.substring(HornetQQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
+         result[1] = "tempqueue";
+      }
+      else if (coreAddress.startsWith(HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX))
+      {
+         result[0] = coreAddress.substring(HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX.length());
+         result[1] = "topic";
+      }
+      else if (coreAddress.startsWith(HornetQQueue.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+      {
+         result[0] = coreAddress.substring(HornetQQueue.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
+         result[1] = "temptopic";
+      }
+      else
+      {
+         System.out.println("JMSServerControlImpl.determineJMSDestination()" + coreAddress);
+         // not related to JMS
+         return null;
+      }
+      return result;
+   }
 
    private static List<Pair<TransportConfiguration, TransportConfiguration>> convertToConnectorPairs(final Object[] liveConnectorsTransportClassNames,
                                                                                                      final Object[] liveConnectorTransportParams,
@@ -682,7 +725,116 @@
          blockOnIO();
       }
    }
+   
+   public String listConnectionsAsJSON() throws Exception
+   {
+      checkStarted();
 
+      clearIO();
+
+      try
+      {
+         JSONArray array = new JSONArray();
+         
+         Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections();
+
+         Set<ServerSession> sessions = server.getHornetQServer().getSessions();
+         
+         Map<Object, ServerSession> jmsSessions = new HashMap<Object, ServerSession>();
+
+         for (ServerSession session : sessions)
+         {
+            if (session.getMetaData("jms-session") != null)
+            {
+               jmsSessions.put(session.getConnectionID(), session);
+            }
+         }
+
+         for (RemotingConnection connection : connections)
+         {
+            JSONObject obj = new JSONObject();
+            obj.put("connectionID", connection.getID().toString());
+            obj.put("clientAddress", connection.getRemoteAddress());
+            obj.put("creationTime", connection.getCreationTime());
+            obj.put("clientID", jmsSessions.get(connection.getID()).getMetaData("jms-client-id"));
+            obj.put("principal", jmsSessions.get(connection.getID()).getUsername());
+            array.put(obj);
+         }
+         return array.toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String listConsumersAsJSON(String connectionID) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         JSONArray array = new JSONArray();
+         
+         Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections();
+         for (RemotingConnection connection : connections)
+         {
+            if (connectionID.equals(connection.getID().toString()))
+            {
+               List<ServerSession> sessions = server.getHornetQServer().getSessions(connectionID);
+               for (ServerSession session : sessions)
+               {
+                  Set<ServerConsumer> consumers = session.getServerConsumers();
+                  for (ServerConsumer consumer : consumers)
+                  {
+                     JSONObject obj = new JSONObject();
+                     obj.put("consumerID", consumer.getID());
+                     obj.put("connectionID", connectionID);
+                     obj.put("queueName", consumer.getQueue().getName().toString());
+                     obj.put("browseOnly", consumer.isBrowseOnly());
+                     obj.put("creationTime", consumer.getCreationTime());
+                     // JMS consumer with message filter use the queue's filter
+                     Filter queueFilter = consumer.getQueue().getFilter();
+                     if (queueFilter != null)
+                     {
+                        obj.put("filter", queueFilter.getFilterString().toString());
+                     }
+                     String[] destinationInfo = determineJMSDestination(consumer.getQueue().getAddress().toString());
+                     if (destinationInfo == null)
+                     {
+                        continue;
+                     }
+                     obj.put("destinationName", destinationInfo[0]);
+                     obj.put("destinationType", destinationInfo[1]);
+                     if (destinationInfo[1].equals("topic")) {
+                        try
+                        {
+                           HornetQDestination.decomposeQueueNameForDurableSubscription(consumer.getQueue().getName().toString());
+                           obj.put("durable", true);
+                        } catch (IllegalArgumentException e)
+                        {
+                           obj.put("durable", false);
+                        }
+                     }
+                     else
+                     {
+                        obj.put("durable", false);
+                     }
+                     array.put(obj);
+                  }
+               }
+            }
+         }
+         return array.toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
    public String[] listSessions(final String connectionID) throws Exception
    {
       checkStarted();
@@ -816,4 +968,81 @@
       }
       return list;
    }
+
+   public String[] listTargetDestinations(String sessionID) throws Exception
+   {
+      String[] addresses = server.getHornetQServer().getHornetQServerControl().listTargetAddresses(sessionID);
+      Map<String, DestinationControl> allDests = new HashMap<String, DestinationControl>();
+      
+      Object[] queueControls = server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class);
+      for (int i = 0; i < queueControls.length; i++)
+      {
+         JMSQueueControl queueControl = (JMSQueueControl)queueControls[i];
+         allDests.put(queueControl.getAddress(), queueControl);
+      }
+      
+      Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class);
+      for (int i = 0; i < topicControls.length; i++)
+      {
+         TopicControl topicControl = (TopicControl)topicControls[i];
+         allDests.put(topicControl.getAddress(), topicControl);
+      }
+      
+      List<String> destinations = new ArrayList<String>();
+      for (int i = 0; i < addresses.length; i++)
+      {
+         DestinationControl control = allDests.get(addresses[i]);
+         if (control != null)
+         {
+            destinations.add(control.getAddress());
+         }
+      }
+      return destinations.toArray(new String[0]);
+   }
+
+   public String getLastSentMessageID(String sessionID, String address) throws Exception
+   {
+      ServerSession session = server.getHornetQServer().getSessionByID(sessionID);
+      if (session != null)
+      {
+         return session.getLastSentMessageID(address);
+      }
+      return null;
+   }
+
+   public String getSessionCreationTime(String sessionID) throws Exception
+   {
+      ServerSession session = server.getHornetQServer().getSessionByID(sessionID);
+      if (session != null)
+      {
+         return String.valueOf(session.getCreationTime());
+      }
+      return null;
+   }
+
+   public String listSessionsAsJSON(final String connectionID) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+      
+      JSONArray array = new JSONArray();
+      try
+      {
+         List<ServerSession> sessions = server.getHornetQServer().getSessions(connectionID);
+         for (ServerSession sess : sessions)
+         {
+            JSONObject obj = new JSONObject();
+            obj.put("sessionID", sess.getName());
+            obj.put("creationTime", sess.getCreationTime());
+            array.put(obj);
+         }
+      }
+      finally
+      {
+         blockOnIO();
+      }
+      return array.toString();
+   }
+
 }

Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -118,6 +118,28 @@
       return getMessageCount(DurabilityType.ALL);
    }
 
+   public int getDeliveringCount()
+   {
+      List<QueueControl> queues = getQueues(DurabilityType.ALL);
+      int count = 0;
+      for (QueueControl queue : queues)
+      {
+         count += queue.getDeliveringCount();
+      }
+      return count;
+   }
+   
+   public long getMessagesAdded()
+   {
+      List<QueueControl> queues = getQueues(DurabilityType.ALL);
+      int count = 0;
+      for (QueueControl queue : queues)
+      {
+         count += queue.getMessagesAdded();
+      }
+      return count;
+   }
+   
    public int getDurableMessageCount()
    {
       return getMessageCount(DurabilityType.DURABLE);
@@ -315,6 +337,7 @@
          info.put("name", subName);
          info.put("durable", queue.isDurable());
          info.put("messageCount", queue.getMessageCount());
+         info.put("deliveringCount", queue.getDeliveringCount());
          array.put(info);
       }
 

Modified: trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -13,6 +13,7 @@
 
 package org.hornetq.spi.core.protocol;
 
+import java.util.Collection;
 import java.util.List;
 
 import org.hornetq.api.core.HornetQBuffer;
@@ -38,6 +39,11 @@
    Object getID();
 
    /**
+    * Returns the creation time of the Remoting connection
+    */
+   long getCreationTime();
+
+   /**
     * returns a string representation of the remote address of this connection
     *
     * @return the remote address

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -1390,6 +1390,24 @@
          // TODO Auto-generated method stub
          
       }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.api.core.client.ClientSession#setClientID(java.lang.String)
+       */
+      public void setClientID(String clientID)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.api.core.client.ClientSession#addMetaData(java.lang.String, java.lang.String)
+       */
+      public void addMetaData(String key, String data) throws HornetQException
+      {
+         // TODO Auto-generated method stub
+         
+      }
    }
 
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -21,11 +21,23 @@
 import javax.jms.ConnectionFactory;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 import junit.framework.Assert;
 
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.JMSConnectionInfo;
+import org.hornetq.api.jms.management.JMSConsumerInfo;
 import org.hornetq.api.jms.management.JMSServerControl;
+import org.hornetq.api.jms.management.JMSSessionInfo;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
@@ -35,6 +47,7 @@
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQQueue;
 import org.hornetq.jms.server.impl.JMSServerManagerImpl;
 import org.hornetq.tests.integration.management.ManagementControlHelper;
 import org.hornetq.tests.integration.management.ManagementTestBase;
@@ -139,7 +152,300 @@
    {
       doListConnectionIDs(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
    }
+   
+   public void testListConnectionsAsJSONForNetty() throws Exception
+   {
+      doListConnectionsAsJSON(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
+   }
 
+   public void testListConnectionsAsJSONForInVM() throws Exception
+   {
+      doListConnectionsAsJSON(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
+   }
+   
+   public void testListConsumersAsJSON() throws Exception
+   {
+      String queueName = RandomUtil.randomString();
+
+      try
+      {
+         startHornetQServer(NettyAcceptorFactory.class.getName());
+         serverManager.createQueue(false, queueName, null, true, queueName);
+         Queue queue = HornetQJMSClient.createQueue(queueName);
+
+         JMSServerControl control = createManagementControl();
+
+         long startTime = System.currentTimeMillis();
+         
+         String jsonStr = control.listConnectionsAsJSON();
+         assertNotNull(jsonStr);
+         JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+         assertEquals(0, infos.length);
+
+         ConnectionFactory cf1 = JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+                                                       JMSServerControl2Test.CONNECTION_TTL,
+                                                       JMSServerControl2Test.PING_PERIOD);
+         Connection connection = cf1.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         TemporaryTopic temporaryTopic = session.createTemporaryTopic();
+
+         // create a regular message consumer
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         jsonStr = control.listConnectionsAsJSON();
+         assertNotNull(jsonStr);
+         infos = JMSConnectionInfo.from(jsonStr);
+         assertEquals(1, infos.length);
+         String connectionID = infos[0].getConnectionID();
+         
+         String consJsonStr = control.listConsumersAsJSON(connectionID);
+         assertNotNull(consJsonStr);
+         JMSConsumerInfo[] consumerInfos = JMSConsumerInfo.from(consJsonStr);
+         assertEquals(1, consumerInfos.length);
+         JMSConsumerInfo consumerInfo = consumerInfos[0];
+         assertNotNull(consumerInfo.getConsumerID());
+         assertEquals(connectionID, consumerInfo.getConnectionID());
+         assertEquals(queue.getQueueName(), consumerInfo.getDestinationName());
+         assertEquals("queue", consumerInfo.getDestinationType());
+         assertNull(consumerInfo.getFilter());
+         assertEquals(false, consumerInfo.isBrowseOnly());
+         assertEquals(false, consumerInfo.isDurable());
+         assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+         consumer.close();
+         
+         consJsonStr = control.listConsumersAsJSON(connectionID);
+         assertNotNull(consJsonStr);
+         consumerInfos = JMSConsumerInfo.from(consJsonStr);
+         assertEquals(0, consumerInfos.length);
+
+         // create a queue browser
+         QueueBrowser browser = session.createBrowser(queue);
+         // the server resources are created when the browser starts enumerating
+         browser.getEnumeration();
+         
+         consJsonStr = control.listConsumersAsJSON(connectionID);
+         assertNotNull(consJsonStr);
+         System.out.println(consJsonStr);
+         consumerInfos = JMSConsumerInfo.from(consJsonStr);
+         assertEquals(1, consumerInfos.length);
+         consumerInfo = consumerInfos[0];
+         assertNotNull(consumerInfo.getConsumerID());
+         assertEquals(connectionID, consumerInfo.getConnectionID());
+         assertEquals(queue.getQueueName(), consumerInfo.getDestinationName());
+         assertEquals("queue", consumerInfo.getDestinationType());
+         assertNull(consumerInfo.getFilter());
+         assertEquals(true, consumerInfo.isBrowseOnly());
+         assertEquals(false, consumerInfo.isDurable());
+         assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+         browser.close();
+         
+         // create a regular consumer w/ filter on a temp topic
+         String filter = "color = 'red'";
+         consumer = session.createConsumer(temporaryTopic, filter);
+         
+         consJsonStr = control.listConsumersAsJSON(connectionID);
+         assertNotNull(consJsonStr);
+         System.out.println(consJsonStr);
+         consumerInfos = JMSConsumerInfo.from(consJsonStr);
+         assertEquals(1, consumerInfos.length);
+         consumerInfo = consumerInfos[0];
+         assertNotNull(consumerInfo.getConsumerID());
+         assertEquals(connectionID, consumerInfo.getConnectionID());
+         assertEquals(temporaryTopic.getTopicName(), consumerInfo.getDestinationName());
+         assertEquals("temptopic", consumerInfo.getDestinationType());
+         assertEquals(filter, consumerInfo.getFilter());
+         assertEquals(false, consumerInfo.isBrowseOnly());
+         assertEquals(false, consumerInfo.isDurable());
+         assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+         consumer.close();
+
+         connection.close();
+      }
+      finally
+      {
+         if (serverManager != null)
+         {
+            serverManager.destroyQueue(queueName);
+            serverManager.stop();
+         }
+
+         if (server != null)
+         {
+            server.stop();
+         }
+      }
+   }
+
+   /**
+    * test for durable subscriber
+    */
+   public void testListConsumersAsJSON2() throws Exception
+   {
+      String topicName = RandomUtil.randomString();
+      String clientID = RandomUtil.randomString();
+      String subName = RandomUtil.randomString();
+
+      try
+      {
+         startHornetQServer(NettyAcceptorFactory.class.getName());
+         serverManager.createTopic(false, topicName, topicName);
+         Topic topic = HornetQJMSClient.createTopic(topicName);
+         
+         JMSServerControl control = createManagementControl();
+
+         long startTime = System.currentTimeMillis();
+         
+         String jsonStr = control.listConnectionsAsJSON();
+         assertNotNull(jsonStr);
+         JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+         assertEquals(0, infos.length);
+
+         ConnectionFactory cf1 = JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+                                                       JMSServerControl2Test.CONNECTION_TTL,
+                                                       JMSServerControl2Test.PING_PERIOD);
+         Connection connection = cf1.createConnection();
+         connection.setClientID(clientID);
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // create a durable subscriber
+         MessageConsumer consumer = session.createDurableSubscriber(topic, subName);
+         
+         jsonStr = control.listConnectionsAsJSON();
+         assertNotNull(jsonStr);
+         infos = JMSConnectionInfo.from(jsonStr);
+         assertEquals(1, infos.length);
+         String connectionID = infos[0].getConnectionID();
+         
+         String consJsonStr = control.listConsumersAsJSON(connectionID);
+         assertNotNull(consJsonStr);
+         JMSConsumerInfo[] consumerInfos = JMSConsumerInfo.from(consJsonStr);
+         assertEquals(1, consumerInfos.length);
+         JMSConsumerInfo consumerInfo = consumerInfos[0];
+         assertNotNull(consumerInfo.getConsumerID());
+         assertEquals(connectionID, consumerInfo.getConnectionID());
+         assertEquals(topic.getTopicName(), consumerInfo.getDestinationName());
+         assertEquals("topic", consumerInfo.getDestinationType());
+         assertNull(consumerInfo.getFilter());
+         assertEquals(false, consumerInfo.isBrowseOnly());
+         assertEquals(true, consumerInfo.isDurable());
+         assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+         consumer.close();
+
+         connection.close();
+      }
+      finally
+      {
+         if (serverManager != null)
+         {
+            serverManager.destroyTopic(topicName);
+            serverManager.stop();
+         }
+
+         if (server != null)
+         {
+            server.stop();
+         }
+      }
+   }
+   
+   //https://jira.jboss.org/browse/HORNETQ-416
+   public void testProducerInfo() throws Exception
+   {
+      String queueName = RandomUtil.randomString();
+
+      System.out.println("queueName is: " + queueName);
+      
+      try
+      {
+         startHornetQServer(NettyAcceptorFactory.class.getName());
+         serverManager.createQueue(false, queueName, null, true, queueName);
+         Queue queue = HornetQJMSClient.createQueue(queueName);
+
+         JMSServerControl control = createManagementControl();
+
+         long startTime = System.currentTimeMillis();
+
+         ConnectionFactory cf1 = JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+                                                       JMSServerControl2Test.CONNECTION_TTL,
+                                                       JMSServerControl2Test.PING_PERIOD);
+         Connection connection = cf1.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer producer = session.createProducer(queue);
+         
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage msg = session.createTextMessage("mymessage-" + i);
+            producer.send(msg);
+         }
+
+         connection.start();
+         
+         // create a regular message consumer
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         TextMessage receivedMsg = null;
+         for (int i = 0; i < 10; i++)
+         {
+            receivedMsg = (TextMessage)consumer.receive(3000);
+            System.out.println("receiveMsg: " + receivedMsg);
+         }
+         
+         String lastMsgID = receivedMsg.getJMSMessageID();
+         System.out.println("Last mid: " + lastMsgID);
+         
+         String jsonStr = control.listConnectionsAsJSON();
+         JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+         
+         JMSConnectionInfo connInfo = infos[0];
+         
+         String sessionsStr = control.listSessionsAsJSON(connInfo.getConnectionID());
+         JMSSessionInfo[] sessInfos = JMSSessionInfo.from(sessionsStr);
+         
+         assertTrue(sessInfos.length > 0);
+         boolean lastMsgFound = false;
+         for (JMSSessionInfo sInfo : sessInfos)
+         {
+            System.out.println("Session name: " + sInfo.getSessionID());
+            assertNotNull(sInfo.getSessionID());
+            long createTime = sInfo.getCreationTime();
+            assertTrue(startTime <= createTime && createTime <= System.currentTimeMillis());
+            String lastID = control.getLastSentMessageID(sInfo.getSessionID(), "jms.queue." + queueName);
+            if (lastID != null)
+            {
+               assertEquals(lastMsgID, lastID);
+               lastMsgFound = true;
+            }
+         }
+         assertTrue(lastMsgFound);
+
+         consumer.close();
+
+         connection.close();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
+      finally
+      {
+         if (serverManager != null)
+         {
+            serverManager.destroyQueue(queueName);
+            serverManager.stop();
+         }
+
+         if (server != null)
+         {
+            server.stop();
+         }
+      }
+   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -207,6 +513,95 @@
       }
    }
 
+   private void doListConnectionsAsJSON(final String acceptorFactory, final String connectorFactory) throws Exception
+   {
+      try
+      {
+         startHornetQServer(acceptorFactory);
+
+         JMSServerControl control = createManagementControl();
+
+         long startTime = System.currentTimeMillis();
+         
+         String jsonStr = control.listConnectionsAsJSON();
+         assertNotNull(jsonStr);
+         JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+         assertEquals(0, infos.length);
+
+         ConnectionFactory cf1 = JMSUtil.createFactory(connectorFactory,
+                                                       JMSServerControl2Test.CONNECTION_TTL,
+                                                       JMSServerControl2Test.PING_PERIOD);
+         Connection connection = cf1.createConnection();
+
+         jsonStr = control.listConnectionsAsJSON();
+         assertNotNull(jsonStr);
+         infos = JMSConnectionInfo.from(jsonStr);
+         assertEquals(1, infos.length);
+         for (JMSConnectionInfo info : infos)
+         {
+            assertNotNull(info.getConnectionID());
+            assertNotNull(info.getClientAddress());
+            assertTrue(startTime <= info.getCreationTime() && info.getCreationTime() <= System.currentTimeMillis());
+         }
+
+         ConnectionFactory cf2 = JMSUtil.createFactory(connectorFactory,
+                                                       JMSServerControl2Test.CONNECTION_TTL,
+                                                       JMSServerControl2Test.PING_PERIOD);
+         Connection connection2 = cf2.createConnection();
+
+         jsonStr = control.listConnectionsAsJSON();
+         assertNotNull(jsonStr);
+         infos = JMSConnectionInfo.from(jsonStr);
+         assertEquals(2, infos.length);
+         for (JMSConnectionInfo info : infos)
+         {
+            assertNotNull(info.getConnectionID());
+            assertNotNull(info.getClientAddress());
+            assertTrue(startTime <= info.getCreationTime() && info.getCreationTime() <= System.currentTimeMillis());
+            assertNull(info.getClientID());
+            assertNull(info.getUsername());
+         }
+
+         connection.close();
+
+         waitForConnectionIDs(1, control);
+
+         connection2.close();
+
+         waitForConnectionIDs(0, control);
+         
+         Connection connection3 = cf2.createConnection("guest", "guest");
+         connection3.setClientID("MyClient");
+         
+         jsonStr = control.listConnectionsAsJSON();
+         assertNotNull(jsonStr);
+         
+         infos = JMSConnectionInfo.from(jsonStr);
+         JMSConnectionInfo info = infos[0];
+         assertEquals("MyClient", info.getClientID());
+         assertEquals("guest", info.getUsername());
+         
+         connection3.close();
+         
+         jsonStr = control.listConnectionsAsJSON();
+         assertNotNull(jsonStr);
+         infos = JMSConnectionInfo.from(jsonStr);
+         assertEquals(0, infos.length);
+      }
+      finally
+      {
+         if (serverManager != null)
+         {
+            serverManager.stop();
+         }
+
+         if (server != null)
+         {
+            server.stop();
+         }
+      }
+   }
+   
    private void waitForConnectionIDs(final int num, final JMSServerControl control) throws Exception
    {
       final long timeout = 10000;

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -21,14 +21,12 @@
 import javax.jms.Session;
 
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.management.Parameter;
 import org.hornetq.api.core.management.ResourceNames;
 import org.hornetq.api.jms.HornetQJMSClient;
 import org.hornetq.api.jms.management.JMSServerControl;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.security.Role;
 import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQDestination;
 import org.hornetq.jms.client.HornetQQueue;
 
 /**
@@ -233,6 +231,16 @@
          {
             return (String[])proxy.invokeOperation("listConnectionIDs");
          }
+         
+         public String listConnectionsAsJSON() throws Exception
+         {
+            return (String)proxy.invokeOperation("listConnectionsAsJSON");
+         }
+         
+         public String listConsumersAsJSON(String connectionID) throws Exception
+         {
+            return (String)proxy.invokeOperation("listConsumersAsJSON", connectionID);
+         }
 
          public String[] listRemoteAddresses() throws Exception
          {
@@ -275,6 +283,27 @@
             return (Boolean)proxy.invokeOperation("createTopic", name, jndiBinding);
          }
 
+         public String[] listTargetDestinations(String sessionID) throws Exception
+         {
+            return null;
+         }
+
+         public String getLastSentMessageID(String sessionID, String address) throws Exception
+         {
+            return null;
+         }
+
+         public String getSessionCreationTime(String sessionID) throws Exception
+         {
+            return null;
+         }
+
+         public String listSessionsAsJSON(String connectionID) throws Exception
+         {
+            // TODO Auto-generated method stub
+            return null;
+         }
+
          public String listPreparedTransactionDetailsAsJSON() throws Exception
          {
             return (String)proxy.invokeOperation("listPreparedTransactionDetailsAsJSON");

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -78,26 +78,36 @@
    }
 
    static MessageConsumer createConsumer(final Connection connection,
+                                         final Destination destination) throws JMSException
+   {
+      return createConsumer(connection, destination, Session.AUTO_ACKNOWLEDGE);
+   }
+   
+   static MessageConsumer createConsumer(final Connection connection,
                                          final Destination destination,
-                                         final String connectorFactory) throws JMSException
+                                         int ackMode) throws JMSException
    {
-      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Session s = connection.createSession(false, ackMode);
 
       return s.createConsumer(destination);
    }
 
-   public static MessageConsumer createConsumer(final Connection connection, final Destination destination) throws JMSException
+   static TopicSubscriber createDurableSubscriber(final Connection connection,
+                                                  final Topic topic,
+                                                  final String clientID,
+                                                  final String subscriptionName) throws JMSException
    {
-      return JMSUtil.createConsumer(connection, destination, InVMConnectorFactory.class.getName());
+      return createDurableSubscriber(connection, topic, clientID, subscriptionName, Session.AUTO_ACKNOWLEDGE);
    }
-
+   
    static TopicSubscriber createDurableSubscriber(final Connection connection,
                                                   final Topic topic,
                                                   final String clientID,
-                                                  final String subscriptionName) throws JMSException
+                                                  final String subscriptionName,
+                                                  final int ackMode) throws JMSException
    {
       connection.setClientID(clientID);
-      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Session s = connection.createSession(false, ackMode);
 
       return s.createDurableSubscriber(topic, subscriptionName);
    }

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -13,15 +13,22 @@
 
 package org.hornetq.tests.integration.jms.server.management;
 
+import static junit.framework.Assert.assertEquals;
+
 import java.util.Map;
 
 import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import javax.jms.TopicSubscriber;
 
 import junit.framework.Assert;
 
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.management.QueueControl;
 import org.hornetq.api.jms.HornetQJMSClient;
 import org.hornetq.api.jms.management.SubscriptionInfo;
 import org.hornetq.api.jms.management.TopicControl;
@@ -406,7 +413,77 @@
       {
       }
    }
+   
+   public void testGetMessagesAdded() throws Exception
+   {
+      Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createConsumer(connection_1, topic);
+      Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+      Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
 
+      TopicControl topicControl = createManagementControl();
+
+      Assert.assertEquals(0, topicControl.getMessagesAdded());
+
+      JMSUtil.sendMessages(topic, 2);
+
+      Assert.assertEquals(3 * 2, topicControl.getMessagesAdded());
+
+      connection_1.close();
+      connection_2.close();
+      connection_3.close();
+   }
+   
+   public void testGetMessagesDelivering() throws Exception
+   {
+      Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic, Session.CLIENT_ACKNOWLEDGE);
+      Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+      Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+      TopicControl topicControl = createManagementControl();
+
+      assertEquals(0, topicControl.getDeliveringCount());
+
+      JMSUtil.sendMessages(topic, 2);
+
+      assertEquals(0, topicControl.getDeliveringCount());
+      
+      connection_1.start();
+      connection_2.start();
+      connection_3.start();
+
+      Message msg_1 = null;
+      Message msg_2 = null;
+      Message msg_3 = null;
+      for (int i = 0; i < 2; i++)
+      {
+         msg_1 = cons_1.receive(5000);
+         assertNotNull(msg_1);
+         msg_2 = cons_2.receive(5000);
+         assertNotNull(msg_2);
+         msg_3 = cons_3.receive(5000);         
+         assertNotNull(msg_3);
+      }
+
+      assertEquals(3 * 2, topicControl.getDeliveringCount());
+
+      msg_1.acknowledge();
+      assertEquals(2 * 2, topicControl.getDeliveringCount());
+      msg_2.acknowledge();
+      assertEquals(1 * 2, topicControl.getDeliveringCount());
+      msg_3.acknowledge();
+      assertEquals(0, topicControl.getDeliveringCount());
+      
+      connection_1.close();
+      connection_2.close();
+      connection_3.close();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -13,9 +13,12 @@
 
 package org.hornetq.tests.integration.jms.server.management;
 
+import static junit.framework.Assert.assertEquals;
 import static org.hornetq.tests.util.RandomUtil.randomString;
 
 import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.QueueConnection;
 import javax.jms.QueueSession;
 import javax.jms.Session;
@@ -26,6 +29,7 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.management.ResourceNames;
 import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.TopicControl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -333,6 +337,71 @@
       }
    }
 
+   public void testGetMessagesAdded() throws Exception
+   {
+      Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createConsumer(connection_1, topic);
+      Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+      Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+
+      assertEquals(0, proxy.retrieveAttributeValue("messagesAdded"));
+
+      JMSUtil.sendMessages(topic, 2);
+
+      assertEquals(3 * 2, proxy.retrieveAttributeValue("messagesAdded"));
+
+      connection_1.close();
+      connection_2.close();
+      connection_3.close();
+   }
+   
+   public void testGetMessagesDelivering() throws Exception
+   {
+      Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic, Session.CLIENT_ACKNOWLEDGE);
+      Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+      Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+      assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+      JMSUtil.sendMessages(topic, 2);
+
+      assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+      
+      connection_1.start();
+      connection_2.start();
+      connection_3.start();
+
+      Message msg_1 = null;
+      Message msg_2 = null;
+      Message msg_3 = null;
+      for (int i = 0; i < 2; i++)
+      {
+         msg_1 = cons_1.receive(5000);
+         assertNotNull(msg_1);
+         msg_2 = cons_2.receive(5000);
+         assertNotNull(msg_2);
+         msg_3 = cons_3.receive(5000);         
+         assertNotNull(msg_3);
+      }
+
+      assertEquals(3 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+
+      msg_1.acknowledge();
+      assertEquals(2 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+      msg_2.acknowledge();
+      assertEquals(1 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+      msg_3.acknowledge();
+      assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+      
+      connection_1.close();
+      connection_2.close();
+      connection_3.close();
+   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-11-08 14:22:13 UTC (rev 9852)
@@ -348,6 +348,12 @@
       return name;
    }
 
+   public SimpleString getAddress()
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+   
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#getID()
     */



More information about the hornetq-commits mailing list