Author: clebert.suconic(a)jboss.com
Date: 2010-12-27 16:03:03 -0500 (Mon, 27 Dec 2010)
New Revision: 10075
Modified:
trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
Log:
HORNETQ-587 - adding listProducersInfoAsJSON
Modified: trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -13,8 +13,6 @@
package org.hornetq.api.core.management;
-import java.util.List;
-
import javax.management.MBeanOperationInfo;
import org.hornetq.api.core.HornetQException;
@@ -481,6 +479,8 @@
*/
@Operation(desc = "List all the connection IDs", impact =
MBeanOperationInfo.INFO)
String[] listConnectionIDs() throws Exception;
+
+ String listProducersInfoAsJSON() throws Exception;
/**
* Lists all the sessions IDs for the specified connection ID.
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -1258,7 +1258,25 @@
blockOnIO();
}
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.management.HornetQServerControl#listProducersInfoAsJSON()
+ */
+ public String listProducersInfoAsJSON() throws Exception
+ {
+ JSONArray producers = new JSONArray();
+
+
+ for (ServerSession session : server.getSessions())
+ {
+ session.describeProducersInfo(producers);
+ }
+
+ return producers.toString();
+ }
+
+
public Object[] getConnectors() throws Exception
{
checkStarted();
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -813,6 +813,7 @@
JSONObject obj = new JSONObject();
obj.put("consumerID", serverConsumer.getID());
obj.put("connectionID",
serverConsumer.getConnectionID().toString());
+ obj.put("sessionID", serverConsumer.getSessionID());
obj.put("browseOnly", serverConsumer.isBrowseOnly());
obj.put("creationTime", serverConsumer.getCreationTime());
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -53,6 +53,8 @@
boolean isBrowseOnly();
long getCreationTime();
+
+ String getSessionID();
}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -15,11 +15,15 @@
import java.util.List;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.utils.json.JSONArray;
/**
*
@@ -120,6 +124,14 @@
String getMetaData(String key);
String[] getTargetAddresses();
+
+ /**
+ * Add all the producers detail to the JSONArray object.
+ * This is a method to be used by the management layer.
+ * @param objs
+ * @throws Exception
+ */
+ void describeProducersInfo(JSONArray objs) throws Exception;
String getLastSentMessageID(String address);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -206,6 +206,11 @@
{
return this.session.getConnectionID().toString();
}
+
+ public String getSessionID()
+ {
+ return this.session.getName();
+ }
public HandleStatus handle(final MessageReference ref) throws Exception
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -24,12 +24,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.client.impl.ClientMessageImpl;
@@ -69,6 +71,8 @@
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
/*
* Session implementation
@@ -144,7 +148,7 @@
private Map<String, String> metaData;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
- private Map<SimpleString, UUID> targetAddressInfos = new
HashMap<SimpleString, UUID>();
+ private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new
HashMap<SimpleString, Pair<UUID, AtomicLong>>();
private long creationTime = System.currentTimeMillis();
@@ -1072,7 +1076,80 @@
consumer.setTransferring(transferring);
}
}
+
+ public void addMetaData(String key, String data)
+ {
+ if (metaData == null)
+ {
+ metaData = new HashMap<String, String>();
+ }
+ metaData.put(key, data);
+ }
+
+ public String getMetaData(String key)
+ {
+ String data = null;
+ if (metaData != null)
+ {
+ data = metaData.get(key);
+ }
+ return data;
+ }
+
+ public String[] getTargetAddresses()
+ {
+ Map<SimpleString, Pair<UUID, AtomicLong>> copy =
cloneTargetAddresses();
+ Iterator<SimpleString> iter = copy.keySet().iterator();
+ int num = copy.keySet().size();
+ String[] addresses = new String[num];
+ int i = 0;
+ while (iter.hasNext())
+ {
+ addresses[i] = iter.next().toString();
+ i++;
+ }
+ return addresses;
+ }
+
+ public String getLastSentMessageID(String address)
+ {
+ Pair<UUID, AtomicLong> value =
targetAddressInfos.get(SimpleString.toSimpleString(address));
+ if (value != null)
+ {
+ return value.a.toString();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public long getCreationTime()
+ {
+ return this.creationTime;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerSession#getProducersInfoJSON()
+ */
+ public void describeProducersInfo(JSONArray array) throws Exception
+ {
+ Map<SimpleString, Pair<UUID, AtomicLong>> targetCopy =
cloneTargetAddresses();
+
+ for (Map.Entry<SimpleString, Pair<UUID, AtomicLong>> entry :
targetCopy.entrySet())
+ {
+ JSONObject producerInfo = new JSONObject();
+ producerInfo.put("connectionID", this.getConnectionID().toString());
+ producerInfo.put("sessionID", this.getName());
+ producerInfo.put("destination", entry.getKey().toString());
+ producerInfo.put("lastUUIDSent", entry.getValue().a);
+ producerInfo.put("msgSent", entry.getValue().b.longValue());
+ array.put(producerInfo);
+ }
+ }
+
+
// FailureListener implementation
// --------------------------------------------------------------------
@@ -1099,6 +1176,11 @@
// Private
// ----------------------------------------------------------------------------
+ private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses()
+ {
+ return new HashMap<SimpleString, Pair<UUID,
AtomicLong>>(targetAddressInfos);
+ }
+
private void setStarted(final boolean s)
{
Set<ServerConsumer> consumersClone = new
HashSet<ServerConsumer>(consumers.values());
@@ -1196,57 +1278,18 @@
postOffice.route(msg, routingContext, direct);
- targetAddressInfos.put(msg.getAddress(), msg.getUserID());
-
- routingContext.clear();
- }
-
- public void addMetaData(String key, String data)
- {
- if (metaData == null)
+ Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
+
+ if (value == null)
{
- metaData = new HashMap<String, String>();
+ targetAddressInfos.put(msg.getAddress(), new
Pair<UUID,AtomicLong>(msg.getUserID(), new AtomicLong(1)));
}
- metaData.put(key, data);
- }
-
- public String getMetaData(String key)
- {
- String data = null;
- if (metaData != null)
+ else
{
- data = metaData.get(key);
+ value.a = msg.getUserID();
+ value.b.incrementAndGet();
}
- return data;
- }
-
- public String[] getTargetAddresses()
- {
- Map<SimpleString, UUID> copy = new HashMap<SimpleString,
UUID>(targetAddressInfos);
- Iterator<SimpleString> iter = copy.keySet().iterator();
- int num = copy.keySet().size();
- String[] addresses = new String[num];
- int i = 0;
- while (iter.hasNext())
- {
- addresses[i] = iter.next().toString();
- i++;
- }
- return addresses;
- }
- public String getLastSentMessageID(String address)
- {
- UUID id = targetAddressInfos.get(SimpleString.toSimpleString(address));
- if (id != null)
- {
- return id.toString();
- }
- return null;
+ routingContext.clear();
}
-
- public long getCreationTime()
- {
- return this.creationTime;
- }
}
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -851,6 +851,7 @@
JSONObject obj = new JSONObject();
obj.put("consumerID", consumer.getID());
obj.put("connectionID", consumer.getConnectionID());
+ obj.put("sessionID", consumer.getSessionID());
obj.put("queueName", consumer.getQueue().getName().toString());
obj.put("browseOnly", consumer.isBrowseOnly());
obj.put("creationTime", consumer.getCreationTime());
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -13,9 +13,7 @@
package org.hornetq.tests.integration.management;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import javax.transaction.xa.XAResource;
@@ -37,7 +35,6 @@
import org.hornetq.api.core.management.DivertControl;
import org.hornetq.api.core.management.HornetQServerControl;
import org.hornetq.api.core.management.ObjectNameBuilder;
-import org.hornetq.api.core.management.Parameter;
import org.hornetq.api.core.management.QueueControl;
import org.hornetq.api.core.management.RoleInfo;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
@@ -712,10 +709,16 @@
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
+ HornetQServerControl serverControl = createManagementControl();
+
+ JSONArray jsonArray = new JSONArray(serverControl.listProducersInfoAsJSON());
+
+ assertEquals(1, jsonArray.length());
+ assertEquals(4, ((JSONObject)jsonArray.get(0)).getInt("msgSent"));
+
clientSession.close();
locator.close();
- HornetQServerControl serverControl = createManagementControl();
String txDetails = serverControl.listPreparedTransactionDetailsAsJSON();
Assert.assertTrue(txDetails.matches(".*m1.*"));
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2010-12-27 18:42:39 UTC (rev 10074)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2010-12-27 21:03:03 UTC (rev 10075)
@@ -13,8 +13,6 @@
package org.hornetq.tests.integration.management;
-import java.util.List;
-
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
@@ -618,38 +616,10 @@
password);
}
- public void createBridge(String name,
- String queueName,
- String forwardingAddress,
- String filterString,
- String transformerClassName,
- long retryInterval,
- double retryIntervalMultiplier,
- int reconnectAttempts,
- boolean useDuplicateDetection,
- int confirmationWindowSize,
- long clientFailureCheckPeriod,
- String discoveryGroupName,
- boolean ha,
- String user,
- String password) throws Exception
+
+ public String listProducersInfoAsJSON() throws Exception
{
- proxy.invokeOperation("createBridge",
- name,
- queueName,
- forwardingAddress,
- filterString,
- transformerClassName,
- retryInterval,
- retryIntervalMultiplier,
- reconnectAttempts,
- useDuplicateDetection,
- confirmationWindowSize,
- clientFailureCheckPeriod,
- discoveryGroupName,
- ha,
- user,
- password);
+ return (String)proxy.invokeOperation("listProducersInfoAsJSON");
}
};
}